From 04b6b0732ddaac933c39353fcb4e1a9f42c2c59f Mon Sep 17 00:00:00 2001
From: Vladislav Vaintroub
Date: Mon, 19 Sep 2016 12:31:20 +0000
Subject: Windows : completion port based asynchronous IO.

---
 storage/innobase/os/os0file.cc | 399 ++++++++++++++++++-----------------------
 1 file changed, 173 insertions(+), 226 deletions(-)

diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index dfe5021da8c..3cd92282aa0 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -162,12 +162,10 @@ static ulint	os_innodb_umask = 0;
 #ifndef ECANCELED
 #define ECANCELED	125
 #endif
-
-/* On Windows when using native AIO the number of AIO requests
-that a thread can handle at a given time is limited to 32
-i.e.: SRV_N_PENDING_IOS_PER_THREAD */
-#define SRV_N_PENDING_IOS_PER_THREAD	OS_AIO_N_PENDING_IOS_PER_THREAD
-
+static HANDLE	completion_port;
+static HANDLE	read_completion_port;
+static DWORD	fls_sync_io = FLS_OUT_OF_INDEXES;
+#define IOCP_SHUTDOWN_KEY	(ULONG_PTR)-1
 #endif /* _WIN32 */
 
 #ifndef UNIV_HOTBACKUP
@@ -246,8 +244,19 @@ mysql_pfs_key_t	innodb_log_file_key;
 mysql_pfs_key_t	innodb_temp_file_key;
 #endif /* UNIV_PFS_IO */
 
+class AIO;
+
 /** The asynchronous I/O context */
 struct Slot {
+
+#ifdef WIN_ASYNC_IO
+	/** Windows control block for the aio request
+	must be at the very start of Slot, so we can
+	cast Slot* to OVERLAPPED*
+	*/
+	OVERLAPPED		control;
+#endif
+
 	/** index of the slot in the aio array */
 	uint16_t		pos;
 
@@ -297,11 +306,6 @@ struct Slot {
 	dberr_t			err;
 
 #ifdef WIN_ASYNC_IO
-	/** handle object we need in the OVERLAPPED struct */
-	HANDLE			handle;
-
-	/** Windows control block for the aio request */
-	OVERLAPPED		control;
 
 	/** bytes written/read */
 	DWORD			n_bytes;
@@ -309,6 +313,8 @@ struct Slot {
 	/** length of the block to read or write */
 	DWORD			len;
 
+	/** aio array containing this slot */
+	AIO			*array;
 #elif defined(LINUX_NATIVE_AIO)
 	/** Linux control block for aio */
 	struct iocb		control;
@@ -510,31 +516,11 @@ public:
 #endif /* LINUX_NATIVE_AIO */
 
 #ifdef WIN_ASYNC_IO
-	/** Wakes up all async i/o threads in the array in Windows async I/O at
-	shutdown. */
-	void signal()
-	{
-		for (ulint i = 0; i < m_slots.size(); ++i) {
-			SetEvent(m_slots[i].handle);
-		}
-	}
-
+	/** Wake up all AIO threads in Windows native aio */
-	static void wake_at_shutdown()
-	{
-		s_reads->signal();
-
-		if (s_writes != NULL) {
-			s_writes->signal();
-		}
-
-		if (s_ibuf != NULL) {
-			s_ibuf->signal();
-		}
-
-		if (s_log != NULL) {
-			s_log->signal();
-		}
+	static void wake_at_shutdown() {
+		PostQueuedCompletionStatus(completion_port, 0, IOCP_SHUTDOWN_KEY, NULL);
+		PostQueuedCompletionStatus(read_completion_port, 0, IOCP_SHUTDOWN_KEY, NULL);
 	}
 #endif /* WIN_ASYNC_IO */
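Because `control` is now the first member of `Slot`, the `OVERLAPPED*` that
`GetQueuedCompletionStatus()` hands back can be converted directly back into
the owning slot, which is what makes the per-slot event handles above
removable. A minimal sketch of that layout trick, with an illustrative
`Request` type standing in for InnoDB's `Slot`:

    #include <windows.h>
    #include <cstddef>

    /* OVERLAPPED must stay the first member so that the OVERLAPPED*
       returned by GetQueuedCompletionStatus() is also a valid Request*. */
    struct Request {
        OVERLAPPED control;   /* at offset 0 */
        void*      buffer;
        DWORD      len;
    };

    static_assert(offsetof(Request, control) == 0,
                  "OVERLAPPED must be the first member");

    static Request* request_from_overlapped(OVERLAPPED* ov)
    {
        /* Valid because control sits at offset 0; CONTAINING_RECORD()
           would work for any member offset. */
        return reinterpret_cast<Request*>(ov);
    }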
@@ -545,33 +531,7 @@ public:
 	threads are not left sleeping! */
 	static void simulated_put_read_threads_to_sleep();
 
-	/** The non asynchronous IO array.
-	@return the synchronous AIO array instance. */
-	static AIO* sync_array()
-		MY_ATTRIBUTE((warn_unused_result))
-	{
-		return(s_sync);
-	}
-
-	/**
-	Get the AIO handles for a segment.
-	@param[in]	segment		The local segment.
-	@return the handles for the segment. */
-	HANDLE* handles(ulint segment)
-		MY_ATTRIBUTE((warn_unused_result))
-	{
-		ut_ad(segment < m_handles->size() / slots_per_segment());
-
-		return(&(*m_handles)[segment * slots_per_segment()]);
-	}
-
-	/** @return true if no slots are reserved */
-	bool is_empty() const
-		MY_ATTRIBUTE((warn_unused_result))
-	{
-		ut_ad(is_mutex_owned());
-		return(m_n_reserved == 0);
-	}
+
 #endif /* _WIN32 */
 
 	/** Create an instance using new(std::nothrow)
@@ -728,14 +688,6 @@ private:
 	the ibuf segment */
 	ulint			m_n_reserved;
 
-#ifdef _WIN32
-	typedef std::vector<HANDLE, ut_allocator<HANDLE> > Handles;
-
-	/** Pointer to an array of OS native event handles where
-	we copied the handles from slots, in the same order. This
-	can be used in WaitForMultipleObjects; used only in Windows */
-	Handles*		m_handles;
-#endif /* _WIN32 */
 
 #if defined(LINUX_NATIVE_AIO)
 	typedef std::vector<io_event> IOEvents;
@@ -1645,11 +1597,7 @@ AIO::release(Slot* slot)
 		os_event_set(m_is_empty);
 	}
 
-#ifdef WIN_ASYNC_IO
-
-	ResetEvent(slot->handle);
-
-#elif defined(LINUX_NATIVE_AIO)
+#if defined(LINUX_NATIVE_AIO)
 
 	if (srv_use_native_aio) {
 		memset(&slot->control, 0x0, sizeof(slot->control));
@@ -4144,6 +4092,57 @@ os_aio_simulated_put_read_threads_to_sleep()
 
 #include <WinIoCtl.h>
 
+/*
+Windows : Handling synchronous IO on files opened asynchronously.
+
+If a file is opened for asynchronous IO (FILE_FLAG_OVERLAPPED) and also bound
+to a completion port, then every IO on this file would normally be enqueued to
+the completion port. Sometimes, however, we would like to do synchronous IO.
+This is possible if we initialize overlapped.hEvent with a valid event and set
+its lowest-order bit to 1 (see the MSDN ReadFile and WriteFile documentation
+for more information).
+
+We create this special event once for each thread and store it in thread-local
+storage.
+*/
+
+
+static void __stdcall win_free_syncio_event(void *data) {
+	if (data) {
+		CloseHandle((HANDLE)data);
+	}
+}
+
+
+/*
+Initialize the TLS index for the event handle used for synchronous IO on
+files that might be opened with FILE_FLAG_OVERLAPPED.
+*/
+static void win_init_syncio_event() {
+	fls_sync_io = FlsAlloc(win_free_syncio_event);
+	ut_a(fls_sync_io != FLS_OUT_OF_INDEXES);
+}
+
+
+/*
+Retrieve the per-thread event for doing synchronous IO on asynchronously
+opened files.
+*/
+static HANDLE win_get_syncio_event()
+{
+	HANDLE h;
+
+	h = (HANDLE)FlsGetValue(fls_sync_io);
+	if (h) {
+		return h;
+	}
+	h = CreateEventA(NULL, FALSE, FALSE, NULL);
+	ut_a(h);
+	/* Set the low-order bit to keep the I/O completion from being queued */
+	h = (HANDLE)((uintptr_t)h | 1);
+	FlsSetValue(fls_sync_io, h);
+	return h;
+}
+
+
 /** Do the read/write
 @param[in]	request	The IO context and type
 @return the number of bytes read/written or negative value on error */
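The helpers above implement the documented Windows trick for doing synchronous
IO on a FILE_FLAG_OVERLAPPED handle: an event whose low-order bit is set
suppresses posting of the completion to the handle's completion port. A
self-contained sketch of the same pattern (the `sync_read` wrapper is
illustrative, not part of the patch):

    #include <windows.h>
    #include <cstdint>

    /* Synchronous read from a handle opened with FILE_FLAG_OVERLAPPED.
       Setting the low-order bit of hEvent keeps the completion from
       being queued to the completion port the handle is bound to. */
    static bool sync_read(HANDLE file, void* buf, DWORD len,
                          uint64_t offset, HANDLE event, DWORD* n_read)
    {
        OVERLAPPED ov = {};
        ov.hEvent     = (HANDLE)((uintptr_t)event | 1);
        ov.Offset     = (DWORD)(offset & 0xFFFFFFFF);
        ov.OffsetHigh = (DWORD)(offset >> 32);

        if (ReadFile(file, buf, len, n_read, &ov)) {
            return true;  /* completed immediately */
        }
        if (GetLastError() == ERROR_IO_PENDING) {
            /* Queued anyway; block on the event until it finishes. */
            return GetOverlappedResult(file, &ov, n_read, TRUE) != FALSE;
        }
        return false;
    }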
@@ -4154,6 +4153,7 @@ SyncFileIO::execute(const IORequest& request)
 
 	memset(&seek, 0x0, sizeof(seek));
 
+	seek.hEvent = win_get_syncio_event();
 	seek.Offset = (DWORD) m_offset & 0xFFFFFFFF;
 	seek.OffsetHigh = (DWORD) (m_offset >> 32);
 
@@ -4169,6 +4169,10 @@ SyncFileIO::execute(const IORequest& request)
 		ret = WriteFile(m_fh, m_buf, static_cast<DWORD>(m_n),
 			&n_bytes, &seek);
 	}
+	if (!ret && (GetLastError() == ERROR_IO_PENDING)) {
+		/* Wait for async io to complete */
+		ret = GetOverlappedResult(m_fh, &seek, &n_bytes, TRUE);
+	}
 
 	return(ret ? static_cast<ssize_t>(n_bytes) : -1);
 }
@@ -4180,7 +4184,7 @@ ssize_t
 SyncFileIO::execute(Slot* slot)
 {
 	BOOL	ret;
-
+	slot->control.hEvent = win_get_syncio_event();
 	if (slot->type.is_read()) {
 
 		ret = ReadFile(
@@ -4195,10 +4199,34 @@ SyncFileIO::execute(Slot* slot)
 			&slot->n_bytes,
 			&slot->control);
 	}
-
+	if (!ret && (GetLastError() == ERROR_IO_PENDING)) {
+		/* Wait for async io to complete */
+		ret = GetOverlappedResult(slot->file, &slot->control, &slot->n_bytes, TRUE);
+	}
 	return(ret ? slot->n_bytes : -1);
 }
 
+/* Startup/shutdown */
+
+struct WinIoInit
+{
+	WinIoInit() {
+		completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+		read_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+		ut_a(completion_port && read_completion_port);
+		fls_sync_io = FlsAlloc(win_free_syncio_event);
+		ut_a(fls_sync_io != FLS_OUT_OF_INDEXES);
+	}
+
+	~WinIoInit() {
+		CloseHandle(completion_port);
+		CloseHandle(read_completion_port);
+		FlsFree(fls_sync_io);
+	}
+};
+
+/* Ensures proper initialization and shutdown */
+static WinIoInit win_io_init;
+
 /** Check if the file system supports sparse files.
 @param[in]	name		File name
 @return true if the file system supports sparse files */
@@ -4922,6 +4950,10 @@ os_file_create_func(
 
 	*success = true;
 
+	if (srv_use_native_aio && ((attributes & FILE_FLAG_OVERLAPPED) != 0)) {
+		/* Bind the file handle to the completion port */
+		ut_a(CreateIoCompletionPort(file, completion_port, 0, 0));
+	}
 	DWORD	temp;
 
 	/* This is a best effort use case, if it fails then
@@ -6447,8 +6479,6 @@ AIO::AIO(
 # ifdef LINUX_NATIVE_AIO
 	,m_aio_ctx(),
 	m_events(m_slots.size())
-# elif defined(_WIN32)
-	,m_handles()
 # endif /* LINUX_NATIVE_AIO */
 {
 	ut_a(n > 0);
@@ -6480,13 +6510,7 @@ AIO::init_slots()
 
 #ifdef WIN_ASYNC_IO
 
-		slot.handle = CreateEvent(NULL, TRUE, FALSE, NULL);
-
-		OVERLAPPED* over = &slot.control;
-
-		over->hEvent = slot.handle;
-
-		(*m_handles)[i] = over->hEvent;
+		slot.array = this;
 
 #elif defined(LINUX_NATIVE_AIO)
 
@@ -6555,11 +6579,6 @@ AIO::init()
 {
 	ut_a(!m_slots.empty());
 
-#ifdef _WIN32
-	ut_a(m_handles == NULL);
-
-	m_handles = UT_NEW_NOKEY(Handles(m_slots.size()));
-#endif /* _WIN32 */
 
 	if (srv_use_native_aio) {
 #ifdef LINUX_NATIVE_AIO
@@ -6613,16 +6632,6 @@ AIO::create(
 /** AIO destructor */
 AIO::~AIO()
 {
-#ifdef WIN_ASYNC_IO
-	for (ulint i = 0; i < m_slots.size(); ++i) {
-		CloseHandle(m_slots[i].handle);
-	}
-#endif /* WIN_ASYNC_IO */
-
-#ifdef _WIN32
-	UT_DELETE(m_handles);
-#endif /* _WIN32 */
-
 	mutex_destroy(&m_mutex);
 
 	os_event_destroy(m_not_full);
@@ -6806,11 +6815,6 @@ os_aio_init(
 	/* Maximum number of pending aio operations allowed per segment */
 	ulint		limit = 8 * OS_AIO_N_PENDING_IOS_PER_THREAD;
 
-#ifdef _WIN32
-	if (srv_use_native_aio) {
-		limit = SRV_N_PENDING_IOS_PER_THREAD;
-	}
-#endif /* _WIN32 */
 
 	ut_a(block_cache == NULL);
 
@@ -7145,8 +7149,6 @@ AIO::reserve_slot(
 		control = &slot->control;
 		control->Offset = (DWORD) offset & 0xFFFFFFFF;
 		control->OffsetHigh = (DWORD) (offset >> 32);
-
-		ResetEvent(slot->handle);
 	}
 #elif defined(LINUX_NATIVE_AIO)
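With the process-lifetime ports created by `WinIoInit`, `os_file_create_func()`
only has to associate each overlapped file handle with the port once;
afterwards every asynchronous completion on that handle is queued there. A
condensed sketch of that setup, with illustrative names and assert-style error
handling:

    #include <windows.h>
    #include <cassert>

    static HANDLE g_completion_port;   /* illustrative global */

    static void io_init()
    {
        /* NumberOfConcurrentThreads == 0 lets Windows use the CPU count. */
        g_completion_port =
            CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        assert(g_completion_port != NULL);
    }

    static void bind_file(HANDLE file)
    {
        /* Associate an overlapped file handle with the port; from now on
           every async completion on 'file' is queued to the port. On
           success the call returns the port handle itself. */
        HANDLE h = CreateIoCompletionPort(file, g_completion_port, 0, 0);
        assert(h == g_completion_port);
    }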
@@ -7331,6 +7333,10 @@ therefore no other thread is allowed to do the freeing!
 @param[out]	m2		callback message
 @param[out]	type		OS_FILE_WRITE or ..._READ
 @return DB_SUCCESS or error code */
+
+#define READ_SEGMENT(s) (s < srv_n_read_io_threads)
+#define WRITE_SEGMENT(s) !READ_SEGMENT(s)
+
 static
 dberr_t
 os_aio_windows_handler(
@@ -7340,86 +7346,74 @@ os_aio_windows_handler(
 	void**		m2,
 	IORequest*	type)
 {
-	Slot*		slot;
+	Slot*		slot = 0;
 	dberr_t		err;
-	AIO*		array;
-	ulint		orig_seg = segment;
 
-	if (segment == ULINT_UNDEFINED) {
-		segment = 0;
-		array = AIO::sync_array();
-	} else {
-		segment = AIO::get_array_and_local_segment(&array, segment);
-	}
+	BOOL		ret;
+	ULONG_PTR	key;
+
+	ut_a(segment != ULINT_UNDEFINED);
 
 	/* NOTE! We only access constant fields in os_aio_array. Therefore
 	we do not have to acquire the protecting mutex yet */
 
 	ut_ad(os_aio_validate_skip());
 
-	if (array == AIO::sync_array()) {
+	HANDLE port = READ_SEGMENT(segment) ? read_completion_port : completion_port;
+	for (;;) {
+		DWORD len;
+		ret = GetQueuedCompletionStatus(port, &len, &key,
+			(OVERLAPPED **)&slot, INFINITE);
 
-		WaitForSingleObject(array->at(pos)->handle, INFINITE);
+		/* If the shutdown key was received, repost the shutdown message and exit */
+		if (ret && key == IOCP_SHUTDOWN_KEY) {
+			PostQueuedCompletionStatus(port, 0, key, NULL);
+			*m1 = NULL;
+			*m2 = NULL;
+			return (DB_SUCCESS);
+		}
 
-	} else {
-		if (orig_seg != ULINT_UNDEFINED) {
-			srv_set_io_thread_op_info(orig_seg, "wait Windows aio");
-		}
-
-		DWORD handle_count = (DWORD)array->slots_per_segment();
-		HANDLE *handle_array = array->handles(segment);
-		pos = WaitForMultipleObjects(
-			handle_count,
-			handle_array,
-			FALSE, INFINITE);
-		if (pos == WAIT_FAILED) {
-			DWORD last_error = GetLastError();
-			ib::error()
-				<< "WaitForMultipleObjects() failed with error "
-				<< last_error;
-			ut_error;
+		ut_a(slot);
+		if (!ret) {
+			/* IO failed */
+			break;
 		}
-	}
 
-	array->acquire();
+		slot->n_bytes = len;
 
-	if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS
-	    && array->is_empty()
-	    && !buf_page_cleaner_is_active) {
+		if (WRITE_SEGMENT(segment) && slot->type.is_read()) {
+			/*
+			Redirect read completions to the dedicated completion port
+			and thread. We need to split read and write threads. If we do
+			not do that, and just allow all io threads to process all IO,
+			it is possible to get stuck in a deadlock in buffer pool code.
 
-		*m1 = NULL;
-		*m2 = NULL;
+			Currently, the problem is solved this way - "write io" threads
+			always get all completion notifications, from both async reads
+			and writes. Write completion is handled in the same thread that
+			gets it. Read completion is forwarded via
+			PostQueuedCompletionStatus() to the second completion port
+			dedicated solely to reads. One of the "read io" threads waiting
+			on this port will finally handle the IO.
 
-		array->release();
-
-		return(DB_SUCCESS);
+			Forwarding IO completion this way costs a context switch, and
+			this seems tolerable since asynchronous reads are far less
+			frequent.
+			*/
+			ut_a(PostQueuedCompletionStatus(read_completion_port, len, key, &slot->control));
+		}
+		else {
+			break;
+		}
 	}
 
-	ulint	n = array->slots_per_segment();
-
-	ut_a(pos >= WAIT_OBJECT_0 && pos <= WAIT_OBJECT_0 + n);
-
-	slot = array->at(pos + segment * n);
-
 	ut_a(slot->is_reserved);
 
-	if (orig_seg != ULINT_UNDEFINED) {
-		srv_set_io_thread_op_info(
-			orig_seg, "get windows aio return value");
-	}
-
-	BOOL	ret;
-
-	ret = GetOverlappedResult(
-		slot->file, &slot->control, &slot->n_bytes, TRUE);
-
 	*m1 = slot->m1;
 	*m2 = slot->m2;
 
 	*type = slot->type;
 
-	BOOL	retry = FALSE;
+	bool	retry = false;
 
 	if (ret && slot->n_bytes == slot->len) {
@@ -7434,11 +7428,9 @@ os_aio_windows_handler(
 		err = DB_IO_ERROR;
 	}
 
-	array->release();
 
 	if (retry) {
-		/* Retry failed read/write operation synchronously.
-		No need to hold array->m_mutex. */
+		/* Retry failed read/write operation synchronously. */
 
 #ifdef UNIV_PFS_IO
 		/* This read/write does not go through os_file_read
@@ -7460,22 +7452,6 @@
 		register_pfs_file_io_end(locker, slot->len);
 #endif /* UNIV_PFS_IO */
 
-		if (n_bytes < 0 && GetLastError() == ERROR_IO_PENDING) {
-			/* AIO was queued successfully!
-			We want a synchronous I/O operation on a
-			file where we also use async I/O: in Windows
-			we must use the same wait mechanism as for
-			async I/O */
-
-			BOOL	ret;
-
-			ret = GetOverlappedResult(
-				slot->file, &slot->control, &slot->n_bytes,
-				TRUE);
-
-			n_bytes = ret ? slot->n_bytes : -1;
-		}
-
 		err = (n_bytes == slot->len) ? DB_SUCCESS : DB_IO_ERROR;
 	}
 
@@ -7483,8 +7459,15 @@
 		err = AIOHandler::post_io_processing(slot);
 	}
 
-	array->release_with_mutex(slot);
+	ut_a(slot->array);
+	slot->array->release_with_mutex(slot);
 
+	if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS
+	    && !buf_page_cleaner_is_active
+	    && os_aio_all_slots_free()) {
+		/* Last IO, wake up the other io threads */
+		AIO::wake_at_shutdown();
+	}
 	return(err);
 }
 #endif /* WIN_ASYNC_IO */
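The rewritten handler above drains a completion port in a loop, reposts the
shutdown key so sibling threads also exit, and forwards read completions from
the "write" port to the dedicated read port. A standalone sketch of that
dispatch pattern (the `Request` type and the final completion step are
illustrative, not the patch's exact code):

    #include <windows.h>

    static const ULONG_PTR SHUTDOWN_KEY = (ULONG_PTR)-1;

    /* Minimal request wrapper; 'is_read' stands in for the patch's
       slot->type.is_read(). OVERLAPPED must be the first member. */
    struct Request {
        OVERLAPPED control;
        bool       is_read;
    };

    /* Loop run by each io thread. "Write" threads run it with
       forward == true on the main port and hand reads over to
       read_port; "read" threads run it on read_port with
       forward == false. */
    static void io_thread_loop(HANDLE port, HANDLE read_port, bool forward)
    {
        for (;;) {
            DWORD       len;
            ULONG_PTR   key;
            OVERLAPPED* ov;

            BOOL ret = GetQueuedCompletionStatus(port, &len, &key,
                                                 &ov, INFINITE);
            if (ret && key == SHUTDOWN_KEY) {
                /* Repost so sibling threads wake up and exit too. */
                PostQueuedCompletionStatus(port, 0, key, NULL);
                return;
            }
            if (!ret && ov == NULL) {
                continue;  /* wait failed, nothing was dequeued */
            }

            Request* req = reinterpret_cast<Request*>(ov);
            if (forward && req->is_read) {
                /* One context switch per async read, but read and
                   write completions can no longer starve each other. */
                PostQueuedCompletionStatus(read_port, len, key, ov);
                continue;
            }

            /* ... complete the request here: check ret and len,
               then release the slot ... */
        }
    }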
@@ -7527,7 +7510,6 @@ os_aio_func(
 	actual page size does not decrease. */
 {
 #ifdef WIN_ASYNC_IO
-	void*		buffer = NULL;
 	BOOL		ret = TRUE;
 #endif /* WIN_ASYNC_IO */
 
@@ -7543,24 +7525,7 @@ os_aio_func(
 	DBUG_EXECUTE_IF("ib_os_aio_func_io_failure_28",
 			mode = OS_AIO_SYNC; os_has_said_disk_full = FALSE;);
 
-	if (mode == OS_AIO_SYNC
-#ifdef WIN_ASYNC_IO
-	    && !srv_use_native_aio
-#endif /* WIN_ASYNC_IO */
-	    ) {
-		/* This is actually an ordinary synchronous read or write:
-		no need to use an i/o-handler thread. NOTE that if we use
-		Windows async i/o, Windows does not allow us to use
-		ordinary synchronous os_file_read etc. on the same file,
-		therefore we have built a special mechanism for synchronous
-		wait in the Windows case.
-		Also note that the Performance Schema instrumentation has
-		been performed by current os_aio_func()'s wrapper function
-		pfs_os_aio_func(). So we would no longer need to call
-		Performance Schema instrumented os_file_read() and
-		os_file_write(). Instead, we should use os_file_read_func()
-		and os_file_write_func() */
-
+	if (mode == OS_AIO_SYNC) {
 		if (type.is_read()) {
 			return(os_file_read_func(type, file, buf, offset, n));
 		}
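The simplified branch in the final hunk below depends on the three possible
outcomes of an overlapped `ReadFile()`/`WriteFile()` call: immediate success,
queued (`ERROR_IO_PENDING`), or hard failure. A small sketch of that
classification (the enum and function are illustrative):

    #include <windows.h>

    enum IoStatus { IO_DONE, IO_QUEUED, IO_FAILED };

    /* Classify the result of an overlapped ReadFile()/WriteFile():
       TRUE means the request already completed, FALSE with
       ERROR_IO_PENDING means it was queued to the completion port,
       anything else is a real error. */
    static IoStatus classify_overlapped_io(BOOL ret)
    {
        if (ret) {
            return IO_DONE;
        }
        if (GetLastError() == ERROR_IO_PENDING) {
            return IO_QUEUED;  /* completion will arrive via the port */
        }
        return IO_FAILED;
    }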
@@ -7627,32 +7592,14 @@ try_again:
 	}
 
 #ifdef WIN_ASYNC_IO
-	if (srv_use_native_aio) {
-		if ((ret && slot->len == slot->n_bytes)
-		    || (!ret && GetLastError() == ERROR_IO_PENDING)) {
-			/* aio was queued successfully! */
-
-			if (mode == OS_AIO_SYNC) {
-				IORequest	dummy_type;
-				void*		dummy_mess2;
-				struct fil_node_t* dummy_mess1;
-
-				/* We want a synchronous i/o operation on a
-				file where we also use async i/o: in Windows
-				we must use the same wait mechanism as for
-				async i/o */
-
-				return(os_aio_windows_handler(
-					ULINT_UNDEFINED, slot->pos,
-					&dummy_mess1, &dummy_mess2,
-					&dummy_type));
-			}
+	if ((ret && slot->len == slot->n_bytes)
+	    || (!ret && GetLastError() == ERROR_IO_PENDING)) {
+		/* aio completed or was queued successfully! */
+		return(DB_SUCCESS);
+	}
 
-			return(DB_SUCCESS);
-		}
+	goto err_exit;
 
-		goto err_exit;
-	}
 #endif /* WIN_ASYNC_IO */
 
 	/* AIO request was queued successfully! */
-- 
cgit v1.2.1