summaryrefslogtreecommitdiff
path: root/innobase/trx/trx0trx.c
diff options
context:
space:
mode:
authorunknown <monty@donna.mysql.com>2001-02-17 14:19:19 +0200
committerunknown <monty@donna.mysql.com>2001-02-17 14:19:19 +0200
commit2662b59306ef0cd495fa6e2edf7129e58a11393a (patch)
treebfe39951a73e906579ab819bf5198ad8f3a64a36 /innobase/trx/trx0trx.c
parent66de55a56bdcf2f7a9c0c4f8e19b3e761475e202 (diff)
downloadmariadb-git-2662b59306ef0cd495fa6e2edf7129e58a11393a.tar.gz
Added Innobase to source distribution
Docs/manual.texi: Added Innobase documentation configure.in: Incremented version include/my_base.h: Added option for Innobase myisam/mi_check.c: cleanup mysql-test/t/bdb.test: cleanup mysql-test/t/innobase.test: Extended with new tests from bdb.test mysql-test/t/merge.test: Added test of SHOW create mysys/my_init.c: Fix for UNIXWARE 7 scripts/mysql_install_db.sh: Always write how to start mysqld scripts/safe_mysqld.sh: Fixed type sql/ha_innobase.cc: Update to new version sql/ha_innobase.h: Update to new version sql/handler.h: Added 'update_table_comment()' and 'append_create_info()' sql/sql_delete.cc: Fixes for Innobase sql/sql_select.cc: Fixes for Innobase sql/sql_show.cc: Append create information (for MERGE tables) sql/sql_update.cc: Fixes for Innobase
Diffstat (limited to 'innobase/trx/trx0trx.c')
-rw-r--r--innobase/trx/trx0trx.c1270
1 files changed, 1270 insertions, 0 deletions
diff --git a/innobase/trx/trx0trx.c b/innobase/trx/trx0trx.c
new file mode 100644
index 00000000000..0c1c3aff8d6
--- /dev/null
+++ b/innobase/trx/trx0trx.c
@@ -0,0 +1,1270 @@
+/******************************************************
+The transaction
+
+(c) 1996 Innobase Oy
+
+Created 3/26/1996 Heikki Tuuri
+*******************************************************/
+
+#include "trx0trx.h"
+
+#ifdef UNIV_NONINL
+#include "trx0trx.ic"
+#endif
+
+#include "trx0undo.h"
+#include "trx0rseg.h"
+#include "log0log.h"
+#include "que0que.h"
+#include "lock0lock.h"
+#include "trx0roll.h"
+#include "usr0sess.h"
+#include "read0read.h"
+#include "srv0srv.h"
+#include "thr0loc.h"
+
+/* Dummy session used currently in MySQL interface */
+sess_t* trx_dummy_sess = NULL;
+
+/* Number of transactions currently allocated for MySQL: protected by
+the kernel mutex */
+ulint trx_n_mysql_transactions = 0;
+
+/********************************************************************
+Takes care of the error handling when an SQL error or other error has
+occurred. */
+static
+void
+trx_error_handle(
+/*=============*/
+ trx_t* trx); /* in: trx handle */
+
+/********************************************************************
+Creates and initializes a transaction object. */
+
+trx_t*
+trx_create(
+/*=======*/
+ /* out, own: the transaction */
+ sess_t* sess) /* in: session or NULL */
+{
+ trx_t* trx;
+
+ ut_ad(mutex_own(&kernel_mutex));
+
+ trx = mem_alloc(sizeof(trx_t));
+
+ trx->type = TRX_USER;
+ trx->conc_state = TRX_NOT_STARTED;
+
+ trx->dict_operation = FALSE;
+
+ trx->n_mysql_tables_in_use = 0;
+
+ mutex_create(&(trx->undo_mutex));
+ mutex_set_level(&(trx->undo_mutex), SYNC_TRX_UNDO);
+
+ trx->rseg = NULL;
+
+ trx->undo_no = ut_dulint_zero;
+ trx->last_sql_stat_start.least_undo_no = ut_dulint_zero;
+ trx->insert_undo = NULL;
+ trx->update_undo = NULL;
+ trx->undo_no_arr = NULL;
+
+ trx->error_state = DB_SUCCESS;
+
+ trx->sess = sess;
+ trx->que_state = TRX_QUE_RUNNING;
+ trx->n_active_thrs = 0;
+
+ trx->handling_signals = FALSE;
+
+ UT_LIST_INIT(trx->signals);
+ UT_LIST_INIT(trx->reply_signals);
+
+ trx->graph = NULL;
+
+ trx->wait_lock = NULL;
+ UT_LIST_INIT(trx->wait_thrs);
+
+ trx->lock_heap = mem_heap_create_in_buffer(256);
+ UT_LIST_INIT(trx->trx_locks);
+
+ trx->read_view_heap = mem_heap_create(256);
+ trx->read_view = NULL;
+
+ return(trx);
+}
+
+/************************************************************************
+Creates a transaction object for MySQL. */
+
+trx_t*
+trx_allocate_for_mysql(void)
+/*========================*/
+ /* out, own: transaction object */
+{
+ trx_t* trx;
+
+ mutex_enter(&kernel_mutex);
+
+ /* Open a dummy session */
+
+ if (!trx_dummy_sess) {
+ trx_dummy_sess = sess_open(NULL, (byte*)"Dummy sess",
+ ut_strlen("Dummy sess"));
+ }
+
+ trx = trx_create(trx_dummy_sess);
+
+ trx_n_mysql_transactions++;
+
+ mutex_exit(&kernel_mutex);
+
+ trx->mysql_thread_id = os_thread_get_curr_id();
+
+ return(trx);
+}
+
+/************************************************************************
+Frees a transaction object. */
+
+void
+trx_free(
+/*=====*/
+ trx_t* trx) /* in, own: trx object */
+{
+ ut_ad(mutex_own(&kernel_mutex));
+ ut_a(trx->conc_state == TRX_NOT_STARTED);
+
+ mutex_free(&(trx->undo_mutex));
+
+ ut_a(trx->insert_undo == NULL);
+ ut_a(trx->update_undo == NULL);
+
+ ut_a(trx->n_mysql_tables_in_use == 0);
+
+ if (trx->undo_no_arr) {
+ trx_undo_arr_free(trx->undo_no_arr);
+ }
+
+ ut_a(UT_LIST_GET_LEN(trx->signals) == 0);
+ ut_a(UT_LIST_GET_LEN(trx->reply_signals) == 0);
+
+ ut_a(trx->wait_lock == NULL);
+ ut_a(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
+
+ if (trx->lock_heap) {
+ mem_heap_free(trx->lock_heap);
+ }
+
+ ut_a(UT_LIST_GET_LEN(trx->trx_locks) == 0);
+
+ if (trx->read_view_heap) {
+ mem_heap_free(trx->read_view_heap);
+ }
+
+ ut_a(trx->read_view == NULL);
+
+ mem_free(trx);
+}
+
+/************************************************************************
+Frees a transaction object for MySQL. */
+
+void
+trx_free_for_mysql(
+/*===============*/
+ trx_t* trx) /* in, own: trx object */
+{
+ thr_local_free(trx->mysql_thread_id);
+
+ mutex_enter(&kernel_mutex);
+
+ trx_free(trx);
+
+ ut_a(trx_n_mysql_transactions > 0);
+
+ trx_n_mysql_transactions--;
+
+ mutex_exit(&kernel_mutex);
+}
+
+/********************************************************************
+Inserts the trx handle in the trx system trx list in the right position.
+The list is sorted on the trx id so that the biggest id is at the list
+start. This function is used at the database startup to insert incomplete
+transactions to the list. */
+static
+void
+trx_list_insert_ordered(
+/*====================*/
+ trx_t* trx) /* in: trx handle */
+{
+ trx_t* trx2;
+
+ ut_ad(mutex_own(&kernel_mutex));
+
+ trx2 = UT_LIST_GET_FIRST(trx_sys->trx_list);
+
+ while (trx2 != NULL) {
+ if (ut_dulint_cmp(trx->id, trx2->id) >= 0) {
+
+ ut_ad(ut_dulint_cmp(trx->id, trx2->id) == 1);
+ break;
+ }
+ trx2 = UT_LIST_GET_NEXT(trx_list, trx2);
+ }
+
+ if (trx2 != NULL) {
+ trx2 = UT_LIST_GET_PREV(trx_list, trx2);
+
+ if (trx2 == NULL) {
+ UT_LIST_ADD_FIRST(trx_list, trx_sys->trx_list, trx);
+ } else {
+ UT_LIST_INSERT_AFTER(trx_list, trx_sys->trx_list,
+ trx2, trx);
+ }
+ } else {
+ UT_LIST_ADD_LAST(trx_list, trx_sys->trx_list, trx);
+ }
+}
+
+/********************************************************************
+Creates trx objects for transactions and initializes the trx list of
+trx_sys at database start. Rollback segment and undo log lists must
+already exist when this function is called, because the lists of
+transactions to be rolled back or cleaned up are built based on the
+undo log lists. */
+
+void
+trx_lists_init_at_db_start(void)
+/*============================*/
+{
+ trx_rseg_t* rseg;
+ trx_undo_t* undo;
+ trx_t* trx;
+
+ UT_LIST_INIT(trx_sys->trx_list);
+
+ /* Look from the rollback segments if there exist undo logs for
+ transactions */
+
+ rseg = UT_LIST_GET_FIRST(trx_sys->rseg_list);
+
+ while (rseg != NULL) {
+ undo = UT_LIST_GET_FIRST(rseg->insert_undo_list);
+
+ while (undo != NULL) {
+
+ trx = trx_create(NULL);
+
+ if (undo->state != TRX_UNDO_ACTIVE) {
+
+ trx->conc_state = TRX_COMMITTED_IN_MEMORY;
+ } else {
+ trx->conc_state = TRX_ACTIVE;
+ }
+
+ trx->id = undo->trx_id;
+ trx->insert_undo = undo;
+ trx->rseg = rseg;
+
+ if (undo->dict_operation) {
+ trx->dict_operation = undo->dict_operation;
+ trx->table_id = undo->table_id;
+ }
+
+ if (!undo->empty) {
+ trx->undo_no = ut_dulint_add(undo->top_undo_no,
+ 1);
+ }
+
+ trx_list_insert_ordered(trx);
+
+ undo = UT_LIST_GET_NEXT(undo_list, undo);
+ }
+
+ undo = UT_LIST_GET_FIRST(rseg->update_undo_list);
+
+ while (undo != NULL) {
+ trx = trx_get_on_id(undo->trx_id);
+
+ if (NULL == trx) {
+ trx = trx_create(NULL);
+
+ if (undo->state != TRX_UNDO_ACTIVE) {
+ trx->conc_state =
+ TRX_COMMITTED_IN_MEMORY;
+ } else {
+ trx->conc_state = TRX_ACTIVE;
+ }
+
+ trx->id = undo->trx_id;
+ trx->rseg = rseg;
+ trx_list_insert_ordered(trx);
+
+ if (undo->dict_operation) {
+ trx->dict_operation =
+ undo->dict_operation;
+ trx->table_id = undo->table_id;
+ }
+ }
+
+ trx->update_undo = undo;
+
+ if ((!undo->empty)
+ && (ut_dulint_cmp(undo->top_undo_no, trx->undo_no)
+ >= 0)) {
+
+ trx->undo_no = ut_dulint_add(undo->top_undo_no,
+ 1);
+ }
+
+ undo = UT_LIST_GET_NEXT(undo_list, undo);
+ }
+
+ rseg = UT_LIST_GET_NEXT(rseg_list, rseg);
+ }
+}
+
+/**********************************************************************
+Assigns a rollback segment to a transaction in a round-robin fashion.
+Skips the SYSTEM rollback segment if another is available. */
+UNIV_INLINE
+ulint
+trx_assign_rseg(void)
+/*=================*/
+ /* out: assigned rollback segment id */
+{
+ trx_rseg_t* rseg = trx_sys->latest_rseg;
+
+ ut_ad(mutex_own(&kernel_mutex));
+loop:
+ /* Get next rseg in a round-robin fashion */
+
+ rseg = UT_LIST_GET_NEXT(rseg_list, rseg);
+
+ if (rseg == NULL) {
+ rseg = UT_LIST_GET_FIRST(trx_sys->rseg_list);
+ }
+
+ /* If it is the SYSTEM rollback segment, and there exist others, skip
+ it */
+
+ if ((rseg->id == TRX_SYS_SYSTEM_RSEG_ID)
+ && (UT_LIST_GET_LEN(trx_sys->rseg_list) > 1)) {
+ goto loop;
+ }
+
+ trx_sys->latest_rseg = rseg;
+
+ return(rseg->id);
+}
+
+/********************************************************************
+Starts a new transaction. */
+
+ibool
+trx_start_low(
+/*==========*/
+ /* out: TRUE */
+ trx_t* trx, /* in: transaction */
+ ulint rseg_id)/* in: rollback segment id; if ULINT_UNDEFINED
+ is passed, the system chooses the rollback segment
+ automatically in a round-robin fashion */
+{
+ trx_rseg_t* rseg;
+
+ ut_ad(mutex_own(&kernel_mutex));
+ ut_ad(trx->rseg == NULL);
+
+ if (trx->type == TRX_PURGE) {
+ trx->id = ut_dulint_zero;
+ trx->conc_state = TRX_ACTIVE;
+
+ return(TRUE);
+ }
+
+ ut_ad(trx->conc_state != TRX_ACTIVE);
+
+ if (rseg_id == ULINT_UNDEFINED) {
+
+ rseg_id = trx_assign_rseg();
+ }
+
+ rseg = trx_sys_get_nth_rseg(trx_sys, rseg_id);
+
+ trx->id = trx_sys_get_new_trx_id();
+
+ /* The initial value for trx->no: ut_dulint_max is used in
+ read_view_open_now: */
+
+ trx->no = ut_dulint_max;
+
+ trx->rseg = rseg;
+
+ trx->conc_state = TRX_ACTIVE;
+
+ UT_LIST_ADD_FIRST(trx_list, trx_sys->trx_list, trx);
+
+ return(TRUE);
+}
+
+/********************************************************************
+Starts a new transaction. */
+
+ibool
+trx_start(
+/*======*/
+ /* out: TRUE */
+ trx_t* trx, /* in: transaction */
+ ulint rseg_id)/* in: rollback segment id; if ULINT_UNDEFINED
+ is passed, the system chooses the rollback segment
+ automatically in a round-robin fashion */
+{
+ ibool ret;
+
+ mutex_enter(&kernel_mutex);
+
+ ret = trx_start_low(trx, rseg_id);
+
+ mutex_exit(&kernel_mutex);
+
+ return(ret);
+}
+
+/********************************************************************
+Commits a transaction. */
+
+void
+trx_commit_off_kernel(
+/*==================*/
+ trx_t* trx) /* in: transaction */
+{
+ page_t* update_hdr_page;
+ dulint lsn;
+ trx_rseg_t* rseg;
+ trx_undo_t* undo;
+ ibool must_flush_log = FALSE;
+ mtr_t mtr;
+
+ ut_ad(mutex_own(&kernel_mutex));
+
+ rseg = trx->rseg;
+
+ if ((trx->insert_undo != NULL) || (trx->update_undo != NULL)) {
+
+ mutex_exit(&kernel_mutex);
+
+ mtr_start(&mtr);
+
+ must_flush_log = TRUE;
+
+ /* Change the undo log segment states from TRX_UNDO_ACTIVE
+ to some other state: these modifications to the file data
+ structure define the transaction as committed in the file
+ based world, at the serialization point of the log sequence
+ number lsn obtained below. */
+
+ mutex_enter(&(rseg->mutex));
+
+ if (trx->insert_undo != NULL) {
+ trx_undo_set_state_at_finish(trx, trx->insert_undo,
+ &mtr);
+ }
+
+ undo = trx->update_undo;
+
+ if (undo) {
+ mutex_enter(&kernel_mutex);
+#ifdef TRX_UPDATE_UNDO_OPT
+ if (!undo->del_marks && (undo->size == 1)
+ && (UT_LIST_GET_LEN(trx_sys->view_list) == 1)) {
+
+ /* There is no need to save the update undo
+ log: discard it; note that &mtr gets committed
+ while we must hold the kernel mutex and
+ therefore this optimization may add to the
+ contention of the kernel mutex. */
+
+ lsn = trx_undo_update_cleanup_by_discard(trx,
+ &mtr);
+ mutex_exit(&(rseg->mutex));
+
+ goto shortcut;
+ }
+#endif
+ trx->no = trx_sys_get_new_trx_no();
+
+ mutex_exit(&kernel_mutex);
+
+ /* It is not necessary to obtain trx->undo_mutex here
+ because only a single OS thread is allowed to do the
+ transaction commit for this transaction. */
+
+ update_hdr_page = trx_undo_set_state_at_finish(trx,
+ undo, &mtr);
+
+ /* We have to do the cleanup for the update log while
+ holding the rseg mutex because update log headers
+ have to be put to the history list in the order of
+ the trx number. */
+
+ trx_undo_update_cleanup(trx, update_hdr_page, &mtr);
+ }
+
+ mutex_exit(&(rseg->mutex));
+
+ /* If we did not take the shortcut, the following call
+ commits the mini-transaction, making the whole transaction
+ committed in the file-based world at this log sequence number;
+ otherwise, we get the commit lsn from the call of
+ trx_undo_update_cleanup_by_discard above.
+ NOTE that transaction numbers, which are assigned only to
+ transactions with an update undo log, do not necessarily come
+ in exactly the same order as commit lsn's, if the transactions
+ have different rollback segments. To get exactly the same
+ order we should hold the kernel mutex up to this point,
+ adding to to the contention of the kernel mutex. However, if
+ a transaction T2 is able to see modifications made by
+ a transaction T1, T2 will always get a bigger transaction
+ number and a bigger commit lsn than T1. */
+
+ /*--------------*/
+ mtr_commit(&mtr);
+ /*--------------*/
+ lsn = mtr.end_lsn;
+
+ mutex_enter(&kernel_mutex);
+ }
+#ifdef TRX_UPDATE_UNDO_OPT
+shortcut:
+#endif
+ ut_ad(trx->conc_state == TRX_ACTIVE);
+ ut_ad(mutex_own(&kernel_mutex));
+
+ /* The following assignment makes the transaction committed in memory
+ and makes its changes to data visible to other transactions.
+ NOTE that there is a small discrepancy from the strict formal
+ visibility rules here: a human user of the database can see
+ modifications made by another transaction T even before the necessary
+ log segment has been flushed to the disk. If the database happens to
+ crash before the flush, the user has seen modifications from T which
+ will never be a committed transaction. However, any transaction T2
+ which sees the modifications of the committing transaction T, and
+ which also itself makes modifications to the database, will get an lsn
+ larger than the committing transaction T. In the case where the log
+ flush fails, and T never gets committed, also T2 will never get
+ committed. */
+
+ /*--------------------------------------*/
+ trx->conc_state = TRX_COMMITTED_IN_MEMORY;
+ /*--------------------------------------*/
+
+ lock_release_off_kernel(trx);
+
+ if (trx->read_view) {
+ read_view_close(trx->read_view);
+
+ mem_heap_empty(trx->read_view_heap);
+ trx->read_view = NULL;
+ }
+
+/* printf("Trx %lu commit finished\n", ut_dulint_get_low(trx->id)); */
+
+ if (must_flush_log) {
+
+ mutex_exit(&kernel_mutex);
+
+ if (trx->insert_undo != NULL) {
+
+ trx_undo_insert_cleanup(trx);
+ }
+
+ /* NOTE that we could possibly make a group commit more
+ efficient here: call os_thread_yield here to allow also other
+ trxs to come to commit! */
+
+ /* We now flush the log, as the transaction made changes to
+ the database, making the transaction committed on disk. It is
+ enough that any one of the log groups gets written to disk. */
+
+ /*-------------------------------------*/
+
+ /* Only in some performance tests the variable srv_flush..
+ will be set to FALSE: */
+
+ if (srv_flush_log_at_trx_commit) {
+
+ log_flush_up_to(lsn, LOG_WAIT_ONE_GROUP);
+ }
+
+ /*-------------------------------------*/
+
+ mutex_enter(&kernel_mutex);
+ }
+
+ trx->conc_state = TRX_NOT_STARTED;
+ trx->rseg = NULL;
+ trx->undo_no = ut_dulint_zero;
+ trx->last_sql_stat_start.least_undo_no = ut_dulint_zero;
+
+ ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
+ ut_ad(UT_LIST_GET_LEN(trx->trx_locks) == 0);
+
+ UT_LIST_REMOVE(trx_list, trx_sys->trx_list, trx);
+}
+
+/************************************************************************
+Assigns a read view for a consistent read query. All the consistent reads
+within the same transaction will get the same read view, which is created
+when this function is first called for a new started transaction. */
+
+read_view_t*
+trx_assign_read_view(
+/*=================*/
+ /* out: consistent read view */
+ trx_t* trx) /* in: active transaction */
+{
+ ut_ad(trx->conc_state == TRX_ACTIVE);
+
+ if (trx->read_view) {
+ return(trx->read_view);
+ }
+
+ mutex_enter(&kernel_mutex);
+
+ if (!trx->read_view) {
+ trx->read_view = read_view_open_now(trx, trx->read_view_heap);
+ }
+
+ mutex_exit(&kernel_mutex);
+
+ return(trx->read_view);
+}
+
+/********************************************************************
+Commits a transaction. NOTE that the kernel mutex is temporarily released. */
+static
+void
+trx_handle_commit_sig_off_kernel(
+/*=============================*/
+ trx_t* trx, /* in: transaction */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
+{
+ trx_sig_t* sig;
+ trx_sig_t* next_sig;
+
+ ut_ad(mutex_own(&kernel_mutex));
+
+ trx->que_state = TRX_QUE_COMMITTING;
+
+ trx_commit_off_kernel(trx);
+
+ ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
+
+ /* Remove all TRX_SIG_COMMIT signals from the signal queue and send
+ reply messages to them */
+
+ sig = UT_LIST_GET_FIRST(trx->signals);
+
+ while (sig != NULL) {
+ next_sig = UT_LIST_GET_NEXT(signals, sig);
+
+ if (sig->type == TRX_SIG_COMMIT) {
+
+ trx_sig_reply(trx, sig, next_thr);
+ trx_sig_remove(trx, sig);
+ }
+
+ sig = next_sig;
+ }
+
+ trx->que_state = TRX_QUE_RUNNING;
+}
+
+/***************************************************************
+The transaction must be in the TRX_QUE_LOCK_WAIT state. Puts it to
+the TRX_QUE_RUNNING state and releases query threads which were
+waiting for a lock in the wait_thrs list. */
+
+void
+trx_end_lock_wait(
+/*==============*/
+ trx_t* trx) /* in: transaction */
+{
+ que_thr_t* thr;
+
+ ut_ad(mutex_own(&kernel_mutex));
+ ut_ad(trx->que_state == TRX_QUE_LOCK_WAIT);
+
+ thr = UT_LIST_GET_FIRST(trx->wait_thrs);
+
+ while (thr != NULL) {
+ que_thr_end_wait_no_next_thr(thr);
+
+ UT_LIST_REMOVE(trx_thrs, trx->wait_thrs, thr);
+
+ thr = UT_LIST_GET_FIRST(trx->wait_thrs);
+ }
+
+ trx->que_state = TRX_QUE_RUNNING;
+}
+
+/***************************************************************
+Moves the query threads in the lock wait list to the SUSPENDED state and puts
+the transaction to the TRX_QUE_RUNNING state. */
+static
+void
+trx_lock_wait_to_suspended(
+/*=======================*/
+ trx_t* trx) /* in: transaction in the TRX_QUE_LOCK_WAIT state */
+{
+ que_thr_t* thr;
+
+ ut_ad(mutex_own(&kernel_mutex));
+ ut_ad(trx->que_state == TRX_QUE_LOCK_WAIT);
+
+ thr = UT_LIST_GET_FIRST(trx->wait_thrs);
+
+ while (thr != NULL) {
+ thr->state = QUE_THR_SUSPENDED;
+
+ UT_LIST_REMOVE(trx_thrs, trx->wait_thrs, thr);
+
+ thr = UT_LIST_GET_FIRST(trx->wait_thrs);
+ }
+
+ trx->que_state = TRX_QUE_RUNNING;
+}
+
+/***************************************************************
+Moves the query threads in the sig reply wait list of trx to the SUSPENDED
+state. */
+static
+void
+trx_sig_reply_wait_to_suspended(
+/*============================*/
+ trx_t* trx) /* in: transaction */
+{
+ trx_sig_t* sig;
+ que_thr_t* thr;
+
+ ut_ad(mutex_own(&kernel_mutex));
+
+ sig = UT_LIST_GET_FIRST(trx->reply_signals);
+
+ while (sig != NULL) {
+ thr = sig->receiver;
+
+ ut_ad(thr->state == QUE_THR_SIG_REPLY_WAIT);
+
+ thr->state = QUE_THR_SUSPENDED;
+
+ sig->receiver = NULL;
+ sig->reply = FALSE;
+
+ UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig);
+
+ sig = UT_LIST_GET_FIRST(trx->reply_signals);
+ }
+}
+
+/*********************************************************************
+Checks the compatibility of a new signal with the other signals in the
+queue. */
+static
+ibool
+trx_sig_is_compatible(
+/*==================*/
+ /* out: TRUE if the signal can be queued */
+ trx_t* trx, /* in: trx handle */
+ ulint type, /* in: signal type */
+ ulint sender) /* in: TRX_SIG_SELF or TRX_SIG_OTHER_SESS */
+{
+ trx_sig_t* sig;
+
+ ut_ad(mutex_own(&kernel_mutex));
+
+ if (UT_LIST_GET_LEN(trx->signals) == 0) {
+
+ return(TRUE);
+ }
+
+ if (sender == TRX_SIG_SELF) {
+ if (type == TRX_SIG_ERROR_OCCURRED) {
+
+ return(TRUE);
+
+ } else if (type == TRX_SIG_BREAK_EXECUTION) {
+
+ return(TRUE);
+ } else {
+ return(FALSE);
+ }
+ }
+
+ ut_ad(sender == TRX_SIG_OTHER_SESS);
+
+ sig = UT_LIST_GET_FIRST(trx->signals);
+
+ if (type == TRX_SIG_COMMIT) {
+ while (sig != NULL) {
+
+ if (sig->type == TRX_SIG_TOTAL_ROLLBACK) {
+
+ return(FALSE);
+ }
+
+ sig = UT_LIST_GET_NEXT(signals, sig);
+ }
+
+ return(TRUE);
+
+ } else if (type == TRX_SIG_TOTAL_ROLLBACK) {
+ while (sig != NULL) {
+
+ if (sig->type == TRX_SIG_COMMIT) {
+
+ return(FALSE);
+ }
+
+ sig = UT_LIST_GET_NEXT(signals, sig);
+ }
+
+ return(TRUE);
+
+ } else if (type == TRX_SIG_BREAK_EXECUTION) {
+
+ return(TRUE);
+ } else {
+ ut_error;
+
+ return(FALSE);
+ }
+}
+
+/********************************************************************
+Sends a signal to a trx object. */
+
+ibool
+trx_sig_send(
+/*=========*/
+ /* out: TRUE if the signal was
+ successfully delivered */
+ trx_t* trx, /* in: trx handle */
+ ulint type, /* in: signal type */
+ ulint sender, /* in: TRX_SIG_SELF or
+ TRX_SIG_OTHER_SESS */
+ ibool reply, /* in: TRUE if the sender of the signal
+ wants reply after the operation induced
+ by the signal is completed; if type
+ is TRX_SIG_END_WAIT, this must be
+ FALSE */
+ que_thr_t* receiver_thr, /* in: query thread which wants the
+ reply, or NULL */
+ trx_savept_t* savept, /* in: possible rollback savepoint, or
+ NULL */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if the parameter
+ is NULL, it is ignored */
+{
+ trx_sig_t* sig;
+ trx_t* receiver_trx;
+
+ ut_ad(trx);
+ ut_ad(mutex_own(&kernel_mutex));
+
+ if (!trx_sig_is_compatible(trx, type, sender)) {
+ /* The signal is not compatible with the other signals in
+ the queue: do nothing */
+
+ ut_a(0);
+
+ /* sess_raise_error_low(trx, 0, 0, NULL, NULL, NULL, NULL,
+ "Incompatible signal"); */
+ return(FALSE);
+ }
+
+ /* Queue the signal object */
+
+ if (UT_LIST_GET_LEN(trx->signals) == 0) {
+
+ /* The signal list is empty: the 'sig' slot must be unused
+ (we improve performance a bit by avoiding mem_alloc) */
+ sig = &(trx->sig);
+ } else {
+ /* It might be that the 'sig' slot is unused also in this
+ case, but we choose the easy way of using mem_alloc */
+
+ sig = mem_alloc(sizeof(trx_sig_t));
+ }
+
+ UT_LIST_ADD_LAST(signals, trx->signals, sig);
+
+ sig->type = type;
+ sig->state = TRX_SIG_WAITING;
+ sig->sender = sender;
+ sig->reply = reply;
+ sig->receiver = receiver_thr;
+
+ if (savept) {
+ sig->savept = *savept;
+ }
+
+ if (receiver_thr) {
+ receiver_trx = thr_get_trx(receiver_thr);
+
+ UT_LIST_ADD_LAST(reply_signals, receiver_trx->reply_signals,
+ sig);
+ }
+
+ if (trx->sess->state == SESS_ERROR) {
+
+ trx_sig_reply_wait_to_suspended(trx);
+ }
+
+ if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) {
+
+ /* The following call will add a TRX_SIG_ERROR_OCCURRED
+ signal to the end of the queue, if the session is not yet
+ in the error state: */
+
+ ut_a(0);
+
+ sess_raise_error_low(trx, 0, 0, NULL, NULL, NULL, NULL,
+ "Signal from another session, or a break execution signal");
+ }
+
+ /* If there were no other signals ahead in the queue, try to start
+ handling of the signal */
+
+ if (UT_LIST_GET_FIRST(trx->signals) == sig) {
+
+ trx_sig_start_handle(trx, next_thr);
+ }
+
+ return(TRUE);
+}
+
+/********************************************************************
+Ends signal handling. If the session is in the error state, and
+trx->graph_before_signal_handling != NULL, then returns control to the error
+handling routine of the graph (currently just returns the control to the
+graph root which then will send an error message to the client). */
+
+void
+trx_end_signal_handling(
+/*====================*/
+ trx_t* trx) /* in: trx */
+{
+ ut_ad(mutex_own(&kernel_mutex));
+ ut_ad(trx->handling_signals == TRUE);
+
+ trx->handling_signals = FALSE;
+
+ trx->graph = trx->graph_before_signal_handling;
+
+ if (trx->graph && (trx->sess->state == SESS_ERROR)) {
+
+ que_fork_error_handle(trx, trx->graph);
+ }
+}
+
+/********************************************************************
+Starts handling of a trx signal. */
+
+void
+trx_sig_start_handle(
+/*=================*/
+ trx_t* trx, /* in: trx handle */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread; if the parameter
+ is NULL, it is ignored */
+{
+ trx_sig_t* sig;
+ ulint type;
+loop:
+ /* We loop in this function body as long as there are queued signals
+ we can process immediately */
+
+ ut_ad(trx);
+ ut_ad(mutex_own(&kernel_mutex));
+
+ if (trx->handling_signals && (UT_LIST_GET_LEN(trx->signals) == 0)) {
+
+ trx_end_signal_handling(trx);
+
+ return;
+ }
+
+ if (trx->conc_state == TRX_NOT_STARTED) {
+
+ trx_start_low(trx, ULINT_UNDEFINED);
+ }
+
+ /* If the trx is in a lock wait state, moves the waiting query threads
+ to the suspended state */
+
+ if (trx->que_state == TRX_QUE_LOCK_WAIT) {
+
+ trx_lock_wait_to_suspended(trx);
+ }
+
+ /* If the session is in the error state and this trx has threads
+ waiting for reply from signals, moves these threads to the suspended
+ state, canceling wait reservations; note that if the transaction has
+ sent a commit or rollback signal to itself, and its session is not in
+ the error state, then nothing is done here. */
+
+ if (trx->sess->state == SESS_ERROR) {
+ trx_sig_reply_wait_to_suspended(trx);
+ }
+
+ /* If there are no running query threads, we can start processing of a
+ signal, otherwise we have to wait until all query threads of this
+ transaction are aware of the arrival of the signal. */
+
+ if (trx->n_active_thrs > 0) {
+
+ return;
+ }
+
+ if (trx->handling_signals == FALSE) {
+ trx->graph_before_signal_handling = trx->graph;
+
+ trx->handling_signals = TRUE;
+ }
+
+ sig = UT_LIST_GET_FIRST(trx->signals);
+ type = sig->type;
+
+ if (type == TRX_SIG_COMMIT) {
+
+ trx_handle_commit_sig_off_kernel(trx, next_thr);
+
+ } else if ((type == TRX_SIG_TOTAL_ROLLBACK)
+ || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)) {
+
+ trx_rollback(trx, sig, next_thr);
+
+ /* No further signals can be handled until the rollback
+ completes, therefore we return */
+
+ return;
+
+ } else if (type == TRX_SIG_ERROR_OCCURRED) {
+
+ trx_rollback(trx, sig, next_thr);
+
+ /* No further signals can be handled until the rollback
+ completes, therefore we return */
+
+ return;
+
+ } else if (type == TRX_SIG_BREAK_EXECUTION) {
+
+ trx_sig_reply(trx, sig, next_thr);
+ trx_sig_remove(trx, sig);
+ } else {
+ ut_error;
+ }
+
+ goto loop;
+}
+
+/********************************************************************
+Send the reply message when a signal in the queue of the trx has been
+handled. */
+
+void
+trx_sig_reply(
+/*==========*/
+ trx_t* trx, /* in: trx handle */
+ trx_sig_t* sig, /* in: signal */
+ que_thr_t** next_thr) /* in/out: next query thread to run;
+ if the value which is passed in is
+ a pointer to a NULL pointer, then the
+ calling function can start running
+ a new query thread */
+{
+ trx_t* receiver_trx;
+
+ ut_ad(trx && sig);
+ ut_ad(mutex_own(&kernel_mutex));
+
+ if (sig->reply && (sig->receiver != NULL)) {
+
+ ut_ad((sig->receiver)->state == QUE_THR_SIG_REPLY_WAIT);
+
+ receiver_trx = thr_get_trx(sig->receiver);
+
+ UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals,
+ sig);
+ ut_ad(receiver_trx->sess->state != SESS_ERROR);
+
+ que_thr_end_wait(sig->receiver, next_thr);
+
+ sig->reply = FALSE;
+ sig->receiver = NULL;
+
+ } else if (sig->reply) {
+ /* In this case the reply should be sent to the client of
+ the session of the transaction */
+
+ sig->reply = FALSE;
+ sig->receiver = NULL;
+
+ sess_srv_msg_send_simple(trx->sess, SESS_SRV_SUCCESS,
+ SESS_NOT_RELEASE_KERNEL);
+ }
+}
+
+/********************************************************************
+Removes a signal object from the trx signal queue. */
+
+void
+trx_sig_remove(
+/*===========*/
+ trx_t* trx, /* in: trx handle */
+ trx_sig_t* sig) /* in, own: signal */
+{
+ ut_ad(trx && sig);
+ ut_ad(mutex_own(&kernel_mutex));
+
+ ut_ad(sig->reply == FALSE);
+ ut_ad(sig->receiver == NULL);
+
+ UT_LIST_REMOVE(signals, trx->signals, sig);
+ sig->type = 0; /* reset the field to catch possible bugs */
+
+ if (sig != &(trx->sig)) {
+ mem_free(sig);
+ }
+}
+
+/*************************************************************************
+Creates a commit command node struct. */
+
+commit_node_t*
+commit_node_create(
+/*===============*/
+ /* out, own: commit node struct */
+ mem_heap_t* heap) /* in: mem heap where created */
+{
+ commit_node_t* node;
+
+ node = mem_heap_alloc(heap, sizeof(commit_node_t));
+ node->common.type = QUE_NODE_COMMIT;
+ node->state = COMMIT_NODE_SEND;
+
+ return(node);
+}
+
+/***************************************************************
+Performs an execution step for a commit type node in a query graph. */
+
+que_thr_t*
+trx_commit_step(
+/*============*/
+ /* out: query thread to run next, or NULL */
+ que_thr_t* thr) /* in: query thread */
+{
+ commit_node_t* node;
+ que_thr_t* next_thr;
+ ibool success;
+
+ node = thr->run_node;
+
+ ut_ad(que_node_get_type(node) == QUE_NODE_COMMIT);
+
+ if (thr->prev_node == que_node_get_parent(node)) {
+ node->state = COMMIT_NODE_SEND;
+ }
+
+ if (node->state == COMMIT_NODE_SEND) {
+ mutex_enter(&kernel_mutex);
+
+ node->state = COMMIT_NODE_WAIT;
+
+ next_thr = NULL;
+
+ thr->state = QUE_THR_SIG_REPLY_WAIT;
+
+ /* Send the commit signal to the transaction */
+
+ success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT,
+ TRX_SIG_SELF, TRUE, thr, NULL,
+ &next_thr);
+
+ mutex_exit(&kernel_mutex);
+
+ if (!success) {
+ /* Error in delivering the commit signal */
+ que_thr_handle_error(thr, DB_ERROR, NULL, 0);
+ }
+
+ return(next_thr);
+ }
+
+ ut_ad(node->state == COMMIT_NODE_WAIT);
+
+ node->state = COMMIT_NODE_SEND;
+
+ thr->run_node = que_node_get_parent(node);
+
+ return(thr);
+}
+
+/**************************************************************************
+Does the transaction commit for MySQL. */
+
+ulint
+trx_commit_for_mysql(
+/*=================*/
+ /* out: 0 or error number */
+ trx_t* trx) /* in: trx handle */
+{
+ /* Because we do not do the commit by sending an Innobase
+ sig to the transaction, we must here make sure that trx has been
+ started. */
+
+ trx_start_if_not_started(trx);
+
+ mutex_enter(&kernel_mutex);
+
+ trx_commit_off_kernel(trx);
+
+ mutex_exit(&kernel_mutex);
+
+ return(0);
+}
+
+/**************************************************************************
+Marks the latest SQL statement ended. */
+
+void
+trx_mark_sql_stat_end(
+/*==================*/
+ trx_t* trx) /* in: trx handle */
+{
+ trx_start_if_not_started(trx);
+
+ mutex_enter(&kernel_mutex);
+
+ trx->last_sql_stat_start.least_undo_no = trx->undo_no;
+
+ mutex_exit(&kernel_mutex);
+}