summaryrefslogtreecommitdiff
path: root/storage/innobase/sync/sync0arr.c
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/sync/sync0arr.c')
-rw-r--r--storage/innobase/sync/sync0arr.c674
1 files changed, 383 insertions, 291 deletions
diff --git a/storage/innobase/sync/sync0arr.c b/storage/innobase/sync/sync0arr.c
index 198ef49ca9f..1f3d6df0403 100644
--- a/storage/innobase/sync/sync0arr.c
+++ b/storage/innobase/sync/sync0arr.c
@@ -46,13 +46,22 @@ algorithm, because 10 000 events are created fast, but
until a resource is released. The suspending is implemented
using an operating system event semaphore. */
struct sync_cell_struct {
- void* wait_object; /* pointer to the object the
- thread is waiting for; if NULL
- the cell is free for use */
+ /* State of the cell. SC_WAKING_UP means
+ sync_array_struct->n_reserved has been decremented, but the thread
+ in this cell has not waken up yet. When it does, it will set the
+ state to SC_FREE. Note that this is done without the protection of
+ any mutex. */
+ enum { SC_FREE, SC_RESERVED, SC_WAKING_UP } state;
+
+ void* wait_object; /* pointer to the object the
+ thread is waiting for; this is not
+ reseted to NULL when a cell is
+ freed. */
+
mutex_t* old_wait_mutex; /* the latest wait mutex in cell */
rw_lock_t* old_wait_rw_lock;/* the latest wait rw-lock in cell */
- ulint request_type; /* lock type requested on the
- object */
+ ulint request_type; /* lock type requested on the
+ object */
const char* file; /* in debug version file where
requested */
ulint line; /* in debug version line where
@@ -63,17 +72,12 @@ struct sync_cell_struct {
called sync_array_event_wait
on this cell */
ibool event_set; /* TRUE if the event is set */
- os_event_t event; /* operating system event
- semaphore handle */
+ os_event_t event; /* operating system event
+ semaphore handle */
time_t reservation_time;/* time when the thread reserved
the wait cell */
};
-/* NOTE: It is allowed for a thread to wait
-for an event allocated for the array without owning the
-protecting mutex (depending on the case: OS or database mutex), but
-all changes (set or reset) to the state of the event must be made
-while owning the mutex. */
struct sync_array_struct {
ulint n_reserved; /* number of currently reserved
cells in the wait array */
@@ -106,8 +110,8 @@ ibool
sync_array_detect_deadlock(
/*=======================*/
/* out: TRUE if deadlock detected */
- sync_array_t* arr, /* in: wait array; NOTE! the caller must
- own the mutex to array */
+ sync_array_t* arr, /* in: wait array; NOTE! the caller must
+ own the mutex to array */
sync_cell_t* start, /* in: cell where recursive search started */
sync_cell_t* cell, /* in: cell to search */
ulint depth); /* in: recursion depth */
@@ -190,7 +194,7 @@ sync_array_create(
sync_cell_t* cell_array;
sync_cell_t* cell;
ulint i;
-
+
ut_a(n_cells > 0);
/* Allocate memory for the data structures */
@@ -205,7 +209,7 @@ sync_array_create(
arr->sg_count = 0;
arr->res_count = 0;
- /* Then create the mutex to protect the wait array complex */
+ /* Then create the mutex to protect the wait array complex */
if (protection == SYNC_ARRAY_OS_MUTEX) {
arr->os_mutex = os_mutex_create(NULL);
} else if (protection == SYNC_ARRAY_MUTEX) {
@@ -215,12 +219,13 @@ sync_array_create(
ut_error;
}
- for (i = 0; i < n_cells; i++) {
- cell = sync_array_get_nth_cell(arr, i);
- cell->wait_object = NULL;
+ for (i = 0; i < n_cells; i++) {
+ cell = sync_array_get_nth_cell(arr, i);
+ cell->state = SC_FREE;
+ cell->wait_object = NULL;
- /* Create an operating system event semaphore with no name */
- cell->event = os_event_create(NULL);
+ /* Create an operating system event semaphore with no name */
+ cell->event = os_event_create(NULL);
cell->event_set = FALSE; /* it is created in reset state */
}
@@ -235,22 +240,22 @@ sync_array_free(
/*============*/
sync_array_t* arr) /* in, own: sync wait array */
{
- ulint i;
- sync_cell_t* cell;
+ ulint i;
+ sync_cell_t* cell;
ulint protection;
- ut_a(arr->n_reserved == 0);
-
+ ut_a(arr->n_reserved == 0);
+
sync_array_validate(arr);
-
- for (i = 0; i < arr->n_cells; i++) {
- cell = sync_array_get_nth_cell(arr, i);
+
+ for (i = 0; i < arr->n_cells; i++) {
+ cell = sync_array_get_nth_cell(arr, i);
os_event_free(cell->event);
- }
+ }
protection = arr->protection;
- /* Release the mutex protecting the wait array complex */
+ /* Release the mutex protecting the wait array complex */
if (protection == SYNC_ARRAY_OS_MUTEX) {
os_mutex_free(arr->os_mutex);
@@ -261,7 +266,7 @@ sync_array_free(
}
ut_free(arr->array);
- ut_free(arr);
+ ut_free(arr);
}
/************************************************************************
@@ -273,48 +278,24 @@ sync_array_validate(
/*================*/
sync_array_t* arr) /* in: sync wait array */
{
- ulint i;
- sync_cell_t* cell;
- ulint count = 0;
-
- sync_array_enter(arr);
-
- for (i = 0; i < arr->n_cells; i++) {
- cell = sync_array_get_nth_cell(arr, i);
+ ulint i;
+ sync_cell_t* cell;
+ ulint count = 0;
- if (cell->wait_object != NULL) {
- count++;
- }
- }
+ sync_array_enter(arr);
- ut_a(count == arr->n_reserved);
+ for (i = 0; i < arr->n_cells; i++) {
+ cell = sync_array_get_nth_cell(arr, i);
- sync_array_exit(arr);
-}
+ if (cell->state == SC_RESERVED) {
+ count++;
+ }
+ }
-/***********************************************************************
-Puts the cell event in set state. */
-static
-void
-sync_cell_event_set(
-/*================*/
- sync_cell_t* cell) /* in: array cell */
-{
- os_event_set(cell->event);
- cell->event_set = TRUE;
-}
+ ut_a(count == arr->n_reserved);
-/***********************************************************************
-Puts the cell event in reset state. */
-static
-void
-sync_cell_event_reset(
-/*==================*/
- sync_cell_t* cell) /* in: array cell */
-{
- os_event_reset(cell->event);
- cell->event_set = FALSE;
-}
+ sync_array_exit(arr);
+}
/**********************************************************************
Reserves a wait array cell for waiting for an object.
@@ -323,34 +304,39 @@ The event of the cell is reset to nonsignalled state. */
void
sync_array_reserve_cell(
/*====================*/
- sync_array_t* arr, /* in: wait array */
- void* object, /* in: pointer to the object to wait for */
- ulint type, /* in: lock request type */
+ sync_array_t* arr, /* in: wait array */
+ void* object, /* in: pointer to the object to wait for */
+ ulint type, /* in: lock request type */
const char* file, /* in: file where requested */
- ulint line, /* in: line where requested */
- ulint* index) /* out: index of the reserved cell */
+ ulint line, /* in: line where requested */
+ ulint* index) /* out: index of the reserved cell */
{
- sync_cell_t* cell;
- ulint i;
-
- ut_a(object);
- ut_a(index);
+ sync_cell_t* cell;
+ ulint i;
- sync_array_enter(arr);
+ ut_a(object);
+ ut_a(index);
- arr->res_count++;
+ sync_array_enter(arr);
+
+ arr->res_count++;
/* Reserve a new cell. */
- for (i = 0; i < arr->n_cells; i++) {
- cell = sync_array_get_nth_cell(arr, i);
+ for (i = 0; i < arr->n_cells; i++) {
+ cell = sync_array_get_nth_cell(arr, i);
- if (cell->wait_object == NULL) {
+ if (cell->state == SC_FREE) {
- /* Make sure the event is reset */
- if (cell->event_set) {
- sync_cell_event_reset(cell);
- }
+ /* We do not check cell->event_set because it is
+ set outside the protection of the sync array mutex
+ and we had a bug regarding it, and since resetting
+ an event when it is not needed does no harm it is
+ safer always to do it. */
+ cell->event_set = FALSE;
+ os_event_reset(cell->event);
+
+ cell->state = SC_RESERVED;
cell->reservation_time = time(NULL);
cell->thread = os_thread_get_curr_id();
@@ -361,29 +347,91 @@ sync_array_reserve_cell(
} else {
cell->old_wait_rw_lock = object;
}
-
+
cell->request_type = type;
cell->waiting = FALSE;
-
+
cell->file = file;
cell->line = line;
-
+
arr->n_reserved++;
*index = i;
sync_array_exit(arr);
-
- return;
- }
- }
- ut_error; /* No free cell found */
+ return;
+ }
+ }
+
+ ut_error; /* No free cell found */
return;
}
/**********************************************************************
+Frees the cell. Note that we don't have any mutex reserved when calling
+this. */
+static
+void
+sync_array_free_cell(
+/*=================*/
+ sync_array_t* arr, /* in: wait array */
+ ulint index) /* in: index of the cell in array */
+{
+ sync_cell_t* cell;
+
+ cell = sync_array_get_nth_cell(arr, index);
+
+ ut_a(cell->state == SC_WAKING_UP);
+ ut_a(cell->wait_object != NULL);
+
+ cell->state = SC_FREE;
+}
+
+/**********************************************************************
+Frees the cell safely by reserving the sync array mutex and decrementing
+n_reserved if necessary. Should only be called from mutex_spin_wait. */
+
+void
+sync_array_free_cell_protected(
+/*===========================*/
+ sync_array_t* arr, /* in: wait array */
+ ulint index) /* in: index of the cell in array */
+{
+ sync_cell_t* cell;
+
+ sync_array_enter(arr);
+
+ cell = sync_array_get_nth_cell(arr, index);
+
+ ut_a(cell->state != SC_FREE);
+ ut_a(cell->wait_object != NULL);
+
+ /* We only need to decrement n_reserved if it has not already been
+ done by sync_array_signal_object. */
+ if (cell->state == SC_RESERVED) {
+ ut_a(arr->n_reserved > 0);
+ arr->n_reserved--;
+ } else if (cell->state == SC_WAKING_UP) {
+ /* This is tricky; if we don't wait for the event to be
+ signaled, signal_object can set the state of a cell to
+ SC_WAKING_UP, mutex_spin_wait can call this and set the
+ state to SC_FREE, and then signal_object gets around to
+ calling os_set_event for the cell but since it's already
+ been freed things break horribly. */
+
+ sync_array_exit(arr);
+ os_event_wait(cell->event);
+ sync_array_enter(arr);
+ }
+
+ cell->state = SC_FREE;
+
+ sync_array_exit(arr);
+}
+
+/**********************************************************************
This function should be called when a thread starts to wait on
a wait array cell. In the debug version this function checks
if the wait for a semaphore will result in a deadlock, in which
@@ -392,51 +440,51 @@ case prints info and asserts. */
void
sync_array_wait_event(
/*==================*/
- sync_array_t* arr, /* in: wait array */
- ulint index) /* in: index of the reserved cell */
+ sync_array_t* arr, /* in: wait array */
+ ulint index) /* in: index of the reserved cell */
{
- sync_cell_t* cell;
- os_event_t event;
-
- ut_a(arr);
+ sync_cell_t* cell;
+ os_event_t event;
- sync_array_enter(arr);
+ ut_a(arr);
- cell = sync_array_get_nth_cell(arr, index);
+ cell = sync_array_get_nth_cell(arr, index);
+ ut_a((cell->state == SC_RESERVED) || (cell->state == SC_WAKING_UP));
ut_a(cell->wait_object);
ut_a(!cell->waiting);
ut_ad(os_thread_get_curr_id() == cell->thread);
- event = cell->event;
- cell->waiting = TRUE;
+ event = cell->event;
+ cell->waiting = TRUE;
#ifdef UNIV_SYNC_DEBUG
-
+
/* We use simple enter to the mutex below, because if
we cannot acquire it at once, mutex_enter would call
recursively sync_array routines, leading to trouble.
rw_lock_debug_mutex freezes the debug lists. */
+ sync_array_enter(arr);
rw_lock_debug_mutex_enter();
if (TRUE == sync_array_detect_deadlock(arr, cell, cell, 0)) {
fputs("########################################\n", stderr);
ut_error;
- }
+ }
rw_lock_debug_mutex_exit();
+ sync_array_exit(arr);
#endif
- sync_array_exit(arr);
+ os_event_wait(event);
- os_event_wait(event);
-
- sync_array_free_cell(arr, index);
+ sync_array_free_cell(arr, index);
}
/**********************************************************************
-Reports info of a wait array cell. */
+Reports info of a wait array cell. Note: sync_array_print_long_waits()
+calls this without mutex protection. */
static
void
sync_array_cell_print(
@@ -452,11 +500,20 @@ sync_array_cell_print(
fprintf(file,
"--Thread %lu has waited at %s line %lu for %.2f seconds the semaphore:\n",
- (ulong) os_thread_pf(cell->thread), cell->file,
- (ulong) cell->line,
- difftime(time(NULL), cell->reservation_time));
+ (ulong) os_thread_pf(cell->thread), cell->file,
+ (ulong) cell->line,
+ difftime(time(NULL), cell->reservation_time));
+ fprintf(file, "Wait array cell state %lu\n", (ulong)cell->state);
+
+ /* If the memory area pointed to by old_wait_mutex /
+ old_wait_rw_lock has been freed, this can crash. */
+
+ if (cell->state != SC_RESERVED) {
+ /* If cell has this state, then even if we are holding the sync
+ array mutex, the wait object may get freed meanwhile. Do not
+ print the wait object then. */
- if (type == SYNC_MUTEX) {
+ } else if (type == SYNC_MUTEX) {
/* We use old_wait_mutex in case the cell has already
been freed meanwhile */
mutex = cell->old_wait_mutex;
@@ -492,7 +549,7 @@ sync_array_cell_print(
? " exclusive\n"
: " wait exclusive\n");
}
-
+
fprintf(file,
"number of readers %lu, waiters flag %lu\n"
"Last time read locked in file %s line %lu\n"
@@ -507,11 +564,7 @@ sync_array_cell_print(
ut_error;
}
- if (!cell->waiting) {
- fputs("wait has ended\n", file);
- }
-
- if (cell->event_set) {
+ if (cell->event_set) {
fputs("wait is ending\n", file);
}
}
@@ -525,22 +578,22 @@ sync_array_find_thread(
/*===================*/
/* out: pointer to cell or NULL
if not found */
- sync_array_t* arr, /* in: wait array */
+ sync_array_t* arr, /* in: wait array */
os_thread_id_t thread) /* in: thread id */
{
- ulint i;
- sync_cell_t* cell;
+ ulint i;
+ sync_cell_t* cell;
- for (i = 0; i < arr->n_cells; i++) {
+ for (i = 0; i < arr->n_cells; i++) {
- cell = sync_array_get_nth_cell(arr, i);
+ cell = sync_array_get_nth_cell(arr, i);
- if (cell->wait_object != NULL
- && os_thread_eq(cell->thread, thread)) {
+ if ((cell->state == SC_RESERVED)
+ && os_thread_eq(cell->thread, thread)) {
- return(cell); /* Found */
- }
- }
+ return(cell); /* Found */
+ }
+ }
return(NULL); /* Not found */
}
@@ -552,8 +605,8 @@ ibool
sync_array_deadlock_step(
/*=====================*/
/* out: TRUE if deadlock detected */
- sync_array_t* arr, /* in: wait array; NOTE! the caller must
- own the mutex to array */
+ sync_array_t* arr, /* in: wait array; NOTE! the caller must
+ own the mutex to array */
sync_cell_t* start, /* in: cell where recursive search
started */
os_thread_id_t thread, /* in: thread to look at */
@@ -572,7 +625,7 @@ sync_array_deadlock_step(
return(FALSE);
}
-
+
new = sync_array_find_thread(arr, thread);
if (new == start) {
@@ -604,8 +657,8 @@ ibool
sync_array_detect_deadlock(
/*=======================*/
/* out: TRUE if deadlock detected */
- sync_array_t* arr, /* in: wait array; NOTE! the caller must
- own the mutex to array */
+ sync_array_t* arr, /* in: wait array; NOTE! the caller must
+ own the mutex to array */
sync_cell_t* start, /* in: cell where recursive search started */
sync_cell_t* cell, /* in: cell to search */
ulint depth) /* in: recursion depth */
@@ -615,19 +668,20 @@ sync_array_detect_deadlock(
os_thread_id_t thread;
ibool ret;
rw_lock_debug_t*debug;
-
- ut_a(arr && start && cell);
+
+ ut_a(arr && start && cell);
+ ut_ad(cell->state == SC_RESERVED);
ut_ad(cell->wait_object);
ut_ad(os_thread_get_curr_id() == start->thread);
ut_ad(depth < 100);
-
+
depth++;
-
+
if (cell->event_set || !cell->waiting) {
return(FALSE); /* No deadlock here */
}
-
+
if (cell->request_type == SYNC_MUTEX) {
mutex = cell->wait_object;
@@ -649,7 +703,8 @@ sync_array_detect_deadlock(
if (ret) {
fprintf(stderr,
"Mutex %p owned by thread %lu file %s line %lu\n",
- mutex, (ulong) os_thread_pf(mutex->thread_id),
+ mutex,
+ (ulong) os_thread_pf(mutex->thread_id),
mutex->file_name, (ulong) mutex->line);
sync_array_cell_print(stderr, cell);
@@ -661,76 +716,76 @@ sync_array_detect_deadlock(
} else if (cell->request_type == RW_LOCK_EX) {
- lock = cell->wait_object;
+ lock = cell->wait_object;
- debug = UT_LIST_GET_FIRST(lock->debug_list);
+ debug = UT_LIST_GET_FIRST(lock->debug_list);
- while (debug != NULL) {
+ while (debug != NULL) {
- thread = debug->thread_id;
+ thread = debug->thread_id;
- if (((debug->lock_type == RW_LOCK_EX)
- && !os_thread_eq(thread, cell->thread))
- || ((debug->lock_type == RW_LOCK_WAIT_EX)
- && !os_thread_eq(thread, cell->thread))
- || (debug->lock_type == RW_LOCK_SHARED)) {
+ if (((debug->lock_type == RW_LOCK_EX)
+ && !os_thread_eq(thread, cell->thread))
+ || ((debug->lock_type == RW_LOCK_WAIT_EX)
+ && !os_thread_eq(thread, cell->thread))
+ || (debug->lock_type == RW_LOCK_SHARED)) {
- /* The (wait) x-lock request can block infinitely
- only if someone (can be also cell thread) is holding
- s-lock, or someone (cannot be cell thread) (wait)
- x-lock, and he is blocked by start thread */
+ /* The (wait) x-lock request can block
+ infinitely only if someone (can be also cell
+ thread) is holding s-lock, or someone
+ (cannot be cell thread) (wait) x-lock, and
+ he is blocked by start thread */
- ret = sync_array_deadlock_step(arr, start, thread,
- debug->pass,
- depth);
- if (ret) {
- print:
- fprintf(stderr, "rw-lock %p ", lock);
- sync_array_cell_print(stderr, cell);
- rw_lock_debug_print(debug);
- return(TRUE);
+ ret = sync_array_deadlock_step(arr, start,
+ thread, debug->pass, depth);
+ if (ret) {
+ print:
+ fprintf(stderr, "rw-lock %p ", lock);
+ sync_array_cell_print(stderr, cell);
+ rw_lock_debug_print(debug);
+ return(TRUE);
+ }
}
- }
- debug = UT_LIST_GET_NEXT(list, debug);
- }
+ debug = UT_LIST_GET_NEXT(list, debug);
+ }
- return(FALSE);
+ return(FALSE);
} else if (cell->request_type == RW_LOCK_SHARED) {
- lock = cell->wait_object;
- debug = UT_LIST_GET_FIRST(lock->debug_list);
+ lock = cell->wait_object;
+ debug = UT_LIST_GET_FIRST(lock->debug_list);
- while (debug != NULL) {
+ while (debug != NULL) {
- thread = debug->thread_id;
+ thread = debug->thread_id;
- if ((debug->lock_type == RW_LOCK_EX)
- || (debug->lock_type == RW_LOCK_WAIT_EX)) {
+ if ((debug->lock_type == RW_LOCK_EX)
+ || (debug->lock_type == RW_LOCK_WAIT_EX)) {
- /* The s-lock request can block infinitely only if
- someone (can also be cell thread) is holding (wait)
- x-lock, and he is blocked by start thread */
+ /* The s-lock request can block infinitely
+ only if someone (can also be cell thread) is
+ holding (wait) x-lock, and he is blocked by
+ start thread */
- ret = sync_array_deadlock_step(arr, start, thread,
- debug->pass,
- depth);
- if (ret) {
- goto print;
+ ret = sync_array_deadlock_step(arr, start,
+ thread, debug->pass, depth);
+ if (ret) {
+ goto print;
+ }
}
- }
- debug = UT_LIST_GET_NEXT(list, debug);
- }
+ debug = UT_LIST_GET_NEXT(list, debug);
+ }
- return(FALSE);
+ return(FALSE);
} else {
ut_error;
}
- return(TRUE); /* Execution never reaches this line: for compiler
+ return(TRUE); /* Execution never reaches this line: for compiler
fooling only */
}
#endif /* UNIV_SYNC_DEBUG */
@@ -745,7 +800,7 @@ sync_arr_cell_can_wake_up(
{
mutex_t* mutex;
rw_lock_t* lock;
-
+
if (cell->request_type == SYNC_MUTEX) {
mutex = cell->wait_object;
@@ -757,26 +812,26 @@ sync_arr_cell_can_wake_up(
} else if (cell->request_type == RW_LOCK_EX) {
- lock = cell->wait_object;
+ lock = cell->wait_object;
- if (rw_lock_get_reader_count(lock) == 0
- && rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
+ if (rw_lock_get_reader_count(lock) == 0
+ && rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
return(TRUE);
}
- if (rw_lock_get_reader_count(lock) == 0
- && rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX
- && os_thread_eq(lock->writer_thread, cell->thread)) {
+ if (rw_lock_get_reader_count(lock) == 0
+ && rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX
+ && os_thread_eq(lock->writer_thread, cell->thread)) {
return(TRUE);
}
} else if (cell->request_type == RW_LOCK_SHARED) {
- lock = cell->wait_object;
+ lock = cell->wait_object;
if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
-
+
return(TRUE);
}
}
@@ -784,32 +839,6 @@ sync_arr_cell_can_wake_up(
return(FALSE);
}
-/**********************************************************************
-Frees the cell. NOTE! sync_array_wait_event frees the cell
-automatically! */
-
-void
-sync_array_free_cell(
-/*=================*/
- sync_array_t* arr, /* in: wait array */
- ulint index) /* in: index of the cell in array */
-{
- sync_cell_t* cell;
-
- sync_array_enter(arr);
-
- cell = sync_array_get_nth_cell(arr, index);
-
- ut_a(cell->wait_object != NULL);
-
- cell->wait_object = NULL;
-
- ut_a(arr->n_reserved > 0);
- arr->n_reserved--;
-
- sync_array_exit(arr);
-}
-
/**************************************************************************
Looks for the cells in the wait array which refer to the wait object
specified, and sets their corresponding events to the signaled state. In this
@@ -822,73 +851,142 @@ sync_array_signal_object(
sync_array_t* arr, /* in: wait array */
void* object) /* in: wait object */
{
- sync_cell_t* cell;
- ulint count;
- ulint i;
+ sync_cell_t* cell;
+ ulint count;
+ ulint i;
+ ulint res_count;
+
+ /* We store the addresses of cells we need to signal and signal
+ them only after we have released the sync array's mutex (for
+ performance reasons). cell_count is the number of such cells, and
+ cell_ptr points to the first one. If there are less than
+ UT_ARR_SIZE(cells) of them, cell_ptr == &cells[0], otherwise
+ cell_ptr points to malloc'd memory that we must free. */
+
+ sync_cell_t* cells[100];
+ sync_cell_t** cell_ptr = &cells[0];
+ ulint cell_count = 0;
+ ulint cell_max_count = UT_ARR_SIZE(cells);
- sync_array_enter(arr);
+ ut_a(100 == cell_max_count);
+
+ sync_array_enter(arr);
arr->sg_count++;
i = 0;
count = 0;
- while (count < arr->n_reserved) {
+ /* We need to store this to a local variable because it is modified
+ inside the loop */
+ res_count = arr->n_reserved;
+
+ while (count < res_count) {
+
+ cell = sync_array_get_nth_cell(arr, i);
+
+ if (cell->state == SC_RESERVED) {
+
+ count++;
+ if (cell->wait_object == object) {
+ cell->state = SC_WAKING_UP;
- cell = sync_array_get_nth_cell(arr, i);
+ ut_a(arr->n_reserved > 0);
+ arr->n_reserved--;
- if (cell->wait_object != NULL) {
+ if (cell_count == cell_max_count) {
+ sync_cell_t** old_cell_ptr = cell_ptr;
+ size_t old_size, new_size;
- count++;
- if (cell->wait_object == object) {
+ old_size = cell_max_count *
+ sizeof(sync_cell_t*);
+ cell_max_count *= 2;
+ new_size = cell_max_count *
+ sizeof(sync_cell_t*);
- sync_cell_event_set(cell);
- }
- }
+ cell_ptr = malloc(new_size);
+ ut_a(cell_ptr);
+ memcpy(cell_ptr, old_cell_ptr,
+ old_size);
- i++;
- }
+ if (old_cell_ptr != &cells[0]) {
+ free(old_cell_ptr);
+ }
+ }
+
+ cell_ptr[cell_count] = cell;
+ cell_count++;
+ }
+ }
+
+ i++;
+ }
- sync_array_exit(arr);
+ sync_array_exit(arr);
+
+ for (i = 0; i < cell_count; i++) {
+ cell = cell_ptr[i];
+
+ cell->event_set = TRUE;
+ os_event_set(cell->event);
+ }
+
+ if (cell_ptr != &cells[0]) {
+ free(cell_ptr);
+ }
}
/**************************************************************************
If the wakeup algorithm does not work perfectly at semaphore relases,
this function will do the waking (see the comment in mutex_exit). This
-function should be called about every 1 second in the server. */
+function should be called about every 1 second in the server.
+
+Note that there's a race condition between this thread and mutex_exit
+changing the lock_word and calling signal_object, so sometimes this finds
+threads to wake up even when nothing has gone wrong. */
void
sync_arr_wake_threads_if_sema_free(void)
/*====================================*/
{
- sync_array_t* arr = sync_primary_wait_array;
- sync_cell_t* cell;
- ulint count;
- ulint i;
+ sync_array_t* arr = sync_primary_wait_array;
+ sync_cell_t* cell;
+ ulint count;
+ ulint i;
+ ulint res_count;
- sync_array_enter(arr);
+ sync_array_enter(arr);
i = 0;
count = 0;
- while (count < arr->n_reserved) {
+ /* We need to store this to a local variable because it is modified
+ inside the loop */
- cell = sync_array_get_nth_cell(arr, i);
+ res_count = arr->n_reserved;
- if (cell->wait_object != NULL) {
+ while (count < res_count) {
- count++;
+ cell = sync_array_get_nth_cell(arr, i);
- if (sync_arr_cell_can_wake_up(cell)) {
+ if (cell->state == SC_RESERVED) {
- sync_cell_event_set(cell);
- }
- }
+ count++;
- i++;
- }
+ if (sync_arr_cell_can_wake_up(cell)) {
+ cell->state = SC_WAKING_UP;
+ cell->event_set = TRUE;
+ os_event_set(cell->event);
- sync_array_exit(arr);
+ ut_a(arr->n_reserved > 0);
+ arr->n_reserved--;
+ }
+ }
+
+ i++;
+ }
+
+ sync_array_exit(arr);
}
/**************************************************************************
@@ -900,36 +998,36 @@ sync_array_print_long_waits(void)
/* out: TRUE if fatal semaphore wait threshold
was exceeded */
{
- sync_cell_t* cell;
- ibool old_val;
+ sync_cell_t* cell;
+ ibool old_val;
ibool noticed = FALSE;
- ulint i;
+ ulint i;
ulint fatal_timeout = srv_fatal_semaphore_wait_threshold;
ibool fatal = FALSE;
- for (i = 0; i < sync_primary_wait_array->n_cells; i++) {
+ for (i = 0; i < sync_primary_wait_array->n_cells; i++) {
- cell = sync_array_get_nth_cell(sync_primary_wait_array, i);
+ cell = sync_array_get_nth_cell(sync_primary_wait_array, i);
- if (cell->wait_object != NULL
- && difftime(time(NULL), cell->reservation_time) > 240) {
+ if ((cell->state != SC_FREE)
+ && difftime(time(NULL), cell->reservation_time) > 240) {
fputs("InnoDB: Warning: a long semaphore wait:\n",
stderr);
sync_array_cell_print(stderr, cell);
noticed = TRUE;
- }
+ }
- if (cell->wait_object != NULL
- && difftime(time(NULL), cell->reservation_time)
- > fatal_timeout) {
+ if ((cell->state != SC_FREE)
+ && difftime(time(NULL), cell->reservation_time)
+ > fatal_timeout) {
fatal = TRUE;
- }
- }
+ }
+ }
if (noticed) {
fprintf(stderr,
"InnoDB: ###### Starts InnoDB Monitor for 30 secs to print diagnostic info:\n");
- old_val = srv_print_innodb_monitor;
+ old_val = srv_print_innodb_monitor;
/* If some crucial semaphore is reserved, then also the InnoDB
Monitor can hang, and we do not get diagnostics. Since in
@@ -941,12 +1039,12 @@ sync_array_print_long_waits(void)
"InnoDB: Pending preads %lu, pwrites %lu\n", (ulong)os_file_n_pending_preads,
(ulong)os_file_n_pending_pwrites);
- srv_print_innodb_monitor = TRUE;
+ srv_print_innodb_monitor = TRUE;
os_event_set(srv_lock_timeout_thread_event);
- os_thread_sleep(30000000);
+ os_thread_sleep(30000000);
- srv_print_innodb_monitor = old_val;
+ srv_print_innodb_monitor = old_val;
fprintf(stderr,
"InnoDB: ###### Diagnostic info printed to the standard error stream\n");
}
@@ -964,27 +1062,21 @@ sync_array_output_info(
sync_array_t* arr) /* in: wait array; NOTE! caller must own the
mutex */
{
- sync_cell_t* cell;
- ulint count;
- ulint i;
+ sync_cell_t* cell;
+ ulint i;
fprintf(file,
- "OS WAIT ARRAY INFO: reservation count %ld, signal count %ld\n",
- (long) arr->res_count, (long) arr->sg_count);
- i = 0;
- count = 0;
-
- while (count < arr->n_reserved) {
+ "OS WAIT ARRAY INFO: reservation count %ld, signal count %ld\n",
+ (long) arr->res_count,
+ (long) arr->sg_count);
+ for (i = 0; i < arr->n_cells; i++) {
- cell = sync_array_get_nth_cell(arr, i);
+ cell = sync_array_get_nth_cell(arr, i);
- if (cell->wait_object != NULL) {
- count++;
+ if (cell->state != SC_FREE) {
sync_array_cell_print(file, cell);
- }
-
- i++;
- }
+ }
+ }
}
/**************************************************************************
@@ -996,10 +1088,10 @@ sync_array_print_info(
FILE* file, /* in: file where to print */
sync_array_t* arr) /* in: wait array */
{
- sync_array_enter(arr);
+ sync_array_enter(arr);
sync_array_output_info(file, arr);
-
- sync_array_exit(arr);
+
+ sync_array_exit(arr);
}