diff options
Diffstat (limited to 'storage/innobase/sync/sync0arr.c')
-rw-r--r-- | storage/innobase/sync/sync0arr.c | 674 |
1 files changed, 383 insertions, 291 deletions
diff --git a/storage/innobase/sync/sync0arr.c b/storage/innobase/sync/sync0arr.c index 198ef49ca9f..1f3d6df0403 100644 --- a/storage/innobase/sync/sync0arr.c +++ b/storage/innobase/sync/sync0arr.c @@ -46,13 +46,22 @@ algorithm, because 10 000 events are created fast, but until a resource is released. The suspending is implemented using an operating system event semaphore. */ struct sync_cell_struct { - void* wait_object; /* pointer to the object the - thread is waiting for; if NULL - the cell is free for use */ + /* State of the cell. SC_WAKING_UP means + sync_array_struct->n_reserved has been decremented, but the thread + in this cell has not waken up yet. When it does, it will set the + state to SC_FREE. Note that this is done without the protection of + any mutex. */ + enum { SC_FREE, SC_RESERVED, SC_WAKING_UP } state; + + void* wait_object; /* pointer to the object the + thread is waiting for; this is not + reseted to NULL when a cell is + freed. */ + mutex_t* old_wait_mutex; /* the latest wait mutex in cell */ rw_lock_t* old_wait_rw_lock;/* the latest wait rw-lock in cell */ - ulint request_type; /* lock type requested on the - object */ + ulint request_type; /* lock type requested on the + object */ const char* file; /* in debug version file where requested */ ulint line; /* in debug version line where @@ -63,17 +72,12 @@ struct sync_cell_struct { called sync_array_event_wait on this cell */ ibool event_set; /* TRUE if the event is set */ - os_event_t event; /* operating system event - semaphore handle */ + os_event_t event; /* operating system event + semaphore handle */ time_t reservation_time;/* time when the thread reserved the wait cell */ }; -/* NOTE: It is allowed for a thread to wait -for an event allocated for the array without owning the -protecting mutex (depending on the case: OS or database mutex), but -all changes (set or reset) to the state of the event must be made -while owning the mutex. */ struct sync_array_struct { ulint n_reserved; /* number of currently reserved cells in the wait array */ @@ -106,8 +110,8 @@ ibool sync_array_detect_deadlock( /*=======================*/ /* out: TRUE if deadlock detected */ - sync_array_t* arr, /* in: wait array; NOTE! the caller must - own the mutex to array */ + sync_array_t* arr, /* in: wait array; NOTE! the caller must + own the mutex to array */ sync_cell_t* start, /* in: cell where recursive search started */ sync_cell_t* cell, /* in: cell to search */ ulint depth); /* in: recursion depth */ @@ -190,7 +194,7 @@ sync_array_create( sync_cell_t* cell_array; sync_cell_t* cell; ulint i; - + ut_a(n_cells > 0); /* Allocate memory for the data structures */ @@ -205,7 +209,7 @@ sync_array_create( arr->sg_count = 0; arr->res_count = 0; - /* Then create the mutex to protect the wait array complex */ + /* Then create the mutex to protect the wait array complex */ if (protection == SYNC_ARRAY_OS_MUTEX) { arr->os_mutex = os_mutex_create(NULL); } else if (protection == SYNC_ARRAY_MUTEX) { @@ -215,12 +219,13 @@ sync_array_create( ut_error; } - for (i = 0; i < n_cells; i++) { - cell = sync_array_get_nth_cell(arr, i); - cell->wait_object = NULL; + for (i = 0; i < n_cells; i++) { + cell = sync_array_get_nth_cell(arr, i); + cell->state = SC_FREE; + cell->wait_object = NULL; - /* Create an operating system event semaphore with no name */ - cell->event = os_event_create(NULL); + /* Create an operating system event semaphore with no name */ + cell->event = os_event_create(NULL); cell->event_set = FALSE; /* it is created in reset state */ } @@ -235,22 +240,22 @@ sync_array_free( /*============*/ sync_array_t* arr) /* in, own: sync wait array */ { - ulint i; - sync_cell_t* cell; + ulint i; + sync_cell_t* cell; ulint protection; - ut_a(arr->n_reserved == 0); - + ut_a(arr->n_reserved == 0); + sync_array_validate(arr); - - for (i = 0; i < arr->n_cells; i++) { - cell = sync_array_get_nth_cell(arr, i); + + for (i = 0; i < arr->n_cells; i++) { + cell = sync_array_get_nth_cell(arr, i); os_event_free(cell->event); - } + } protection = arr->protection; - /* Release the mutex protecting the wait array complex */ + /* Release the mutex protecting the wait array complex */ if (protection == SYNC_ARRAY_OS_MUTEX) { os_mutex_free(arr->os_mutex); @@ -261,7 +266,7 @@ sync_array_free( } ut_free(arr->array); - ut_free(arr); + ut_free(arr); } /************************************************************************ @@ -273,48 +278,24 @@ sync_array_validate( /*================*/ sync_array_t* arr) /* in: sync wait array */ { - ulint i; - sync_cell_t* cell; - ulint count = 0; - - sync_array_enter(arr); - - for (i = 0; i < arr->n_cells; i++) { - cell = sync_array_get_nth_cell(arr, i); + ulint i; + sync_cell_t* cell; + ulint count = 0; - if (cell->wait_object != NULL) { - count++; - } - } + sync_array_enter(arr); - ut_a(count == arr->n_reserved); + for (i = 0; i < arr->n_cells; i++) { + cell = sync_array_get_nth_cell(arr, i); - sync_array_exit(arr); -} + if (cell->state == SC_RESERVED) { + count++; + } + } -/*********************************************************************** -Puts the cell event in set state. */ -static -void -sync_cell_event_set( -/*================*/ - sync_cell_t* cell) /* in: array cell */ -{ - os_event_set(cell->event); - cell->event_set = TRUE; -} + ut_a(count == arr->n_reserved); -/*********************************************************************** -Puts the cell event in reset state. */ -static -void -sync_cell_event_reset( -/*==================*/ - sync_cell_t* cell) /* in: array cell */ -{ - os_event_reset(cell->event); - cell->event_set = FALSE; -} + sync_array_exit(arr); +} /********************************************************************** Reserves a wait array cell for waiting for an object. @@ -323,34 +304,39 @@ The event of the cell is reset to nonsignalled state. */ void sync_array_reserve_cell( /*====================*/ - sync_array_t* arr, /* in: wait array */ - void* object, /* in: pointer to the object to wait for */ - ulint type, /* in: lock request type */ + sync_array_t* arr, /* in: wait array */ + void* object, /* in: pointer to the object to wait for */ + ulint type, /* in: lock request type */ const char* file, /* in: file where requested */ - ulint line, /* in: line where requested */ - ulint* index) /* out: index of the reserved cell */ + ulint line, /* in: line where requested */ + ulint* index) /* out: index of the reserved cell */ { - sync_cell_t* cell; - ulint i; - - ut_a(object); - ut_a(index); + sync_cell_t* cell; + ulint i; - sync_array_enter(arr); + ut_a(object); + ut_a(index); - arr->res_count++; + sync_array_enter(arr); + + arr->res_count++; /* Reserve a new cell. */ - for (i = 0; i < arr->n_cells; i++) { - cell = sync_array_get_nth_cell(arr, i); + for (i = 0; i < arr->n_cells; i++) { + cell = sync_array_get_nth_cell(arr, i); - if (cell->wait_object == NULL) { + if (cell->state == SC_FREE) { - /* Make sure the event is reset */ - if (cell->event_set) { - sync_cell_event_reset(cell); - } + /* We do not check cell->event_set because it is + set outside the protection of the sync array mutex + and we had a bug regarding it, and since resetting + an event when it is not needed does no harm it is + safer always to do it. */ + cell->event_set = FALSE; + os_event_reset(cell->event); + + cell->state = SC_RESERVED; cell->reservation_time = time(NULL); cell->thread = os_thread_get_curr_id(); @@ -361,29 +347,91 @@ sync_array_reserve_cell( } else { cell->old_wait_rw_lock = object; } - + cell->request_type = type; cell->waiting = FALSE; - + cell->file = file; cell->line = line; - + arr->n_reserved++; *index = i; sync_array_exit(arr); - - return; - } - } - ut_error; /* No free cell found */ + return; + } + } + + ut_error; /* No free cell found */ return; } /********************************************************************** +Frees the cell. Note that we don't have any mutex reserved when calling +this. */ +static +void +sync_array_free_cell( +/*=================*/ + sync_array_t* arr, /* in: wait array */ + ulint index) /* in: index of the cell in array */ +{ + sync_cell_t* cell; + + cell = sync_array_get_nth_cell(arr, index); + + ut_a(cell->state == SC_WAKING_UP); + ut_a(cell->wait_object != NULL); + + cell->state = SC_FREE; +} + +/********************************************************************** +Frees the cell safely by reserving the sync array mutex and decrementing +n_reserved if necessary. Should only be called from mutex_spin_wait. */ + +void +sync_array_free_cell_protected( +/*===========================*/ + sync_array_t* arr, /* in: wait array */ + ulint index) /* in: index of the cell in array */ +{ + sync_cell_t* cell; + + sync_array_enter(arr); + + cell = sync_array_get_nth_cell(arr, index); + + ut_a(cell->state != SC_FREE); + ut_a(cell->wait_object != NULL); + + /* We only need to decrement n_reserved if it has not already been + done by sync_array_signal_object. */ + if (cell->state == SC_RESERVED) { + ut_a(arr->n_reserved > 0); + arr->n_reserved--; + } else if (cell->state == SC_WAKING_UP) { + /* This is tricky; if we don't wait for the event to be + signaled, signal_object can set the state of a cell to + SC_WAKING_UP, mutex_spin_wait can call this and set the + state to SC_FREE, and then signal_object gets around to + calling os_set_event for the cell but since it's already + been freed things break horribly. */ + + sync_array_exit(arr); + os_event_wait(cell->event); + sync_array_enter(arr); + } + + cell->state = SC_FREE; + + sync_array_exit(arr); +} + +/********************************************************************** This function should be called when a thread starts to wait on a wait array cell. In the debug version this function checks if the wait for a semaphore will result in a deadlock, in which @@ -392,51 +440,51 @@ case prints info and asserts. */ void sync_array_wait_event( /*==================*/ - sync_array_t* arr, /* in: wait array */ - ulint index) /* in: index of the reserved cell */ + sync_array_t* arr, /* in: wait array */ + ulint index) /* in: index of the reserved cell */ { - sync_cell_t* cell; - os_event_t event; - - ut_a(arr); + sync_cell_t* cell; + os_event_t event; - sync_array_enter(arr); + ut_a(arr); - cell = sync_array_get_nth_cell(arr, index); + cell = sync_array_get_nth_cell(arr, index); + ut_a((cell->state == SC_RESERVED) || (cell->state == SC_WAKING_UP)); ut_a(cell->wait_object); ut_a(!cell->waiting); ut_ad(os_thread_get_curr_id() == cell->thread); - event = cell->event; - cell->waiting = TRUE; + event = cell->event; + cell->waiting = TRUE; #ifdef UNIV_SYNC_DEBUG - + /* We use simple enter to the mutex below, because if we cannot acquire it at once, mutex_enter would call recursively sync_array routines, leading to trouble. rw_lock_debug_mutex freezes the debug lists. */ + sync_array_enter(arr); rw_lock_debug_mutex_enter(); if (TRUE == sync_array_detect_deadlock(arr, cell, cell, 0)) { fputs("########################################\n", stderr); ut_error; - } + } rw_lock_debug_mutex_exit(); + sync_array_exit(arr); #endif - sync_array_exit(arr); + os_event_wait(event); - os_event_wait(event); - - sync_array_free_cell(arr, index); + sync_array_free_cell(arr, index); } /********************************************************************** -Reports info of a wait array cell. */ +Reports info of a wait array cell. Note: sync_array_print_long_waits() +calls this without mutex protection. */ static void sync_array_cell_print( @@ -452,11 +500,20 @@ sync_array_cell_print( fprintf(file, "--Thread %lu has waited at %s line %lu for %.2f seconds the semaphore:\n", - (ulong) os_thread_pf(cell->thread), cell->file, - (ulong) cell->line, - difftime(time(NULL), cell->reservation_time)); + (ulong) os_thread_pf(cell->thread), cell->file, + (ulong) cell->line, + difftime(time(NULL), cell->reservation_time)); + fprintf(file, "Wait array cell state %lu\n", (ulong)cell->state); + + /* If the memory area pointed to by old_wait_mutex / + old_wait_rw_lock has been freed, this can crash. */ + + if (cell->state != SC_RESERVED) { + /* If cell has this state, then even if we are holding the sync + array mutex, the wait object may get freed meanwhile. Do not + print the wait object then. */ - if (type == SYNC_MUTEX) { + } else if (type == SYNC_MUTEX) { /* We use old_wait_mutex in case the cell has already been freed meanwhile */ mutex = cell->old_wait_mutex; @@ -492,7 +549,7 @@ sync_array_cell_print( ? " exclusive\n" : " wait exclusive\n"); } - + fprintf(file, "number of readers %lu, waiters flag %lu\n" "Last time read locked in file %s line %lu\n" @@ -507,11 +564,7 @@ sync_array_cell_print( ut_error; } - if (!cell->waiting) { - fputs("wait has ended\n", file); - } - - if (cell->event_set) { + if (cell->event_set) { fputs("wait is ending\n", file); } } @@ -525,22 +578,22 @@ sync_array_find_thread( /*===================*/ /* out: pointer to cell or NULL if not found */ - sync_array_t* arr, /* in: wait array */ + sync_array_t* arr, /* in: wait array */ os_thread_id_t thread) /* in: thread id */ { - ulint i; - sync_cell_t* cell; + ulint i; + sync_cell_t* cell; - for (i = 0; i < arr->n_cells; i++) { + for (i = 0; i < arr->n_cells; i++) { - cell = sync_array_get_nth_cell(arr, i); + cell = sync_array_get_nth_cell(arr, i); - if (cell->wait_object != NULL - && os_thread_eq(cell->thread, thread)) { + if ((cell->state == SC_RESERVED) + && os_thread_eq(cell->thread, thread)) { - return(cell); /* Found */ - } - } + return(cell); /* Found */ + } + } return(NULL); /* Not found */ } @@ -552,8 +605,8 @@ ibool sync_array_deadlock_step( /*=====================*/ /* out: TRUE if deadlock detected */ - sync_array_t* arr, /* in: wait array; NOTE! the caller must - own the mutex to array */ + sync_array_t* arr, /* in: wait array; NOTE! the caller must + own the mutex to array */ sync_cell_t* start, /* in: cell where recursive search started */ os_thread_id_t thread, /* in: thread to look at */ @@ -572,7 +625,7 @@ sync_array_deadlock_step( return(FALSE); } - + new = sync_array_find_thread(arr, thread); if (new == start) { @@ -604,8 +657,8 @@ ibool sync_array_detect_deadlock( /*=======================*/ /* out: TRUE if deadlock detected */ - sync_array_t* arr, /* in: wait array; NOTE! the caller must - own the mutex to array */ + sync_array_t* arr, /* in: wait array; NOTE! the caller must + own the mutex to array */ sync_cell_t* start, /* in: cell where recursive search started */ sync_cell_t* cell, /* in: cell to search */ ulint depth) /* in: recursion depth */ @@ -615,19 +668,20 @@ sync_array_detect_deadlock( os_thread_id_t thread; ibool ret; rw_lock_debug_t*debug; - - ut_a(arr && start && cell); + + ut_a(arr && start && cell); + ut_ad(cell->state == SC_RESERVED); ut_ad(cell->wait_object); ut_ad(os_thread_get_curr_id() == start->thread); ut_ad(depth < 100); - + depth++; - + if (cell->event_set || !cell->waiting) { return(FALSE); /* No deadlock here */ } - + if (cell->request_type == SYNC_MUTEX) { mutex = cell->wait_object; @@ -649,7 +703,8 @@ sync_array_detect_deadlock( if (ret) { fprintf(stderr, "Mutex %p owned by thread %lu file %s line %lu\n", - mutex, (ulong) os_thread_pf(mutex->thread_id), + mutex, + (ulong) os_thread_pf(mutex->thread_id), mutex->file_name, (ulong) mutex->line); sync_array_cell_print(stderr, cell); @@ -661,76 +716,76 @@ sync_array_detect_deadlock( } else if (cell->request_type == RW_LOCK_EX) { - lock = cell->wait_object; + lock = cell->wait_object; - debug = UT_LIST_GET_FIRST(lock->debug_list); + debug = UT_LIST_GET_FIRST(lock->debug_list); - while (debug != NULL) { + while (debug != NULL) { - thread = debug->thread_id; + thread = debug->thread_id; - if (((debug->lock_type == RW_LOCK_EX) - && !os_thread_eq(thread, cell->thread)) - || ((debug->lock_type == RW_LOCK_WAIT_EX) - && !os_thread_eq(thread, cell->thread)) - || (debug->lock_type == RW_LOCK_SHARED)) { + if (((debug->lock_type == RW_LOCK_EX) + && !os_thread_eq(thread, cell->thread)) + || ((debug->lock_type == RW_LOCK_WAIT_EX) + && !os_thread_eq(thread, cell->thread)) + || (debug->lock_type == RW_LOCK_SHARED)) { - /* The (wait) x-lock request can block infinitely - only if someone (can be also cell thread) is holding - s-lock, or someone (cannot be cell thread) (wait) - x-lock, and he is blocked by start thread */ + /* The (wait) x-lock request can block + infinitely only if someone (can be also cell + thread) is holding s-lock, or someone + (cannot be cell thread) (wait) x-lock, and + he is blocked by start thread */ - ret = sync_array_deadlock_step(arr, start, thread, - debug->pass, - depth); - if (ret) { - print: - fprintf(stderr, "rw-lock %p ", lock); - sync_array_cell_print(stderr, cell); - rw_lock_debug_print(debug); - return(TRUE); + ret = sync_array_deadlock_step(arr, start, + thread, debug->pass, depth); + if (ret) { + print: + fprintf(stderr, "rw-lock %p ", lock); + sync_array_cell_print(stderr, cell); + rw_lock_debug_print(debug); + return(TRUE); + } } - } - debug = UT_LIST_GET_NEXT(list, debug); - } + debug = UT_LIST_GET_NEXT(list, debug); + } - return(FALSE); + return(FALSE); } else if (cell->request_type == RW_LOCK_SHARED) { - lock = cell->wait_object; - debug = UT_LIST_GET_FIRST(lock->debug_list); + lock = cell->wait_object; + debug = UT_LIST_GET_FIRST(lock->debug_list); - while (debug != NULL) { + while (debug != NULL) { - thread = debug->thread_id; + thread = debug->thread_id; - if ((debug->lock_type == RW_LOCK_EX) - || (debug->lock_type == RW_LOCK_WAIT_EX)) { + if ((debug->lock_type == RW_LOCK_EX) + || (debug->lock_type == RW_LOCK_WAIT_EX)) { - /* The s-lock request can block infinitely only if - someone (can also be cell thread) is holding (wait) - x-lock, and he is blocked by start thread */ + /* The s-lock request can block infinitely + only if someone (can also be cell thread) is + holding (wait) x-lock, and he is blocked by + start thread */ - ret = sync_array_deadlock_step(arr, start, thread, - debug->pass, - depth); - if (ret) { - goto print; + ret = sync_array_deadlock_step(arr, start, + thread, debug->pass, depth); + if (ret) { + goto print; + } } - } - debug = UT_LIST_GET_NEXT(list, debug); - } + debug = UT_LIST_GET_NEXT(list, debug); + } - return(FALSE); + return(FALSE); } else { ut_error; } - return(TRUE); /* Execution never reaches this line: for compiler + return(TRUE); /* Execution never reaches this line: for compiler fooling only */ } #endif /* UNIV_SYNC_DEBUG */ @@ -745,7 +800,7 @@ sync_arr_cell_can_wake_up( { mutex_t* mutex; rw_lock_t* lock; - + if (cell->request_type == SYNC_MUTEX) { mutex = cell->wait_object; @@ -757,26 +812,26 @@ sync_arr_cell_can_wake_up( } else if (cell->request_type == RW_LOCK_EX) { - lock = cell->wait_object; + lock = cell->wait_object; - if (rw_lock_get_reader_count(lock) == 0 - && rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) { + if (rw_lock_get_reader_count(lock) == 0 + && rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) { return(TRUE); } - if (rw_lock_get_reader_count(lock) == 0 - && rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX - && os_thread_eq(lock->writer_thread, cell->thread)) { + if (rw_lock_get_reader_count(lock) == 0 + && rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX + && os_thread_eq(lock->writer_thread, cell->thread)) { return(TRUE); } } else if (cell->request_type == RW_LOCK_SHARED) { - lock = cell->wait_object; + lock = cell->wait_object; if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) { - + return(TRUE); } } @@ -784,32 +839,6 @@ sync_arr_cell_can_wake_up( return(FALSE); } -/********************************************************************** -Frees the cell. NOTE! sync_array_wait_event frees the cell -automatically! */ - -void -sync_array_free_cell( -/*=================*/ - sync_array_t* arr, /* in: wait array */ - ulint index) /* in: index of the cell in array */ -{ - sync_cell_t* cell; - - sync_array_enter(arr); - - cell = sync_array_get_nth_cell(arr, index); - - ut_a(cell->wait_object != NULL); - - cell->wait_object = NULL; - - ut_a(arr->n_reserved > 0); - arr->n_reserved--; - - sync_array_exit(arr); -} - /************************************************************************** Looks for the cells in the wait array which refer to the wait object specified, and sets their corresponding events to the signaled state. In this @@ -822,73 +851,142 @@ sync_array_signal_object( sync_array_t* arr, /* in: wait array */ void* object) /* in: wait object */ { - sync_cell_t* cell; - ulint count; - ulint i; + sync_cell_t* cell; + ulint count; + ulint i; + ulint res_count; + + /* We store the addresses of cells we need to signal and signal + them only after we have released the sync array's mutex (for + performance reasons). cell_count is the number of such cells, and + cell_ptr points to the first one. If there are less than + UT_ARR_SIZE(cells) of them, cell_ptr == &cells[0], otherwise + cell_ptr points to malloc'd memory that we must free. */ + + sync_cell_t* cells[100]; + sync_cell_t** cell_ptr = &cells[0]; + ulint cell_count = 0; + ulint cell_max_count = UT_ARR_SIZE(cells); - sync_array_enter(arr); + ut_a(100 == cell_max_count); + + sync_array_enter(arr); arr->sg_count++; i = 0; count = 0; - while (count < arr->n_reserved) { + /* We need to store this to a local variable because it is modified + inside the loop */ + res_count = arr->n_reserved; + + while (count < res_count) { + + cell = sync_array_get_nth_cell(arr, i); + + if (cell->state == SC_RESERVED) { + + count++; + if (cell->wait_object == object) { + cell->state = SC_WAKING_UP; - cell = sync_array_get_nth_cell(arr, i); + ut_a(arr->n_reserved > 0); + arr->n_reserved--; - if (cell->wait_object != NULL) { + if (cell_count == cell_max_count) { + sync_cell_t** old_cell_ptr = cell_ptr; + size_t old_size, new_size; - count++; - if (cell->wait_object == object) { + old_size = cell_max_count * + sizeof(sync_cell_t*); + cell_max_count *= 2; + new_size = cell_max_count * + sizeof(sync_cell_t*); - sync_cell_event_set(cell); - } - } + cell_ptr = malloc(new_size); + ut_a(cell_ptr); + memcpy(cell_ptr, old_cell_ptr, + old_size); - i++; - } + if (old_cell_ptr != &cells[0]) { + free(old_cell_ptr); + } + } + + cell_ptr[cell_count] = cell; + cell_count++; + } + } + + i++; + } - sync_array_exit(arr); + sync_array_exit(arr); + + for (i = 0; i < cell_count; i++) { + cell = cell_ptr[i]; + + cell->event_set = TRUE; + os_event_set(cell->event); + } + + if (cell_ptr != &cells[0]) { + free(cell_ptr); + } } /************************************************************************** If the wakeup algorithm does not work perfectly at semaphore relases, this function will do the waking (see the comment in mutex_exit). This -function should be called about every 1 second in the server. */ +function should be called about every 1 second in the server. + +Note that there's a race condition between this thread and mutex_exit +changing the lock_word and calling signal_object, so sometimes this finds +threads to wake up even when nothing has gone wrong. */ void sync_arr_wake_threads_if_sema_free(void) /*====================================*/ { - sync_array_t* arr = sync_primary_wait_array; - sync_cell_t* cell; - ulint count; - ulint i; + sync_array_t* arr = sync_primary_wait_array; + sync_cell_t* cell; + ulint count; + ulint i; + ulint res_count; - sync_array_enter(arr); + sync_array_enter(arr); i = 0; count = 0; - while (count < arr->n_reserved) { + /* We need to store this to a local variable because it is modified + inside the loop */ - cell = sync_array_get_nth_cell(arr, i); + res_count = arr->n_reserved; - if (cell->wait_object != NULL) { + while (count < res_count) { - count++; + cell = sync_array_get_nth_cell(arr, i); - if (sync_arr_cell_can_wake_up(cell)) { + if (cell->state == SC_RESERVED) { - sync_cell_event_set(cell); - } - } + count++; - i++; - } + if (sync_arr_cell_can_wake_up(cell)) { + cell->state = SC_WAKING_UP; + cell->event_set = TRUE; + os_event_set(cell->event); - sync_array_exit(arr); + ut_a(arr->n_reserved > 0); + arr->n_reserved--; + } + } + + i++; + } + + sync_array_exit(arr); } /************************************************************************** @@ -900,36 +998,36 @@ sync_array_print_long_waits(void) /* out: TRUE if fatal semaphore wait threshold was exceeded */ { - sync_cell_t* cell; - ibool old_val; + sync_cell_t* cell; + ibool old_val; ibool noticed = FALSE; - ulint i; + ulint i; ulint fatal_timeout = srv_fatal_semaphore_wait_threshold; ibool fatal = FALSE; - for (i = 0; i < sync_primary_wait_array->n_cells; i++) { + for (i = 0; i < sync_primary_wait_array->n_cells; i++) { - cell = sync_array_get_nth_cell(sync_primary_wait_array, i); + cell = sync_array_get_nth_cell(sync_primary_wait_array, i); - if (cell->wait_object != NULL - && difftime(time(NULL), cell->reservation_time) > 240) { + if ((cell->state != SC_FREE) + && difftime(time(NULL), cell->reservation_time) > 240) { fputs("InnoDB: Warning: a long semaphore wait:\n", stderr); sync_array_cell_print(stderr, cell); noticed = TRUE; - } + } - if (cell->wait_object != NULL - && difftime(time(NULL), cell->reservation_time) - > fatal_timeout) { + if ((cell->state != SC_FREE) + && difftime(time(NULL), cell->reservation_time) + > fatal_timeout) { fatal = TRUE; - } - } + } + } if (noticed) { fprintf(stderr, "InnoDB: ###### Starts InnoDB Monitor for 30 secs to print diagnostic info:\n"); - old_val = srv_print_innodb_monitor; + old_val = srv_print_innodb_monitor; /* If some crucial semaphore is reserved, then also the InnoDB Monitor can hang, and we do not get diagnostics. Since in @@ -941,12 +1039,12 @@ sync_array_print_long_waits(void) "InnoDB: Pending preads %lu, pwrites %lu\n", (ulong)os_file_n_pending_preads, (ulong)os_file_n_pending_pwrites); - srv_print_innodb_monitor = TRUE; + srv_print_innodb_monitor = TRUE; os_event_set(srv_lock_timeout_thread_event); - os_thread_sleep(30000000); + os_thread_sleep(30000000); - srv_print_innodb_monitor = old_val; + srv_print_innodb_monitor = old_val; fprintf(stderr, "InnoDB: ###### Diagnostic info printed to the standard error stream\n"); } @@ -964,27 +1062,21 @@ sync_array_output_info( sync_array_t* arr) /* in: wait array; NOTE! caller must own the mutex */ { - sync_cell_t* cell; - ulint count; - ulint i; + sync_cell_t* cell; + ulint i; fprintf(file, - "OS WAIT ARRAY INFO: reservation count %ld, signal count %ld\n", - (long) arr->res_count, (long) arr->sg_count); - i = 0; - count = 0; - - while (count < arr->n_reserved) { + "OS WAIT ARRAY INFO: reservation count %ld, signal count %ld\n", + (long) arr->res_count, + (long) arr->sg_count); + for (i = 0; i < arr->n_cells; i++) { - cell = sync_array_get_nth_cell(arr, i); + cell = sync_array_get_nth_cell(arr, i); - if (cell->wait_object != NULL) { - count++; + if (cell->state != SC_FREE) { sync_array_cell_print(file, cell); - } - - i++; - } + } + } } /************************************************************************** @@ -996,10 +1088,10 @@ sync_array_print_info( FILE* file, /* in: file where to print */ sync_array_t* arr) /* in: wait array */ { - sync_array_enter(arr); + sync_array_enter(arr); sync_array_output_info(file, arr); - - sync_array_exit(arr); + + sync_array_exit(arr); } |