Diffstat (limited to 'storage/xtradb/os')
-rw-r--r--  storage/xtradb/os/os0file.c   | 1458
-rw-r--r--  storage/xtradb/os/os0proc.c   |  174
-rw-r--r--  storage/xtradb/os/os0sync.c   |  602
-rw-r--r--  storage/xtradb/os/os0thread.c |  124
4 files changed, 1526 insertions, 832 deletions
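Note on the os0file.c changes below: the patch replaces the os_aio_use_native_aio flag with srv_use_native_aio and wires Linux native AIO (libaio) into the i/o layer: one io_context per segment is created with io_setup(), requests are prepared with io_prep_pread()/io_prep_pwrite() and dispatched with io_submit(), and the helper threads reap completions with a timed io_getevents(). The standalone C sketch below shows that call sequence in isolation; it is not InnoDB code, and the file name, sizes and alignment are illustrative placeholders. Link with -laio.

#define _GNU_SOURCE			/* for O_DIRECT */
#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

int main(void)
{
	io_context_t	ctx;
	struct iocb	cb;
	struct iocb*	cbs[1];
	struct io_event	ev[1];
	struct timespec	timeout = {0, 500000000L};	/* 500ms, cf. OS_AIO_REAP_TIMEOUT */
	void*		buf;
	int		fd;
	int		ret;

	memset(&ctx, 0, sizeof(ctx));
	if (io_setup(8, &ctx) != 0) {		/* 8 = max pending requests per context */
		perror("io_setup");
		return(1);
	}

	fd = open("datafile", O_RDONLY | O_DIRECT);	/* placeholder file */
	if (fd < 0) {
		perror("open");
		return(1);
	}

	/* O_DIRECT needs an aligned buffer; InnoDB likewise submits
	page-aligned frames. */
	if (posix_memalign(&buf, 4096, 16384) != 0) {
		return(1);
	}

	io_prep_pread(&cb, fd, buf, 16384, 0);	/* read 16KB at offset 0 */
	cb.data = buf;				/* per-request cookie, as the patch stores the slot in iocb->data */
	cbs[0] = &cb;

	ret = io_submit(ctx, 1, cbs);		/* returns number queued or -errno */
	if (ret != 1) {
		fprintf(stderr, "io_submit: %s\n", strerror(-ret));
		return(1);
	}

	/* Timed reap, so a helper thread can also notice shutdown. */
	ret = io_getevents(ctx, 1, 1, ev, &timeout);
	if (ret > 0) {
		printf("read %lld bytes\n", (long long) ev[0].res);
	}

	io_destroy(ctx);
	return(0);
}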
diff --git a/storage/xtradb/os/os0file.c b/storage/xtradb/os/os0file.c index 48d796c38e1..835210140f8 100644 --- a/storage/xtradb/os/os0file.c +++ b/storage/xtradb/os/os0file.c @@ -33,6 +33,11 @@ Created 10/21/1995 Heikki Tuuri *******************************************************/ #include "os0file.h" + +#ifdef UNIV_NONINL +#include "os0file.ic" +#endif + #include "ut0mem.h" #include "srv0srv.h" #include "srv0start.h" @@ -53,6 +58,10 @@ Created 10/21/1995 Heikki Tuuri # endif /* __WIN__ */ #endif /* !UNIV_HOTBACKUP */ +#if defined(LINUX_NATIVE_AIO) +#include <libaio.h> +#endif + /* This specifies the file permissions InnoDB uses when it creates files in Unix; the value of os_innodb_umask is initialized in ha_innodb.cc to my_umask */ @@ -74,9 +83,7 @@ UNIV_INTERN ibool os_do_not_call_flush_at_each_write = FALSE; /* We do not call os_file_flush in every os_file_write. */ #endif /* UNIV_DO_FLUSH */ -#ifdef UNIV_HOTBACKUP -# define os_aio_use_native_aio FALSE -#else /* UNIV_HOTBACKUP */ +#ifndef UNIV_HOTBACKUP /* We use these mutexes to protect lseek + file i/o operation, if the OS does not provide an atomic pread or pwrite, or similar */ #define OS_FILE_N_SEEK_MUTEXES 16 @@ -85,36 +92,69 @@ UNIV_INTERN os_mutex_t os_file_seek_mutexes[OS_FILE_N_SEEK_MUTEXES]; /* In simulated aio, merge at most this many consecutive i/os */ #define OS_AIO_MERGE_N_CONSECUTIVE 64 -/** If this flag is TRUE, then we will use the native aio of the -OS (provided we compiled Innobase with it in), otherwise we will -use simulated aio we build below with threads */ - -UNIV_INTERN ibool os_aio_use_native_aio = FALSE; +/********************************************************************** + +InnoDB AIO Implementation: +========================= + +We support native AIO for windows and linux. For rest of the platforms +we simulate AIO by special io-threads servicing the IO-requests. + +Simulated AIO: +============== + +In platforms where we 'simulate' AIO following is a rough explanation +of the high level design. +There are four io-threads (for ibuf, log, read, write). +All synchronous IO requests are serviced by the calling thread using +os_file_write/os_file_read. The Asynchronous requests are queued up +in an array (there are four such arrays) by the calling thread. +Later these requests are picked up by the io-thread and are serviced +synchronously. + +Windows native AIO: +================== + +If srv_use_native_aio is not set then windows follow the same +code as simulated AIO. If the flag is set then native AIO interface +is used. On windows, one of the limitation is that if a file is opened +for AIO no synchronous IO can be done on it. Therefore we have an +extra fifth array to queue up synchronous IO requests. +There are innodb_file_io_threads helper threads. These threads work +on the four arrays mentioned above in Simulated AIO. No thread is +required for the sync array. +If a synchronous IO request is made, it is first queued in the sync +array. Then the calling thread itself waits on the request, thus +making the call synchronous. +If an AIO request is made the calling thread not only queues it in the +array but also submits the requests. The helper thread then collects +the completed IO request and calls completion routine on it. + +Linux native AIO: +================= + +If we have libaio installed on the system and innodb_use_native_aio +is set to TRUE we follow the code path of native AIO, otherwise we +do simulated AIO. +There are innodb_file_io_threads helper threads. 
These threads work +on the four arrays mentioned above in Simulated AIO. +If a synchronous IO request is made, it is handled by calling +os_file_write/os_file_read. +If an AIO request is made the calling thread not only queues it in the +array but also submits the requests. The helper thread then collects +the completed IO request and calls completion routine on it. + +**********************************************************************/ /** Flag: enable debug printout for asynchronous i/o */ UNIV_INTERN ibool os_aio_print_debug = FALSE; -/* State for the state of an IO request in simulated AIO. - Protocol for simulated aio: - client requests IO: find slot with reserved = FALSE. Add entry with - status = OS_AIO_NOT_ISSUED. - IO thread wakes: find adjacent slots with reserved = TRUE and status = - OS_AIO_NOT_ISSUED. Change status for slots to - OS_AIO_ISSUED. - IO operation completes: set status for slots to OS_AIO_DONE. set status - for the first slot to OS_AIO_CLAIMED and return - result for that slot. - When there are multiple read and write threads, they all compete to execute - the requests in the array (os_aio_array_t). This avoids the need to load - balance requests at the time the request is made at the cost of waking all - threads when a request is available. -*/ -typedef enum { - OS_AIO_NOT_ISSUED, /* Available to be processed by an IO thread. */ - OS_AIO_ISSUED, /* Being processed by an IO thread. */ - OS_AIO_DONE, /* Request processed. */ - OS_AIO_CLAIMED /* Result being returned to client. */ -} os_aio_status; +#ifdef UNIV_PFS_IO +/* Keys to register InnoDB I/O with performance schema */ +UNIV_INTERN mysql_pfs_key_t innodb_file_data_key; +UNIV_INTERN mysql_pfs_key_t innodb_file_log_key; +UNIV_INTERN mysql_pfs_key_t innodb_file_temp_key; +#endif /* UNIV_PFS_IO */ /** The asynchronous i/o array slot structure */ typedef struct os_aio_slot_struct os_aio_slot_t; @@ -125,8 +165,6 @@ struct os_aio_slot_struct{ ulint pos; /*!< index of the slot in the aio array */ ibool reserved; /*!< TRUE if this slot is reserved */ - os_aio_status status; /* Status for current request. Valid when reserved - is TRUE. Used only in simulated aio. */ time_t reservation_time;/*!< time when reserved */ ulint len; /*!< length of the block to read or write */ @@ -137,21 +175,26 @@ struct os_aio_slot_struct{ ulint offset_high; /*!< 32 high bits of file offset */ os_file_t file; /*!< file where to read or write */ const char* name; /*!< file name or path */ -// ibool io_already_done;/*!< used only in simulated aio: -// TRUE if the physical i/o already -// made and only the slot message -// needs to be passed to the caller -// of os_aio_simulated_handle */ + ibool io_already_done;/*!< used only in simulated aio: + TRUE if the physical i/o already + made and only the slot message + needs to be passed to the caller + of os_aio_simulated_handle */ + ulint space_id; fil_node_t* message1; /*!< message which is given by the */ void* message2; /*!< the requester of an aio operation and which can be used to identify which pending aio operation was completed */ #ifdef WIN_ASYNC_IO - os_event_t event; /*!< event object we need in the + HANDLE handle; /*!< handle object we need in the OVERLAPPED struct */ OVERLAPPED control; /*!< Windows control block for the aio request */ +#elif defined(LINUX_NATIVE_AIO) + struct iocb control; /* Linux control block for aio */ + int n_bytes; /* bytes written/read. 
*/ + int ret; /* AIO return code */ #endif }; @@ -177,12 +220,16 @@ struct os_aio_array_struct{ array of pending aio requests. A thread can wait separately for any one of the segments. */ + ulint cur_seg;/*!< We reserve IO requests in round + robin fashion to different segments. + This points to the segment that is to + be used to service next IO request. */ ulint n_reserved; /*!< Number of reserved slots in the aio array outside the ibuf segment */ os_aio_slot_t* slots; /*!< Pointer to the slots in the array */ #ifdef __WIN__ - os_native_event_t* native_events; + HANDLE* handles; /*!< Pointer to an array of OS native event handles where we copied the handles from slots, in the same @@ -190,17 +237,33 @@ struct os_aio_array_struct{ WaitForMultipleObjects; used only in Windows */ #endif + +#if defined(LINUX_NATIVE_AIO) + io_context_t* aio_ctx; + /* completion queue for IO. There is + one such queue per segment. Each thread + will work on one ctx exclusively. */ + struct io_event* aio_events; + /* The array to collect completed IOs. + There is one such event for each + possible pending IO. The size of the + array is equal to n_slots. */ +#endif }; -/** Array of events used in simulated aio */ -static os_event_t* os_aio_segment_wait_events = NULL; +#if defined(LINUX_NATIVE_AIO) +/** timeout for each io_getevents() call = 500ms. */ +#define OS_AIO_REAP_TIMEOUT (500000000UL) + +/** time to sleep, in microseconds if io_setup() returns EAGAIN. */ +#define OS_AIO_IO_SETUP_RETRY_SLEEP (500000UL) -/* Number for the first global segment for reading. */ -const ulint os_aio_first_read_segment = 2; +/** number of attempts before giving up on io_setup(). */ +#define OS_AIO_IO_SETUP_RETRY_ATTEMPTS 5 +#endif -/* Number for the first global segment for writing. Set to -2 + os_aio_read_write_threads. */ -ulint os_aio_first_write_segment = 0; +/** Array of events used in simulated aio */ +static os_event_t* os_aio_segment_wait_events = NULL; /** The aio arrays for non-ibuf i/o and ibuf i/o, as well as sync aio. These are NULL when the module has not yet been initialized. @{ */ @@ -211,19 +274,13 @@ static os_aio_array_t* os_aio_log_array = NULL; /*!< Redo log */ static os_aio_array_t* os_aio_sync_array = NULL; /*!< Synchronous I/O */ /* @} */ -/* Per thread buffer used for merged IO requests. Used by -os_aio_simulated_handle so that a buffer doesn't have to be allocated -for each request. */ -static byte* os_aio_thread_buffer[SRV_MAX_N_IO_THREADS]; -static ulint os_aio_thread_buffer_size[SRV_MAX_N_IO_THREADS]; - /** Number of asynchronous I/O segments. Set by os_aio_init(). */ static ulint os_aio_n_segments = ULINT_UNDEFINED; /** If the following is TRUE, read i/o handler threads try to wait until a batch of new read requests have been posted */ -static volatile ibool os_aio_recommend_sleep_for_read_threads = FALSE; -#endif /* UNIV_HOTBACKUP */ +static ibool os_aio_recommend_sleep_for_read_threads = FALSE; +#endif /* !UNIV_HOTBACKUP */ UNIV_INTERN ulint os_n_file_reads = 0; UNIV_INTERN ulint os_bytes_read_since_printout = 0; @@ -249,15 +306,45 @@ UNIV_INTERN ulint os_n_pending_writes = 0; /** Number of pending read operations */ UNIV_INTERN ulint os_n_pending_reads = 0; +#ifdef UNIV_DEBUG +/**********************************************************************//** +Validates the consistency the aio system some of the time. 
+@return TRUE if ok or the check was skipped */ +UNIV_INTERN +ibool +os_aio_validate_skip(void) +/*======================*/ +{ +/** Try os_aio_validate() every this many times */ +# define OS_AIO_VALIDATE_SKIP 13 + + /** The os_aio_validate() call skip counter. + Use a signed type because of the race condition below. */ + static int os_aio_validate_count = OS_AIO_VALIDATE_SKIP; + + /* There is a race condition below, but it does not matter, + because this call is only for heuristic purposes. We want to + reduce the call frequency of the costly os_aio_validate() + check in debug builds. */ + if (--os_aio_validate_count > 0) { + return(TRUE); + } + + os_aio_validate_count = OS_AIO_VALIDATE_SKIP; + return(os_aio_validate()); +} +#endif /* UNIV_DEBUG */ + +#ifdef __WIN__ /***********************************************************************//** Gets the operating system version. Currently works only on Windows. -@return OS_WIN95, OS_WIN31, OS_WINNT, OS_WIN2000 */ +@return OS_WIN95, OS_WIN31, OS_WINNT, OS_WIN2000, OS_WINXP, OS_WINVISTA, +OS_WIN7. */ UNIV_INTERN ulint os_get_os_version(void) /*===================*/ { -#ifdef __WIN__ OSVERSIONINFO os_info; os_info.dwOSVersionInfoSize = sizeof(OSVERSIONINFO); @@ -269,21 +356,25 @@ os_get_os_version(void) } else if (os_info.dwPlatformId == VER_PLATFORM_WIN32_WINDOWS) { return(OS_WIN95); } else if (os_info.dwPlatformId == VER_PLATFORM_WIN32_NT) { - if (os_info.dwMajorVersion <= 4) { - return(OS_WINNT); - } else { - return(OS_WIN2000); + switch (os_info.dwMajorVersion) { + case 3: + case 4: + return OS_WINNT; + case 5: + return (os_info.dwMinorVersion == 0) ? OS_WIN2000 + : OS_WINXP; + case 6: + return (os_info.dwMinorVersion == 0) ? OS_WINVISTA + : OS_WIN7; + default: + return OS_WIN7; } } else { ut_error; return(0); } -#else - ut_error; - - return(0); -#endif } +#endif /* __WIN__ */ /***********************************************************************//** Retrieves the last error number if an error occurs in a file io function. @@ -429,17 +520,29 @@ os_file_get_last_error( fflush(stderr); - if (err == ENOSPC) { + switch (err) { + case ENOSPC: return(OS_FILE_DISK_FULL); - } else if (err == ENOENT) { + case ENOENT: return(OS_FILE_NOT_FOUND); - } else if (err == EEXIST) { + case EEXIST: return(OS_FILE_ALREADY_EXISTS); - } else if (err == EXDEV || err == ENOTDIR || err == EISDIR) { + case EXDEV: + case ENOTDIR: + case EISDIR: return(OS_FILE_PATH_ERROR); - } else { - return(100 + err); + case EAGAIN: + if (srv_use_native_aio) { + return(OS_FILE_AIO_RESOURCES_RESERVED); + } + break; + case EINTR: + if (srv_use_native_aio) { + return(OS_FILE_AIO_INTERRUPTED); + } + break; } + return(100 + err); #endif } @@ -489,6 +592,9 @@ os_file_handle_error_cond_exit( } else if (err == OS_FILE_AIO_RESOURCES_RESERVED) { return(TRUE); + } else if (err == OS_FILE_AIO_INTERRUPTED) { + + return(TRUE); } else if (err == OS_FILE_ALREADY_EXISTS || err == OS_FILE_PATH_ERROR) { @@ -555,7 +661,7 @@ os_file_handle_error_no_exit( #undef USE_FILE_LOCK #define USE_FILE_LOCK -#if defined(UNIV_HOTBACKUP) || defined(__WIN__) || defined(__NETWARE__) +#if defined(UNIV_HOTBACKUP) || defined(__WIN__) /* InnoDB Hot Backup does not lock the data files. * On Windows, mandatory locking is used. 
*/ @@ -605,45 +711,37 @@ os_io_init_simple(void) { ulint i; - os_file_count_mutex = os_mutex_create(NULL); + os_file_count_mutex = os_mutex_create(); for (i = 0; i < OS_FILE_N_SEEK_MUTEXES; i++) { - os_file_seek_mutexes[i] = os_mutex_create(NULL); + os_file_seek_mutexes[i] = os_mutex_create(); } } /***********************************************************************//** Creates a temporary file. This function is like tmpfile(3), but the temporary file is created in the MySQL temporary directory. -On Netware, this function is like tmpfile(3), because the C run-time -library of Netware does not expose the delete-on-close flag. @return temporary file handle, or NULL on error */ UNIV_INTERN FILE* os_file_create_tmpfile(void) /*========================*/ { -#ifdef __NETWARE__ - FILE* file = tmpfile(); -#else /* __NETWARE__ */ FILE* file = NULL; int fd = innobase_mysql_tmpfile(); if (fd >= 0) { file = fdopen(fd, "w+b"); } -#endif /* __NETWARE__ */ if (!file) { ut_print_timestamp(stderr); fprintf(stderr, " InnoDB: Error: unable to create temporary file;" " errno: %d\n", errno); -#ifndef __NETWARE__ if (fd >= 0) { close(fd); } -#endif /* !__NETWARE__ */ } return(file); @@ -964,13 +1062,15 @@ os_file_create_directory( } /****************************************************************//** +NOTE! Use the corresponding macro os_file_create_simple(), not directly +this function! A simple function to open or create a file. @return own: handle to the file, not defined if error, error number can be retrieved with os_file_get_last_error */ UNIV_INTERN os_file_t -os_file_create_simple( -/*==================*/ +os_file_create_simple_func( +/*=======================*/ const char* name, /*!< in: name of the file or path as a null-terminated string */ ulint create_mode,/*!< in: OS_FILE_OPEN if an existing file is @@ -1105,13 +1205,15 @@ try_again: } /****************************************************************//** +NOTE! Use the corresponding macro +os_file_create_simple_no_error_handling(), not directly this function! A simple function to open or create a file. @return own: handle to the file, not defined if error, error number can be retrieved with os_file_get_last_error */ UNIV_INTERN os_file_t -os_file_create_simple_no_error_handling( -/*====================================*/ +os_file_create_simple_no_error_handling_func( +/*=========================================*/ const char* name, /*!< in: name of the file or path as a null-terminated string */ ulint create_mode,/*!< in: OS_FILE_OPEN if an existing file @@ -1222,10 +1324,12 @@ UNIV_INTERN void os_file_set_nocache( /*================*/ - int fd, /*!< in: file descriptor to alter */ - const char* file_name, /*!< in: file name, used in the - diagnostic message */ - const char* operation_name) /*!< in: "open" or "create"; used in the + int fd /*!< in: file descriptor to alter */ + __attribute__((unused)), + const char* file_name /*!< in: used in the diagnostic message */ + __attribute__((unused)), + const char* operation_name __attribute__((unused))) + /*!< in: "open" or "create"; used in the diagnostic message */ { /* some versions of Solaris may not have DIRECTIO_ON */ @@ -1260,13 +1364,15 @@ os_file_set_nocache( } /****************************************************************//** +NOTE! Use the corresponding macro os_file_create(), not directly +this function! Opens an existing file or creates a new. 
@return own: handle to the file, not defined if error, error number can be retrieved with os_file_get_last_error */ UNIV_INTERN os_file_t -os_file_create( -/*===========*/ +os_file_create_func( +/*================*/ const char* name, /*!< in: name of the file or path as a null-terminated string */ ulint create_mode,/*!< in: OS_FILE_OPEN if an existing file @@ -1316,13 +1422,13 @@ try_again: buffering of writes in the OS */ attributes = 0; #ifdef WIN_ASYNC_IO - if (os_aio_use_native_aio) { + if (srv_use_native_aio) { attributes = attributes | FILE_FLAG_OVERLAPPED; } #endif #ifdef UNIV_NON_BUFFERED_IO # ifndef UNIV_HOTBACKUP - if (type == OS_LOG_FILE && srv_flush_log_at_trx_commit == 2) { + if (type == OS_LOG_FILE && thd_flush_log_at_trx_commit(NULL) == 2) { /* Do not use unbuffered i/o to log files because value 2 denotes that we do not flush the log at every commit, but only once per second */ @@ -1338,7 +1444,7 @@ try_again: attributes = 0; #ifdef UNIV_NON_BUFFERED_IO # ifndef UNIV_HOTBACKUP - if (type == OS_LOG_FILE && srv_flush_log_at_trx_commit == 2) { + if (type == OS_LOG_FILE && thd_flush_log_at_trx_commit(NULL) == 2) { /* Do not use unbuffered i/o to log files because value 2 denotes that we do not flush the log at every commit, but only once per second */ @@ -1407,8 +1513,6 @@ try_again: int create_flag; ibool retry; const char* mode_str = NULL; - const char* type_str = NULL; - const char* purpose_str = NULL; try_again: ut_a(name); @@ -1428,26 +1532,9 @@ try_again: ut_error; } - if (type == OS_LOG_FILE) { - type_str = "LOG"; - } else if (type == OS_DATA_FILE) { - type_str = "DATA"; - } else { - ut_error; - } + ut_a(type == OS_LOG_FILE || type == OS_DATA_FILE); + ut_a(purpose == OS_FILE_AIO || purpose == OS_FILE_NORMAL); - if (purpose == OS_FILE_AIO) { - purpose_str = "AIO"; - } else if (purpose == OS_FILE_NORMAL) { - purpose_str = "NORMAL"; - } else { - ut_error; - } - -#if 0 - fprintf(stderr, "Opening file %s, mode %s, type %s, purpose %s\n", - name, mode_str, type_str, purpose_str); -#endif #ifdef O_SYNC /* We let O_SYNC only affect log files; note that we map O_DSYNC to O_SYNC because the datasync options seemed to corrupt files in 2001 @@ -1664,13 +1751,14 @@ loop: } /***********************************************************************//** +NOTE! Use the corresponding macro os_file_rename(), not directly this function! Renames a file (can also move it to another directory). It is safest that the file is closed before calling this function. @return TRUE if success */ UNIV_INTERN ibool -os_file_rename( -/*===========*/ +os_file_rename_func( +/*================*/ const char* oldpath,/*!< in: old file path as a null-terminated string */ const char* newpath)/*!< in: new file path */ @@ -1703,13 +1791,14 @@ os_file_rename( } /***********************************************************************//** +NOTE! Use the corresponding macro os_file_close(), not directly this function! Closes a file handle. In case of error, error number can be retrieved with os_file_get_last_error. 
@return TRUE if success */ UNIV_INTERN ibool -os_file_close( -/*==========*/ +os_file_close_func( +/*===============*/ os_file_t file) /*!< in, own: handle to a file */ { #ifdef __WIN__ @@ -1928,7 +2017,7 @@ os_file_set_size( ut_free(buf2); - ret = os_file_flush(file); + ret = os_file_flush(file, TRUE); if (ret) { return(TRUE); @@ -1966,7 +2055,8 @@ static int os_file_fsync( /*==========*/ - os_file_t file) /*!< in: handle to a file */ + os_file_t file, /*!< in: handle to a file */ + ibool metadata) { int ret; int failures; @@ -1975,7 +2065,16 @@ os_file_fsync( failures = 0; do { +#if defined(HAVE_FDATASYNC) && HAVE_DECL_FDATASYNC + if (metadata) { + ret = fsync(file); + } else { + ret = fdatasync(file); + } +#else + (void) metadata; ret = fsync(file); +#endif os_n_fsyncs++; @@ -1994,6 +2093,9 @@ os_file_fsync( failures++; retry = TRUE; + } else if (ret == -1 && errno == EINTR) { + /* Handle signal interruptions correctly */ + retry = TRUE; } else { retry = FALSE; @@ -2005,13 +2107,15 @@ os_file_fsync( #endif /* !__WIN__ */ /***********************************************************************//** +NOTE! Use the corresponding macro os_file_flush(), not directly this function! Flushes the write buffers of a given file to the disk. @return TRUE if success */ UNIV_INTERN ibool -os_file_flush( -/*==========*/ - os_file_t file) /*!< in, own: handle to a file */ +os_file_flush_func( +/*===============*/ + os_file_t file, /*!< in, own: handle to a file */ + ibool metadata) { #ifdef __WIN__ BOOL ret; @@ -2061,18 +2165,18 @@ os_file_flush( /* If we are not on an operating system that supports this, then fall back to a plain fsync. */ - ret = os_file_fsync(file); + ret = os_file_fsync(file, metadata); } else { ret = fcntl(file, F_FULLFSYNC, NULL); if (ret) { /* If we are not on a file system that supports this, then fall back to a plain fsync. */ - ret = os_file_fsync(file); + ret = os_file_fsync(file, metadata); } } #else - ret = os_file_fsync(file); + ret = os_file_fsync(file, metadata); #endif if (ret == 0) { @@ -2106,12 +2210,9 @@ os_file_flush( /*******************************************************************//** Does a synchronous read operation in Posix. 
@return number of bytes read, -1 if error */ -#define os_file_pread(file, buf, n, offset, offset_high) \ - _os_file_pread(file, buf, n, offset, offset_high, NULL); - static ssize_t -_os_file_pread( +os_file_pread( /*==========*/ os_file_t file, /*!< in: handle to a file */ void* buf, /*!< in: buffer where to read */ @@ -2125,6 +2226,7 @@ _os_file_pread( off_t offs; #if defined(HAVE_PREAD) && !defined(HAVE_BROKEN_PREAD) ssize_t n_bytes; + ssize_t n_read; #endif /* HAVE_PREAD && !HAVE_BROKEN_PREAD */ ulint sec; ulint ms; @@ -2165,7 +2267,18 @@ _os_file_pread( os_n_pending_reads++; os_mutex_exit(os_file_count_mutex); - n_bytes = pread(file, buf, (ssize_t)n, offs); + /* Handle signal interruptions correctly */ + for (n_bytes = 0; n_bytes < (ssize_t) n; ) { + n_read = pread(file, buf, (ssize_t)n, offs); + if (n_read > 0) { + n_bytes += n_read; + offs += n_read; + } else if (n_read == -1 && errno == EINTR) { + continue; + } else { + break; + } + } os_mutex_enter(os_file_count_mutex); os_file_n_pending_preads--; @@ -2184,6 +2297,7 @@ _os_file_pread( { off_t ret_offset; ssize_t ret; + ssize_t n_read; #ifndef UNIV_HOTBACKUP ulint i; #endif /* !UNIV_HOTBACKUP */ @@ -2204,7 +2318,17 @@ _os_file_pread( if (ret_offset < 0) { ret = -1; } else { - ret = read(file, buf, (ssize_t)n); + /* Handle signal interruptions correctly */ + for (ret = 0; ret < (ssize_t) n; ) { + n_read = read(file, buf, (ssize_t)n); + if (n_read > 0) { + ret += n_read; + } else if (n_read == -1 && errno == EINTR) { + continue; + } else { + break; + } + } } #ifndef UNIV_HOTBACKUP @@ -2243,6 +2367,7 @@ os_file_pwrite( offset */ { ssize_t ret; + ssize_t n_written; off_t offs; ut_a((offset & 0xFFFFFFFFUL) == offset); @@ -2270,7 +2395,18 @@ os_file_pwrite( os_n_pending_writes++; os_mutex_exit(os_file_count_mutex); - ret = pwrite(file, buf, (ssize_t)n, offs); + /* Handle signal interruptions correctly */ + for (ret = 0; ret < (ssize_t) n; ) { + n_written = pwrite(file, buf, (ssize_t)n, offs); + if (n_written > 0) { + ret += n_written; + offs += n_written; + } else if (n_written == -1 && errno == EINTR) { + continue; + } else { + break; + } + } os_mutex_enter(os_file_count_mutex); os_file_n_pending_pwrites--; @@ -2286,7 +2422,7 @@ os_file_pwrite( the OS crashes, a database page is only partially physically written to disk. */ - ut_a(TRUE == os_file_flush(file)); + ut_a(TRUE == os_file_flush(file, TRUE)); } # endif /* UNIV_DO_FLUSH */ @@ -2317,7 +2453,17 @@ os_file_pwrite( goto func_exit; } - ret = write(file, buf, (ssize_t)n); + /* Handle signal interruptions correctly */ + for (ret = 0; ret < (ssize_t) n; ) { + n_written = write(file, buf, (ssize_t)n); + if (n_written > 0) { + ret += n_written; + } else if (n_written == -1 && errno == EINTR) { + continue; + } else { + break; + } + } # ifdef UNIV_DO_FLUSH if (srv_unix_file_flush_method != SRV_UNIX_LITTLESYNC @@ -2328,7 +2474,7 @@ os_file_pwrite( the OS crashes, a database page is only partially physically written to disk. */ - ut_a(TRUE == os_file_flush(file)); + ut_a(TRUE == os_file_flush(file, TRUE)); } # endif /* UNIV_DO_FLUSH */ @@ -2348,12 +2494,14 @@ func_exit: #endif /*******************************************************************//** +NOTE! Use the corresponding macro os_file_read(), not directly this +function! Requests a synchronous positioned read operation. 
@return TRUE if request was successful, FALSE if fail */ UNIV_INTERN ibool -_os_file_read( -/*=========*/ +os_file_read_func( +/*==============*/ os_file_t file, /*!< in: handle to a file */ void* buf, /*!< in: buffer where to read */ ulint offset, /*!< in: least significant 32 bits of file @@ -2374,7 +2522,10 @@ _os_file_read( ulint i; #endif /* !UNIV_HOTBACKUP */ + /* On 64-bit Windows, ulint is 64 bits. But offset and n should be + no more than 32 bits. */ ut_a((offset & 0xFFFFFFFFUL) == offset); + ut_a((n & 0xFFFFFFFFUL) == n); os_n_file_reads++; os_bytes_read_since_printout += n; @@ -2433,7 +2584,7 @@ try_again: os_bytes_read_since_printout += n; try_again: - ret = _os_file_pread(file, buf, n, offset, offset_high, trx); + ret = os_file_pread(file, buf, n, offset, offset_high, trx); if ((ulint)ret == n) { @@ -2472,13 +2623,15 @@ error_handling: } /*******************************************************************//** +NOTE! Use the corresponding macro os_file_read_no_error_handling(), +not directly this function! Requests a synchronous positioned read operation. This function does not do any error handling. In case of error it returns FALSE. @return TRUE if request was successful, FALSE if fail */ UNIV_INTERN ibool -os_file_read_no_error_handling( -/*===========================*/ +os_file_read_no_error_handling_func( +/*================================*/ os_file_t file, /*!< in: handle to a file */ void* buf, /*!< in: buffer where to read */ ulint offset, /*!< in: least significant 32 bits of file @@ -2498,7 +2651,10 @@ os_file_read_no_error_handling( ulint i; #endif /* !UNIV_HOTBACKUP */ + /* On 64-bit Windows, ulint is 64 bits. But offset and n should be + no more than 32 bits. */ ut_a((offset & 0xFFFFFFFFUL) == offset); + ut_a((n & 0xFFFFFFFFUL) == n); os_n_file_reads++; os_bytes_read_since_printout += n; @@ -2557,7 +2713,7 @@ try_again: os_bytes_read_since_printout += n; try_again: - ret = os_file_pread(file, buf, n, offset, offset_high); + ret = os_file_pread(file, buf, n, offset, offset_high, NULL); if ((ulint)ret == n) { @@ -2600,12 +2756,14 @@ os_file_read_string( } /*******************************************************************//** +NOTE! Use the corresponding macro os_file_write(), not directly +this function! Requests a synchronous write operation. @return TRUE if request was successful, FALSE if fail */ UNIV_INTERN ibool -os_file_write( -/*==========*/ +os_file_write_func( +/*===============*/ const char* name, /*!< in: name of the file or path as a null-terminated string */ os_file_t file, /*!< in: handle to a file */ @@ -2628,7 +2786,10 @@ os_file_write( ulint i; #endif /* !UNIV_HOTBACKUP */ - ut_a((offset & 0xFFFFFFFF) == offset); + /* On 64-bit Windows, ulint is 64 bits. But offset and n should be + no more than 32 bits. */ + ut_a((offset & 0xFFFFFFFFUL) == offset); + ut_a((n & 0xFFFFFFFFUL) == n); os_n_file_writes++; @@ -2686,7 +2847,7 @@ retry: # ifdef UNIV_DO_FLUSH if (!os_do_not_call_flush_at_each_write) { - ut_a(TRUE == os_file_flush(file)); + ut_a(TRUE == os_file_flush(file, TRUE)); } # endif /* UNIV_DO_FLUSH */ @@ -3064,15 +3225,106 @@ os_aio_array_get_nth_slot( return((array->slots) + index); } -/************************************************************************//** -Creates an aio wait array. -@return own: aio array */ +#if defined(LINUX_NATIVE_AIO) +/******************************************************************//** +Creates an io_context for native linux AIO. +@return TRUE on success. 
*/ +static +ibool +os_aio_linux_create_io_ctx( +/*=======================*/ + ulint max_events, /*!< in: number of events. */ + io_context_t* io_ctx) /*!< out: io_ctx to initialize. */ +{ + int ret; + ulint retries = 0; + +retry: + memset(io_ctx, 0x0, sizeof(*io_ctx)); + + /* Initialize the io_ctx. Tell it how many pending + IO requests this context will handle. */ + + ret = io_setup(max_events, io_ctx); + if (ret == 0) { +#if defined(UNIV_AIO_DEBUG) + fprintf(stderr, + "InnoDB: Linux native AIO:" + " initialized io_ctx for segment\n"); +#endif + /* Success. Return now. */ + return(TRUE); + } + + /* If we hit EAGAIN we'll make a few attempts before failing. */ + + switch (ret) { + case -EAGAIN: + if (retries == 0) { + /* First time around. */ + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: Warning: io_setup() failed" + " with EAGAIN. Will make %d attempts" + " before giving up.\n", + OS_AIO_IO_SETUP_RETRY_ATTEMPTS); + } + + if (retries < OS_AIO_IO_SETUP_RETRY_ATTEMPTS) { + ++retries; + fprintf(stderr, + "InnoDB: Warning: io_setup() attempt" + " %lu failed.\n", + retries); + os_thread_sleep(OS_AIO_IO_SETUP_RETRY_SLEEP); + goto retry; + } + + /* Have tried enough. Better call it a day. */ + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: Error: io_setup() failed" + " with EAGAIN after %d attempts.\n", + OS_AIO_IO_SETUP_RETRY_ATTEMPTS); + break; + + case -ENOSYS: + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: Error: Linux Native AIO interface" + " is not supported on this platform. Please" + " check your OS documentation and install" + " appropriate binary of InnoDB.\n"); + + break; + + default: + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: Error: Linux Native AIO setup" + " returned following error[%d]\n", -ret); + break; + } + + fprintf(stderr, + "InnoDB: You can disable Linux Native AIO by" + " setting innodb_native_aio = off in my.cnf\n"); + return(FALSE); +} +#endif /* LINUX_NATIVE_AIO */ + +/******************************************************************//** +Creates an aio wait array. Note that we return NULL in case of failure. +We don't care about freeing memory here because we assume that a +failure will result in server refusing to start up. 
+@return own: aio array, NULL on failure */ static os_aio_array_t* os_aio_array_create( /*================*/ - ulint n, /*!< in: maximum number of pending aio operations - allowed; n must be divisible by n_segments */ + ulint n, /*!< in: maximum number of pending aio + operations allowed; n must be + divisible by n_segments */ ulint n_segments) /*!< in: number of segments in the aio array */ { os_aio_array_t* array; @@ -3080,13 +3332,15 @@ os_aio_array_create( os_aio_slot_t* slot; #ifdef WIN_ASYNC_IO OVERLAPPED* over; +#elif defined(LINUX_NATIVE_AIO) + struct io_event* io_event = NULL; #endif ut_a(n > 0); ut_a(n_segments > 0); array = ut_malloc(sizeof(os_aio_array_t)); - array->mutex = os_mutex_create(NULL); + array->mutex = os_mutex_create(); array->not_full = os_event_create(NULL); array->is_empty = os_event_create(NULL); @@ -3095,23 +3349,66 @@ os_aio_array_create( array->n_slots = n; array->n_segments = n_segments; array->n_reserved = 0; + array->cur_seg = 0; array->slots = ut_malloc(n * sizeof(os_aio_slot_t)); #ifdef __WIN__ - array->native_events = ut_malloc(n * sizeof(os_native_event_t)); + array->handles = ut_malloc(n * sizeof(HANDLE)); #endif + +#if defined(LINUX_NATIVE_AIO) + array->aio_ctx = NULL; + array->aio_events = NULL; + + /* If we are not using native aio interface then skip this + part of initialization. */ + if (!srv_use_native_aio) { + goto skip_native_aio; + } + + /* Initialize the io_context array. One io_context + per segment in the array. */ + + array->aio_ctx = ut_malloc(n_segments * + sizeof(*array->aio_ctx)); + for (i = 0; i < n_segments; ++i) { + if (!os_aio_linux_create_io_ctx(n/n_segments, + &array->aio_ctx[i])) { + /* If something bad happened during aio setup + we should call it a day and return right away. + We don't care about any leaks because a failure + to initialize the io subsystem means that the + server (or atleast the innodb storage engine) + is not going to startup. */ + return(NULL); + } + } + + /* Initialize the event array. One event per slot. */ + io_event = ut_malloc(n * sizeof(*io_event)); + memset(io_event, 0x0, sizeof(*io_event) * n); + array->aio_events = io_event; + +skip_native_aio: +#endif /* LINUX_NATIVE_AIO */ for (i = 0; i < n; i++) { slot = os_aio_array_get_nth_slot(array, i); slot->pos = i; slot->reserved = FALSE; #ifdef WIN_ASYNC_IO - slot->event = os_event_create(NULL); + slot->handle = CreateEvent(NULL,TRUE, FALSE, NULL); over = &(slot->control); - over->hEvent = slot->event->handle; + over->hEvent = slot->handle; + + *((array->handles) + i) = over->hEvent; - *((array->native_events) + i) = over->hEvent; +#elif defined(LINUX_NATIVE_AIO) + + memset(&slot->control, 0x0, sizeof(slot->control)); + slot->n_bytes = 0; + slot->ret = 0; #endif } @@ -3131,17 +3428,24 @@ os_aio_array_free( for (i = 0; i < array->n_slots; i++) { os_aio_slot_t* slot = os_aio_array_get_nth_slot(array, i); - os_event_free(slot->event); + CloseHandle(slot->handle); } #endif /* WIN_ASYNC_IO */ #ifdef __WIN__ - ut_free(array->native_events); + ut_free(array->handles); #endif /* __WIN__ */ os_mutex_free(array->mutex); os_event_free(array->not_full); os_event_free(array->is_empty); +#if defined(LINUX_NATIVE_AIO) + if (srv_use_native_aio) { + ut_free(array->aio_events); + ut_free(array->aio_ctx); + } +#endif /* LINUX_NATIVE_AIO */ + ut_free(array->slots); ut_free(array); } @@ -3154,7 +3458,7 @@ respectively. The caller must create an i/o handler thread for each segment in these arrays. This function also creates the sync array. 
No i/o handler thread needs to be created for that */ UNIV_INTERN -void +ibool os_aio_init( /*========*/ ulint n_per_seg, /*<! in: maximum number of pending aio @@ -3173,37 +3477,52 @@ os_aio_init( for (i = 0; i < n_segments; i++) { srv_set_io_thread_op_info(i, "not started yet"); - os_aio_thread_buffer[i] = 0; - os_aio_thread_buffer_size[i] = 0; } /* fprintf(stderr, "Array n per seg %lu\n", n_per_seg); */ - os_aio_first_write_segment = os_aio_first_read_segment + n_read_segs; os_aio_ibuf_array = os_aio_array_create(n_per_seg, 1); + if (os_aio_ibuf_array == NULL) { + goto err_exit; + } srv_io_thread_function[0] = "insert buffer thread"; os_aio_log_array = os_aio_array_create(n_per_seg, 1); + if (os_aio_log_array == NULL) { + goto err_exit; + } srv_io_thread_function[1] = "log thread"; - os_aio_read_array = os_aio_array_create(n_per_seg, + os_aio_read_array = os_aio_array_create(n_read_segs * n_per_seg, n_read_segs); + if (os_aio_read_array == NULL) { + goto err_exit; + } + for (i = 2; i < 2 + n_read_segs; i++) { ut_a(i < SRV_MAX_N_IO_THREADS); srv_io_thread_function[i] = "read thread"; } - os_aio_write_array = os_aio_array_create(n_per_seg, + os_aio_write_array = os_aio_array_create(n_write_segs * n_per_seg, n_write_segs); + if (os_aio_write_array == NULL) { + goto err_exit; + } + for (i = 2 + n_read_segs; i < n_segments; i++) { ut_a(i < SRV_MAX_N_IO_THREADS); srv_io_thread_function[i] = "write thread"; } os_aio_sync_array = os_aio_array_create(n_slots_sync, 1); + if (os_aio_sync_array == NULL) { + goto err_exit; + } + os_aio_n_segments = n_segments; @@ -3217,6 +3536,11 @@ os_aio_init( os_last_printout = time(NULL); + return(TRUE); + +err_exit: + return(FALSE); + } /*********************************************************************** @@ -3262,7 +3586,7 @@ os_aio_array_wake_win_aio_at_shutdown( for (i = 0; i < array->n_slots; i++) { - os_event_set((array->slots + i)->event); + SetEvent((array->slots + i)->handle); } } #endif @@ -3283,6 +3607,19 @@ os_aio_wake_all_threads_at_shutdown(void) os_aio_array_wake_win_aio_at_shutdown(os_aio_write_array); os_aio_array_wake_win_aio_at_shutdown(os_aio_ibuf_array); os_aio_array_wake_win_aio_at_shutdown(os_aio_log_array); + +#elif defined(LINUX_NATIVE_AIO) + + /* When using native AIO interface the io helper threads + wait on io_getevents with a timeout value of 500ms. At + each wake up these threads check the server status. + No need to do anything to wake them up. */ + + if (srv_use_native_aio) { + return; + } + /* Fall through to simulated AIO handler wakeup if we are + not using native AIO. */ #endif /* This loop wakes up all simulated ai/o threads */ @@ -3399,16 +3736,27 @@ os_aio_array_reserve_slot( ulint offset_high, /*!< in: most significant 32 bits of offset */ ulint len, /*!< in: length of the block to read or write */ - trx_t* trx) + ulint space_id) { - os_aio_slot_t* slot; + os_aio_slot_t* slot = NULL; #ifdef WIN_ASYNC_IO OVERLAPPED* control; + +#elif defined(LINUX_NATIVE_AIO) + + struct iocb* iocb; + off_t aio_offset; + #endif ulint i; + ulint counter; ulint slots_per_seg; ulint local_seg; +#ifdef WIN_ASYNC_IO + ut_a((len & 0xFFFFFFFFUL) == len); +#endif + /* No need of a mutex. 
Only reading constant fields */ slots_per_seg = array->n_slots / array->n_segments; @@ -3424,7 +3772,7 @@ loop: if (array->n_reserved == array->n_slots) { os_mutex_exit(array->mutex); - if (!os_aio_use_native_aio) { + if (!srv_use_native_aio) { /* If the handler threads are suspended, wake them so that we get more slots */ @@ -3436,17 +3784,13 @@ loop: goto loop; } - /* First try to find a slot in the preferred local segment */ - for (i = local_seg * slots_per_seg; i < array->n_slots; i++) { - slot = os_aio_array_get_nth_slot(array, i); - - if (slot->reserved == FALSE) { - goto found; - } - } + /* We start our search for an available slot from our preferred + local segment and do a full scan of the array. We are + guaranteed to find a slot in full scan. */ + for (i = local_seg * slots_per_seg, counter = 0; + counter < array->n_slots; i++, counter++) { - /* Fall back to a full scan. We are guaranteed to find a slot */ - for (i = 0;; i++) { + i %= array->n_slots; slot = os_aio_array_get_nth_slot(array, i); if (slot->reserved == FALSE) { @@ -3454,6 +3798,9 @@ loop: } } + /* We MUST always be able to get hold of a reserved slot. */ + ut_error; + found: ut_a(slot->reserved == FALSE); array->n_reserved++; @@ -3477,16 +3824,50 @@ found: slot->buf = buf; slot->offset = offset; slot->offset_high = offset_high; -// slot->io_already_done = FALSE; - slot->status = OS_AIO_NOT_ISSUED; + slot->io_already_done = FALSE; + slot->space_id = space_id; #ifdef WIN_ASYNC_IO control = &(slot->control); control->Offset = (DWORD)offset; control->OffsetHigh = (DWORD)offset_high; - os_event_reset(slot->event); -#endif + ResetEvent(slot->handle); + +#elif defined(LINUX_NATIVE_AIO) + + /* If we are not using native AIO skip this part. */ + if (!srv_use_native_aio) { + goto skip_native_aio; + } + + /* Check if we are dealing with 64 bit arch. + If not then make sure that offset fits in 32 bits. */ + if (sizeof(aio_offset) == 8) { + aio_offset = offset_high; + aio_offset <<= 32; + aio_offset += offset; + } else { + ut_a(offset_high == 0); + aio_offset = offset; + } + + iocb = &slot->control; + + if (type == OS_FILE_READ) { + io_prep_pread(iocb, file, buf, len, aio_offset); + } else { + ut_a(type == OS_FILE_WRITE); + io_prep_pwrite(iocb, file, buf, len, aio_offset); + } + + iocb->data = (void*)slot; + slot->n_bytes = 0; + slot->ret = 0; + /*fprintf(stderr, "Filled up Linux native iocb.\n");*/ + +skip_native_aio: +#endif /* LINUX_NATIVE_AIO */ os_mutex_exit(array->mutex); return(slot); @@ -3509,7 +3890,6 @@ os_aio_array_free_slot( ut_ad(slot->reserved); slot->reserved = FALSE; - slot->status = OS_AIO_NOT_ISSUED; array->n_reserved--; @@ -3522,7 +3902,23 @@ os_aio_array_free_slot( } #ifdef WIN_ASYNC_IO - os_event_reset(slot->event); + + ResetEvent(slot->handle); + +#elif defined(LINUX_NATIVE_AIO) + + if (srv_use_native_aio) { + memset(&slot->control, 0x0, sizeof(slot->control)); + slot->n_bytes = 0; + slot->ret = 0; + /*fprintf(stderr, "Freed up Linux native slot.\n");*/ + } else { + /* These fields should not be used if we are not + using native AIO. 
*/ + ut_ad(slot->n_bytes == 0); + ut_ad(slot->ret == 0); + } + #endif os_mutex_exit(array->mutex); } @@ -3542,22 +3938,20 @@ os_aio_simulated_wake_handler_thread( ulint n; ulint i; - ut_ad(!os_aio_use_native_aio); + ut_ad(!srv_use_native_aio); segment = os_aio_get_array_and_local_segment(&array, global_segment); - n = array->n_slots; + n = array->n_slots / array->n_segments; /* Look through n slots after the segment * n'th slot */ os_mutex_enter(array->mutex); for (i = 0; i < n; i++) { - slot = os_aio_array_get_nth_slot(array, i); + slot = os_aio_array_get_nth_slot(array, i + segment * n); - if (slot->reserved && - (slot->status == OS_AIO_NOT_ISSUED || - slot->status == OS_AIO_DONE)) { + if (slot->reserved) { /* Found an i/o request */ break; @@ -3567,25 +3961,7 @@ os_aio_simulated_wake_handler_thread( os_mutex_exit(array->mutex); if (i < n) { - if (array == os_aio_ibuf_array) { - os_event_set(os_aio_segment_wait_events[0]); - - } else if (array == os_aio_log_array) { - os_event_set(os_aio_segment_wait_events[1]); - - } else if (array == os_aio_read_array) { - ulint x; - for (x = os_aio_first_read_segment; x < os_aio_first_write_segment; x++) - os_event_set(os_aio_segment_wait_events[x]); - - } else if (array == os_aio_write_array) { - ulint x; - for (x = os_aio_first_write_segment; x < os_aio_n_segments; x++) - os_event_set(os_aio_segment_wait_events[x]); - - } else { - ut_a(0); - } + os_event_set(os_aio_segment_wait_events[global_segment]); } } @@ -3596,7 +3972,9 @@ void os_aio_simulated_wake_handler_threads(void) /*=======================================*/ { - if (os_aio_use_native_aio) { + ulint i; + + if (srv_use_native_aio) { /* We do not use simulated aio: do nothing */ return; @@ -3604,10 +3982,9 @@ os_aio_simulated_wake_handler_threads(void) os_aio_recommend_sleep_for_read_threads = FALSE; - os_aio_simulated_wake_handler_thread(0); - os_aio_simulated_wake_handler_thread(1); - os_aio_simulated_wake_handler_thread(os_aio_first_read_segment); - os_aio_simulated_wake_handler_thread(os_aio_first_write_segment); + for (i = 0; i < os_aio_n_segments; i++) { + os_aio_simulated_wake_handler_thread(i); + } } /**********************************************************************//** @@ -3629,7 +4006,7 @@ readahead requests. */ os_aio_array_t* array; ulint g; - if (os_aio_use_native_aio) { + if (srv_use_native_aio) { /* We do not use simulated aio: do nothing */ return; @@ -3648,13 +4025,62 @@ readahead requests. */ #endif /* __WIN__ */ } +#if defined(LINUX_NATIVE_AIO) /*******************************************************************//** +Dispatch an AIO request to the kernel. +@return TRUE on success. */ +static +ibool +os_aio_linux_dispatch( +/*==================*/ + os_aio_array_t* array, /*!< in: io request array. */ + os_aio_slot_t* slot) /*!< in: an already reserved slot. */ +{ + int ret; + ulint io_ctx_index; + struct iocb* iocb; + + ut_ad(slot != NULL); + ut_ad(array); + + ut_a(slot->reserved); + + /* Find out what we are going to work with. + The iocb struct is directly in the slot. + The io_context is one per segment. */ + + iocb = &slot->control; + io_ctx_index = (slot->pos * array->n_segments) / array->n_slots; + + ret = io_submit(array->aio_ctx[io_ctx_index], 1, &iocb); + +#if defined(UNIV_AIO_DEBUG) + fprintf(stderr, + "io_submit[%c] ret[%d]: slot[%p] ctx[%p] seg[%lu]\n", + (slot->type == OS_FILE_WRITE) ? 'w' : 'r', ret, slot, + array->aio_ctx[io_ctx_index], (ulong)io_ctx_index); +#endif + + /* io_submit returns number of successfully + queued requests or -errno. 
*/ + if (UNIV_UNLIKELY(ret != 1)) { + errno = -ret; + return(FALSE); + } + + return(TRUE); +} +#endif /* LINUX_NATIVE_AIO */ + + +/*******************************************************************//** +NOTE! Use the corresponding macro os_aio(), not directly this function! Requests an asynchronous i/o operation. @return TRUE if request was queued successfully, FALSE if fail */ UNIV_INTERN ibool -os_aio( -/*===*/ +os_aio_func( +/*========*/ ulint type, /*!< in: OS_FILE_READ or OS_FILE_WRITE */ ulint mode, /*!< in: OS_AIO_NORMAL, ..., possibly ORed to OS_AIO_SIMULATED_WAKE_LATER: the @@ -3687,6 +4113,7 @@ os_aio( (can be used to identify a completed aio operation); ignored if mode is OS_AIO_SYNC */ + ulint space_id, trx_t* trx) { os_aio_array_t* array; @@ -3698,8 +4125,7 @@ os_aio( struct fil_node_struct * dummy_mess1; void* dummy_mess2; ulint dummy_type; -#endif - ulint err = 0; +#endif /* WIN_ASYNC_IO */ ibool retry; ulint wake_later; @@ -3708,41 +4134,51 @@ os_aio( ut_ad(n > 0); ut_ad(n % OS_FILE_LOG_BLOCK_SIZE == 0); ut_ad(offset % OS_FILE_LOG_BLOCK_SIZE == 0); - ut_ad(os_aio_validate()); + ut_ad(os_aio_validate_skip()); +#ifdef WIN_ASYNC_IO + ut_ad((n & 0xFFFFFFFFUL) == n); +#endif wake_later = mode & OS_AIO_SIMULATED_WAKE_LATER; mode = mode & (~OS_AIO_SIMULATED_WAKE_LATER); if (mode == OS_AIO_SYNC #ifdef WIN_ASYNC_IO - && !os_aio_use_native_aio -#endif + && !srv_use_native_aio +#endif /* WIN_ASYNC_IO */ ) { /* This is actually an ordinary synchronous read or write: no need to use an i/o-handler thread. NOTE that if we use Windows async i/o, Windows does not allow us to use ordinary synchronous os_file_read etc. on the same file, therefore we have built a special mechanism for synchronous - wait in the Windows case. */ + wait in the Windows case. + Also note that the Performance Schema instrumentation has + been performed by current os_aio_func()'s wrapper function + pfs_os_aio_func(). So we would no longer need to call + Performance Schema instrumented os_file_read() and + os_file_write(). Instead, we should use os_file_read_func() + and os_file_write_func() */ if (type == OS_FILE_READ) { - return(_os_file_read(file, buf, offset, + return(os_file_read_trx(file, buf, offset, offset_high, n, trx)); } ut_a(type == OS_FILE_WRITE); - return(os_file_write(name, file, buf, offset, offset_high, n)); + return(os_file_write_func(name, file, buf, offset, + offset_high, n)); } try_again: - if (mode == OS_AIO_NORMAL) { - if (type == OS_FILE_READ) { - array = os_aio_read_array; - } else { - array = os_aio_write_array; - } - } else if (mode == OS_AIO_IBUF) { + switch (mode) { + case OS_AIO_NORMAL: + array = (type == OS_FILE_READ) + ? os_aio_read_array + : os_aio_write_array; + break; + case OS_AIO_IBUF: ut_ad(type == OS_FILE_READ); /* Reduce probability of deadlock bugs in connection with ibuf: do not let the ibuf i/o handler sleep */ @@ -3750,14 +4186,21 @@ try_again: wake_later = FALSE; array = os_aio_ibuf_array; - } else if (mode == OS_AIO_LOG) { - + break; + case OS_AIO_LOG: array = os_aio_log_array; - } else if (mode == OS_AIO_SYNC) { + break; + case OS_AIO_SYNC: array = os_aio_sync_array; - } else { - array = NULL; /* Eliminate compiler warning */ + +#if defined(LINUX_NATIVE_AIO) + /* In Linux native AIO we don't use sync IO array. 
*/ + ut_a(!srv_use_native_aio); +#endif /* LINUX_NATIVE_AIO */ + break; + default: ut_error; + array = NULL; /* Eliminate compiler warning */ } if (trx && type == OS_FILE_READ) @@ -3766,15 +4209,19 @@ try_again: trx->io_read += n; } slot = os_aio_array_reserve_slot(type, array, message1, message2, file, - name, buf, offset, offset_high, n, trx); + name, buf, offset, offset_high, n, space_id); if (type == OS_FILE_READ) { - if (os_aio_use_native_aio) { -#ifdef WIN_ASYNC_IO + if (srv_use_native_aio) { os_n_file_reads++; - os_bytes_read_since_printout += len; - + os_bytes_read_since_printout += n; +#ifdef WIN_ASYNC_IO ret = ReadFile(file, buf, (DWORD)n, &len, &(slot->control)); + +#elif defined(LINUX_NATIVE_AIO) + if (!os_aio_linux_dispatch(array, slot)) { + goto err_exit; + } #endif } else { if (!wake_later) { @@ -3784,11 +4231,16 @@ try_again: } } } else if (type == OS_FILE_WRITE) { - if (os_aio_use_native_aio) { -#ifdef WIN_ASYNC_IO + if (srv_use_native_aio) { os_n_file_writes++; +#ifdef WIN_ASYNC_IO ret = WriteFile(file, buf, (DWORD)n, &len, &(slot->control)); + +#elif defined(LINUX_NATIVE_AIO) + if (!os_aio_linux_dispatch(array, slot)) { + goto err_exit; + } #endif } else { if (!wake_later) { @@ -3802,7 +4254,7 @@ try_again: } #ifdef WIN_ASYNC_IO - if (os_aio_use_native_aio) { + if (srv_use_native_aio) { if ((ret && len == n) || (!ret && GetLastError() == ERROR_IO_PENDING)) { /* aio was queued successfully! */ @@ -3825,15 +4277,15 @@ try_again: return(TRUE); } - err = 1; /* Fall through the next if */ - } -#endif - if (err == 0) { - /* aio was queued successfully! */ - - return(TRUE); + goto err_exit; } +#endif /* WIN_ASYNC_IO */ + /* aio was queued successfully! */ + return(TRUE); +#if defined LINUX_NATIVE_AIO || defined WIN_ASYNC_IO +err_exit: +#endif /* LINUX_NATIVE_AIO || WIN_ASYNC_IO */ os_aio_array_free_slot(array, slot); retry = os_file_handle_error(name, @@ -3876,7 +4328,8 @@ os_aio_windows_handle( parameters are valid and can be used to restart the operation, for example */ void** message2, - ulint* type) /*!< out: OS_FILE_WRITE or ..._READ */ + ulint* type, /*!< out: OS_FILE_WRITE or ..._READ */ + ulint* space_id) { ulint orig_seg = segment; os_aio_array_t* array; @@ -3898,24 +4351,49 @@ os_aio_windows_handle( /* NOTE! We only access constant fields in os_aio_array. 
Therefore we do not have to acquire the protecting mutex yet */ - ut_ad(os_aio_validate()); + ut_ad(os_aio_validate_skip()); ut_ad(segment < array->n_segments); - n = array->n_slots; + n = array->n_slots / array->n_segments; if (array == os_aio_sync_array) { - os_event_wait(os_aio_array_get_nth_slot(array, pos)->event); + WaitForSingleObject( + os_aio_array_get_nth_slot(array, pos)->handle, + INFINITE); i = pos; } else { srv_set_io_thread_op_info(orig_seg, "wait Windows aio"); - i = os_event_wait_multiple(n, - (array->native_events) - ); + i = WaitForMultipleObjects((DWORD) n, + array->handles + segment * n, + FALSE, + INFINITE); + } + + if (srv_recovery_stats && recv_recovery_is_on() && n_consecutive) { + mutex_enter(&(recv_sys->mutex)); + if (slot->type == OS_FILE_READ) { + recv_sys->stats_read_io_pages += n_consecutive; + recv_sys->stats_read_io_consecutive[n_consecutive - 1]++; + } else if (slot->type == OS_FILE_WRITE) { + recv_sys->stats_write_io_pages += n_consecutive; + recv_sys->stats_write_io_consecutive[n_consecutive - 1]++; + } + mutex_exit(&(recv_sys->mutex)); } os_mutex_enter(array->mutex); - slot = os_aio_array_get_nth_slot(array, i); + if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS + && array->n_reserved == 0) { + *message1 = NULL; + *message2 = NULL; + os_mutex_exit(array->mutex); + return(TRUE); + } + + ut_a(i >= WAIT_OBJECT_0 && i <= WAIT_OBJECT_0 + n); + + slot = os_aio_array_get_nth_slot(array, i + segment * n); ut_a(slot->reserved); @@ -3930,6 +4408,7 @@ os_aio_windows_handle( *message2 = slot->message2; *type = slot->type; + *space_id = slot->space_id; if (ret && len == slot->len) { ret_val = TRUE; @@ -3937,7 +4416,9 @@ os_aio_windows_handle( #ifdef UNIV_DO_FLUSH if (slot->type == OS_FILE_WRITE && !os_do_not_call_flush_at_each_write) { - ut_a(TRUE == os_file_flush(slot->file)); + if (!os_file_flush(slot->file, TRUE)) { + ut_error; + } } #endif /* UNIV_DO_FLUSH */ } else if (os_file_handle_error(slot->name, "Windows aio")) { @@ -3954,16 +4435,30 @@ os_aio_windows_handle( /* retry failed read/write operation synchronously. No need to hold array->mutex. */ +#ifdef UNIV_PFS_IO + /* This read/write does not go through os_file_read + and os_file_write APIs, need to register with + performance schema explicitly here. */ + struct PSI_file_locker* locker = NULL; + register_pfs_file_io_begin(locker, slot->file, slot->len, + (slot->type == OS_FILE_WRITE) + ? PSI_FILE_WRITE + : PSI_FILE_READ, + __FILE__, __LINE__); +#endif + + ut_a((slot->len & 0xFFFFFFFFUL) == slot->len); + switch (slot->type) { case OS_FILE_WRITE: ret = WriteFile(slot->file, slot->buf, - slot->len, &len, + (DWORD) slot->len, &len, &(slot->control)); break; case OS_FILE_READ: ret = ReadFile(slot->file, slot->buf, - slot->len, &len, + (DWORD) slot->len, &len, &(slot->control)); break; @@ -3971,6 +4466,10 @@ os_aio_windows_handle( ut_error; } +#ifdef UNIV_PFS_IO + register_pfs_file_io_end(locker, len); +#endif + if (!ret && GetLastError() == ERROR_IO_PENDING) { /* aio was queued successfully! We want a synchronous i/o operation on a @@ -3992,6 +4491,261 @@ os_aio_windows_handle( } #endif +#if defined(LINUX_NATIVE_AIO) +/******************************************************************//** +This function is only used in Linux native asynchronous i/o. This is +called from within the io-thread. If there are no completed IO requests +in the slot array, the thread calls this function to collect more +requests from the kernel. +The io-thread waits on io_getevents(), which is a blocking call, with +a timeout value. 
Unless the system is very heavy loaded, keeping the +io-thread very busy, the io-thread will spend most of its time waiting +in this function. +The io-thread also exits in this function. It checks server status at +each wakeup and that is why we use timed wait in io_getevents(). */ +static +void +os_aio_linux_collect( +/*=================*/ + os_aio_array_t* array, /*!< in/out: slot array. */ + ulint segment, /*!< in: local segment no. */ + ulint seg_size) /*!< in: segment size. */ +{ + int i; + int ret; + ulint start_pos; + ulint end_pos; + struct timespec timeout; + struct io_event* events; + struct io_context* io_ctx; + + /* sanity checks. */ + ut_ad(array != NULL); + ut_ad(seg_size > 0); + ut_ad(segment < array->n_segments); + + /* Which part of event array we are going to work on. */ + events = &array->aio_events[segment * seg_size]; + + /* Which io_context we are going to use. */ + io_ctx = array->aio_ctx[segment]; + + /* Starting point of the segment we will be working on. */ + start_pos = segment * seg_size; + + /* End point. */ + end_pos = start_pos + seg_size; + +retry: + + /* Initialize the events. The timeout value is arbitrary. + We probably need to experiment with it a little. */ + memset(events, 0, sizeof(*events) * seg_size); + timeout.tv_sec = 0; + timeout.tv_nsec = OS_AIO_REAP_TIMEOUT; + + ret = io_getevents(io_ctx, 1, seg_size, events, &timeout); + + if (ret > 0) { + for (i = 0; i < ret; i++) { + os_aio_slot_t* slot; + struct iocb* control; + + control = (struct iocb *)events[i].obj; + ut_a(control != NULL); + + slot = (os_aio_slot_t *) control->data; + + /* Some sanity checks. */ + ut_a(slot != NULL); + ut_a(slot->reserved); + +#if defined(UNIV_AIO_DEBUG) + fprintf(stderr, + "io_getevents[%c]: slot[%p] ctx[%p]" + " seg[%lu]\n", + (slot->type == OS_FILE_WRITE) ? 'w' : 'r', + slot, io_ctx, segment); +#endif + + /* We are not scribbling previous segment. */ + ut_a(slot->pos >= start_pos); + + /* We have not overstepped to next segment. */ + ut_a(slot->pos < end_pos); + + /* Mark this request as completed. The error handling + will be done in the calling function. */ + os_mutex_enter(array->mutex); + slot->n_bytes = events[i].res; + slot->ret = events[i].res2; + slot->io_already_done = TRUE; + os_mutex_exit(array->mutex); + } + return; + } + + if (UNIV_UNLIKELY(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS)) { + return; + } + + /* This error handling is for any error in collecting the + IO requests. The errors, if any, for any particular IO + request are simply passed on to the calling routine. */ + + switch (ret) { + case -EAGAIN: + /* Not enough resources! Try again. */ + case -EINTR: + /* Interrupted! I have tested the behaviour in case of an + interrupt. If we have some completed IOs available then + the return code will be the number of IOs. We get EINTR only + if there are no completed IOs and we have been interrupted. */ + case 0: + /* No pending request! Go back and check again. */ + goto retry; + } + + /* All other errors should cause a trap for now. */ + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: unexpected ret_code[%d] from io_getevents()!\n", + ret); + ut_error; +} + +/**********************************************************************//** +This function is only used in Linux native asynchronous i/o. +Waits for an aio operation to complete. This function is used to wait for +the completed requests. The aio array of pending requests is divided +into segments. The thread specifies which segment or slot it wants to wait +for. 
NOTE: this function will also take care of freeing the aio slot, +therefore no other thread is allowed to do the freeing! +@return TRUE if the IO was successful */ +UNIV_INTERN +ibool +os_aio_linux_handle( +/*================*/ + ulint global_seg, /*!< in: segment number in the aio array + to wait for; segment 0 is the ibuf + i/o thread, segment 1 is log i/o thread, + then follow the non-ibuf read threads, + and the last are the non-ibuf write + threads. */ + fil_node_t**message1, /*!< out: the messages passed with the */ + void** message2, /*!< aio request; note that in case the + aio operation failed, these output + parameters are valid and can be used to + restart the operation. */ + ulint* type, /*!< out: OS_FILE_WRITE or ..._READ */ + ulint* space_id) +{ + ulint segment; + os_aio_array_t* array; + os_aio_slot_t* slot; + ulint n; + ulint i; + ibool ret = FALSE; + + /* Should never be doing Sync IO here. */ + ut_a(global_seg != ULINT_UNDEFINED); + + /* Find the array and the local segment. */ + segment = os_aio_get_array_and_local_segment(&array, global_seg); + n = array->n_slots / array->n_segments; + + /* Loop until we have found a completed request. */ + for (;;) { + ibool any_reserved = FALSE; + os_mutex_enter(array->mutex); + for (i = 0; i < n; ++i) { + slot = os_aio_array_get_nth_slot( + array, i + segment * n); + if (!slot->reserved) { + continue; + } else if (slot->io_already_done) { + /* Something for us to work on. */ + goto found; + } else { + any_reserved = TRUE; + } + } + + os_mutex_exit(array->mutex); + + /* There is no completed request. + If there is no pending request at all, + and the system is being shut down, exit. */ + if (UNIV_UNLIKELY + (!any_reserved + && srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS)) { + *message1 = NULL; + *message2 = NULL; + return(TRUE); + } + + /* Wait for some request. Note that we return + from wait iff we have found a request. */ + + srv_set_io_thread_op_info(global_seg, + "waiting for completed aio requests"); + os_aio_linux_collect(array, segment, n); + } + +found: + /* Note that it may be that there are more than one completed + IO request. We process them one at a time. We may have a case + here to improve the performance slightly by dealing with all + requests in one sweep. */ + srv_set_io_thread_op_info(global_seg, + "processing completed aio requests"); + + /* Ensure that we are scribbling only our segment. */ + ut_a(i < n); + + ut_ad(slot != NULL); + ut_ad(slot->reserved); + ut_ad(slot->io_already_done); + + *message1 = slot->message1; + *message2 = slot->message2; + + *type = slot->type; + *space_id = slot->space_id; + + if ((slot->ret == 0) && (slot->n_bytes == (long)slot->len)) { + ret = TRUE; + +#ifdef UNIV_DO_FLUSH + if (slot->type == OS_FILE_WRITE + && !os_do_not_call_flush_at_each_write + && !os_file_flush(slot->file, TRUE)) { + ut_error; + } +#endif /* UNIV_DO_FLUSH */ + } else { + errno = -slot->ret; + + /* os_file_handle_error does tell us if we should retry + this IO. As it stands now, we don't do this retry when + reaping requests from a different context than + the dispatcher. This non-retry logic is the same for + windows and linux native AIO. + We should probably look into this to transparently + re-submit the IO. */ + os_file_handle_error(slot->name, "Linux aio"); + + ret = FALSE; + } + + os_mutex_exit(array->mutex); + + os_aio_array_free_slot(array, slot); + + return(ret); +} +#endif /* LINUX_NATIVE_AIO */ + /**********************************************************************//** Does simulated aio. 
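Both the native handler above and the simulated handler below work only on their own segment's slice of the slot array, which is why every lookup is now written as i + segment * n with n = n_slots / n_segments. A simplified sketch of that arithmetic, using a hypothetical struct rather than the real os_aio_array_t:

#include <stdio.h>

/* Hypothetical, reduced stand-in for the aio array. */
struct aio_array {
	unsigned long	n_slots;	/* total slots in the array */
	unsigned long	n_segments;	/* io-threads serving the array */
};

/* Returns the first slot index owned by a segment and stores the
number of slots per segment in *n. */
static unsigned long
segment_start(const struct aio_array* arr, unsigned long segment,
	      unsigned long* n)
{
	*n = arr->n_slots / arr->n_segments;
	return(segment * *n);
}

int main(void)
{
	struct aio_array	arr = { 256, 4 };
	unsigned long		n;
	unsigned long		start = segment_start(&arr, 2, &n);

	/* With 256 slots and 4 segments, segment 2 owns slots 128..191. */
	printf("segment 2: slots %lu..%lu\n", start, start + n - 1);
	return(0);
}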
This function should be called by an i/o-handler thread. @@ -4011,28 +4765,26 @@ os_aio_simulated_handle( parameters are valid and can be used to restart the operation, for example */ void** message2, - ulint* type) /*!< out: OS_FILE_WRITE or ..._READ */ + ulint* type, /*!< out: OS_FILE_WRITE or ..._READ */ + ulint* space_id) { os_aio_array_t* array; ulint segment; os_aio_slot_t* slot; os_aio_slot_t* slot2; os_aio_slot_t* consecutive_ios[OS_AIO_MERGE_N_CONSECUTIVE]; - os_aio_slot_t* lowest_request; - os_aio_slot_t* oldest_request; ulint n_consecutive; ulint total_len; ulint offs; ulint lowest_offset; - ulint oldest_offset; ulint biggest_age; ulint age; byte* combined_buf; byte* combined_buf2; ibool ret; + ibool any_reserved; ulint n; ulint i; - time_t now; /* Fix compiler warning */ *consecutive_ios = NULL; @@ -4045,10 +4797,10 @@ restart: srv_set_io_thread_op_info(global_segment, "looking for i/o requests (a)"); - ut_ad(os_aio_validate()); + ut_ad(os_aio_validate_skip()); ut_ad(segment < array->n_segments); - n = array->n_slots; + n = array->n_slots / array->n_segments; /* Look through n slots after the segment * n'th slot */ @@ -4061,18 +4813,21 @@ restart: goto recommended_sleep; } - os_mutex_enter(array->mutex); - srv_set_io_thread_op_info(global_segment, "looking for i/o requests (b)"); /* Check if there is a slot for which the i/o has already been done */ + any_reserved = FALSE; + + os_mutex_enter(array->mutex); for (i = 0; i < n; i++) { - slot = os_aio_array_get_nth_slot(array, i); + slot = os_aio_array_get_nth_slot(array, i + segment * n); - if (slot->reserved && slot->status == OS_AIO_DONE) { + if (!slot->reserved) { + continue; + } else if (slot->io_already_done) { if (os_aio_print_debug) { fprintf(stderr, @@ -4084,9 +4839,23 @@ restart: ret = TRUE; goto slot_io_done; + } else { + any_reserved = TRUE; } } + /* There is no completed request. + If there is no pending request at all, + and the system is being shut down, exit. */ + if (UNIV_UNLIKELY + (!any_reserved + && srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS)) { + os_mutex_exit(array->mutex); + *message1 = NULL; + *message2 = NULL; + return(TRUE); + } + n_consecutive = 0; /* If there are at least 2 seconds old requests, then pick the oldest @@ -4094,57 +4863,72 @@ restart: then pick the one at the lowest offset. */ biggest_age = 0; - now = time(NULL); - oldest_request = lowest_request = NULL; - oldest_offset = lowest_offset = ULINT_MAX; + lowest_offset = ULINT_MAX; - /* Find the oldest request and the request with the smallest offset */ for (i = 0; i < n; i++) { - slot = os_aio_array_get_nth_slot(array, i); + slot = os_aio_array_get_nth_slot(array, i + segment * n); - if (slot->reserved && slot->status == OS_AIO_NOT_ISSUED) { - age = (ulint)difftime(now, slot->reservation_time); + if (slot->reserved) { + age = (ulint)difftime(time(NULL), + slot->reservation_time); if ((age >= 2 && age > biggest_age) || (age >= 2 && age == biggest_age - && slot->offset < oldest_offset)) { + && slot->offset < lowest_offset)) { /* Found an i/o request */ + consecutive_ios[0] = slot; + + n_consecutive = 1; + biggest_age = age; - oldest_request = slot; - oldest_offset = slot->offset; + lowest_offset = slot->offset; } + } + } + + if (n_consecutive == 0) { + /* There were no old requests. 
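Taken together, the two passes implement a simple policy: a request that has waited at least two seconds is served first (oldest wins, lower offset breaking ties), and only if no such request exists does the handler sweep up from the lowest file offset. A consolidated sketch of that policy, with a hypothetical slot type instead of the real os_aio_slot_t:

#include <time.h>
#include <stddef.h>

/* Hypothetical slot, reduced to the fields the policy looks at. */
struct sim_slot {
	int		reserved;
	time_t		reservation_time;
	unsigned long	offset;
};

struct sim_slot*
pick_request(struct sim_slot* slots, size_t n)
{
	struct sim_slot*	best = NULL;
	unsigned long		biggest_age = 0;
	unsigned long		lowest_offset = (unsigned long) -1;
	size_t			i;

	/* Pass 1: oldest request that has been pending for >= 2 s. */
	for (i = 0; i < n; i++) {
		unsigned long	age;

		if (!slots[i].reserved) {
			continue;
		}

		age = (unsigned long) difftime(time(NULL),
					       slots[i].reservation_time);

		if (age >= 2
		    && (age > biggest_age
			|| (age == biggest_age
			    && slots[i].offset < lowest_offset))) {
			best = &slots[i];
			biggest_age = age;
			lowest_offset = slots[i].offset;
		}
	}

	if (best != NULL) {
		return(best);
	}

	/* Pass 2: no old request, take the lowest offset. */
	for (i = 0; i < n; i++) {
		if (slots[i].reserved && slots[i].offset < lowest_offset) {
			best = &slots[i];
			lowest_offset = slots[i].offset;
		}
	}

	return(best);
}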
Look for an i/o request at the + lowest offset in the array (we ignore the high 32 bits of the + offset in these heuristics) */ + + lowest_offset = ULINT_MAX; + + for (i = 0; i < n; i++) { + slot = os_aio_array_get_nth_slot(array, + i + segment * n); + + if (slot->reserved && slot->offset < lowest_offset) { - /* Look for an i/o request at the lowest offset in the array - * (we ignore the high 32 bits of the offset) */ - if (slot->offset < lowest_offset) { /* Found an i/o request */ - lowest_request = slot; + consecutive_ios[0] = slot; + + n_consecutive = 1; + lowest_offset = slot->offset; } } } - if (!lowest_request && !oldest_request) { + if (n_consecutive == 0) { /* No i/o requested at the moment */ goto wait_for_io; } - if (oldest_request) { - slot = oldest_request; - } else { - slot = lowest_request; - } - consecutive_ios[0] = slot; - n_consecutive = 1; + /* if n_consecutive != 0, then we have assigned + something valid to consecutive_ios[0] */ + ut_ad(n_consecutive != 0); + ut_ad(consecutive_ios[0] != NULL); + + slot = consecutive_ios[0]; /* Check if there are several consecutive blocks to read or write */ consecutive_loop: for (i = 0; i < n; i++) { - slot2 = os_aio_array_get_nth_slot(array, i); + slot2 = os_aio_array_get_nth_slot(array, i + segment * n); if (slot2->reserved && slot2 != slot && slot2->offset == slot->offset + slot->len @@ -4152,8 +4936,7 @@ consecutive_loop: && slot->offset + slot->len > slot->offset && slot2->offset_high == slot->offset_high && slot2->type == slot->type - && slot2->file == slot->file - && slot2->status == OS_AIO_NOT_ISSUED) { + && slot2->file == slot->file) { /* Found a consecutive i/o request */ @@ -4182,8 +4965,6 @@ consecutive_loop: for (i = 0; i < n_consecutive; i++) { total_len += consecutive_ios[i]->len; - ut_a(consecutive_ios[i]->status == OS_AIO_NOT_ISSUED); - consecutive_ios[i]->status = OS_AIO_ISSUED; } if (n_consecutive == 1) { @@ -4191,14 +4972,7 @@ consecutive_loop: combined_buf = slot->buf; combined_buf2 = NULL; } else { - if ((total_len + UNIV_PAGE_SIZE) > os_aio_thread_buffer_size[global_segment]) { - if (os_aio_thread_buffer[global_segment]) - ut_free(os_aio_thread_buffer[global_segment]); - - os_aio_thread_buffer[global_segment] = ut_malloc(total_len + UNIV_PAGE_SIZE); - os_aio_thread_buffer_size[global_segment] = total_len + UNIV_PAGE_SIZE; - } - combined_buf2 = os_aio_thread_buffer[global_segment]; + combined_buf2 = ut_malloc(total_len + UNIV_PAGE_SIZE); ut_a(combined_buf2); @@ -4209,9 +4983,6 @@ consecutive_loop: this assumes that there is just one i/o-handler thread serving a single segment of slots! 
*/ - ut_a(slot->reserved); - ut_a(slot->status == OS_AIO_ISSUED); - os_mutex_exit(array->mutex); if (slot->type == OS_FILE_WRITE && n_consecutive > 1) { @@ -4267,16 +5038,8 @@ consecutive_loop: } } - if (srv_recovery_stats && recv_recovery_is_on() && n_consecutive) { - mutex_enter(&(recv_sys->mutex)); - if (slot->type == OS_FILE_READ) { - recv_sys->stats_read_io_pages += n_consecutive; - recv_sys->stats_read_io_consecutive[n_consecutive - 1]++; - } else if (slot->type == OS_FILE_WRITE) { - recv_sys->stats_write_io_pages += n_consecutive; - recv_sys->stats_write_io_consecutive[n_consecutive - 1]++; - } - mutex_exit(&(recv_sys->mutex)); + if (combined_buf2) { + ut_free(combined_buf2); } os_mutex_enter(array->mutex); @@ -4284,8 +5047,7 @@ consecutive_loop: /* Mark the i/os done in slots */ for (i = 0; i < n_consecutive; i++) { - ut_a(consecutive_ios[i]->status == OS_AIO_ISSUED); - consecutive_ios[i]->status = OS_AIO_DONE; + consecutive_ios[i]->io_already_done = TRUE; } /* We return the messages for the first slot now, and if there were @@ -4295,13 +5057,12 @@ consecutive_loop: slot_io_done: ut_a(slot->reserved); - ut_a(slot->status == OS_AIO_DONE); - slot->status = OS_AIO_CLAIMED; *message1 = slot->message1; *message2 = slot->message2; *type = slot->type; + *space_id = slot->space_id; os_mutex_exit(array->mutex); @@ -4388,6 +5149,40 @@ os_aio_validate(void) } /**********************************************************************//** +Prints pending IO requests per segment of an aio array. +We probably don't need per segment statistics but they can help us +during development phase to see if the IO requests are being +distributed as expected. */ +static +void +os_aio_print_segment_info( +/*======================*/ + FILE* file, /*!< in: file where to print */ + ulint* n_seg, /*!< in: pending IO array */ + os_aio_array_t* array) /*!< in: array to process */ +{ + ulint i; + + ut_ad(array); + ut_ad(n_seg); + ut_ad(array->n_segments > 0); + + if (array->n_segments == 1) { + return; + } + + fprintf(file, " ["); + for (i = 0; i < array->n_segments; i++) { + if (i != 0) { + fprintf(file, ", "); + } + + fprintf(file, "%lu", n_seg[i]); + } + fprintf(file, "] "); +} + +/**********************************************************************//** Prints info of the aio arrays. 
*/ UNIV_INTERN void @@ -4398,6 +5193,7 @@ os_aio_print( os_aio_array_t* array; os_aio_slot_t* slot; ulint n_reserved; + ulint n_res_seg[SRV_MAX_N_IO_THREADS]; time_t current_time; double time_elapsed; double avg_bytes_read; @@ -4430,11 +5226,17 @@ loop: n_reserved = 0; + memset(n_res_seg, 0x0, sizeof(n_res_seg)); + for (i = 0; i < array->n_slots; i++) { + ulint seg_no; + slot = os_aio_array_get_nth_slot(array, i); + seg_no = (i * array->n_segments) / array->n_slots; if (slot->reserved) { n_reserved++; + n_res_seg[seg_no]++; #if 0 fprintf(stderr, "Reserved slot, messages %p %p\n", (void*) slot->message1, @@ -4448,6 +5250,8 @@ loop: fprintf(file, " %lu", (ulong) n_reserved); + os_aio_print_segment_info(file, n_res_seg, array); + os_mutex_exit(array->mutex); if (array == os_aio_read_array) { diff --git a/storage/xtradb/os/os0proc.c b/storage/xtradb/os/os0proc.c index 4567d96b6f4..0f56a608f38 100644 --- a/storage/xtradb/os/os0proc.c +++ b/storage/xtradb/os/os0proc.c @@ -145,7 +145,7 @@ skip: os_fast_mutex_unlock(&ut_list_mutex); UNIV_MEM_ALLOC(ptr, size); } -#elif defined __NETWARE__ || !defined OS_MAP_ANON +#elif !defined OS_MAP_ANON size = *n; ptr = ut_malloc_low(size, TRUE, FALSE); #else @@ -213,7 +213,7 @@ os_mem_free_large( os_fast_mutex_unlock(&ut_list_mutex); UNIV_MEM_FREE(ptr, size); } -#elif defined __NETWARE__ || !defined OS_MAP_ANON +#elif !defined OS_MAP_ANON ut_free(ptr); #else if (munmap(ptr, size)) { @@ -229,173 +229,3 @@ os_mem_free_large( } #endif } - -/****************************************************************//** -Allocates or attaches and reuses shared memory segment. -The content is not cleared automatically. -@return allocated memory */ -UNIV_INTERN -void* -os_shm_alloc( -/*=========*/ - ulint* n, /*!< in/out: number of bytes */ - uint key, - ibool* is_new) -{ - void* ptr; -#if defined HAVE_SYS_IPC_H && HAVE_SYS_SHM_H - ulint size; - int shmid; - - *is_new = FALSE; - fprintf(stderr, - "InnoDB: The shared memory segment containing the buffer pool is: key %#x (%d).\n", - key, key); -# if defined HAVE_LARGE_PAGES && defined UNIV_LINUX - if (!os_use_large_pages || !os_large_page_size) { - goto skip; - } - - /* Align block size to os_large_page_size */ - ut_ad(ut_is_2pow(os_large_page_size)); - size = ut_2pow_round(*n + (os_large_page_size - 1), - os_large_page_size); - - shmid = shmget((key_t)key, (size_t)size, - IPC_CREAT | IPC_EXCL | SHM_HUGETLB | SHM_R | SHM_W); - if (shmid < 0) { - if (errno == EEXIST) { - fprintf(stderr, - "InnoDB: HugeTLB: The shared memory segment exists.\n"); - shmid = shmget((key_t)key, (size_t)size, - SHM_HUGETLB | SHM_R | SHM_W); - if (shmid < 0) { - fprintf(stderr, - "InnoDB: HugeTLB: Warning: Failed to allocate %lu bytes. (reuse) errno %d\n", - size, errno); - goto skip; - } else { - fprintf(stderr, - "InnoDB: HugeTLB: The existent shared memory segment is used.\n"); - } - } else { - fprintf(stderr, - "InnoDB: HugeTLB: Warning: Failed to allocate %lu bytes. 
(new) errno %d\n", - size, errno); - goto skip; - } - } else { - *is_new = TRUE; - fprintf(stderr, - "InnoDB: HugeTLB: A new shared memory segment has been created .\n"); - } - - ptr = shmat(shmid, NULL, 0); - if (ptr == (void *)-1) { - fprintf(stderr, - "InnoDB: HugeTLB: Warning: Failed to attach shared memory segment, errno %d\n", - errno); - ptr = NULL; - } - - if (ptr) { - *n = size; - os_fast_mutex_lock(&ut_list_mutex); - ut_total_allocated_memory += size; - os_fast_mutex_unlock(&ut_list_mutex); - UNIV_MEM_ALLOC(ptr, size); - return(ptr); - } -skip: - *is_new = FALSE; -# endif /* HAVE_LARGE_PAGES && defined UNIV_LINUX */ -# ifdef HAVE_GETPAGESIZE - size = getpagesize(); -# else - size = UNIV_PAGE_SIZE; -# endif - /* Align block size to system page size */ - ut_ad(ut_is_2pow(size)); - size = *n = ut_2pow_round(*n + (size - 1), size); - - shmid = shmget((key_t)key, (size_t)size, - IPC_CREAT | IPC_EXCL | SHM_R | SHM_W); - if (shmid < 0) { - if (errno == EEXIST) { - fprintf(stderr, - "InnoDB: A shared memory segment containing the buffer pool seems to already exist.\n"); - shmid = shmget((key_t)key, (size_t)size, - SHM_R | SHM_W); - if (shmid < 0) { - fprintf(stderr, - "InnoDB: Warning: Failed to allocate %lu bytes. (reuse) errno %d\n", - size, errno); - ptr = NULL; - goto end; - } else { - fprintf(stderr, - "InnoDB: The existent shared memory segment is used.\n"); - } - } else { - fprintf(stderr, - "InnoDB: Warning: Failed to allocate %lu bytes. (new) errno %d\n", - size, errno); - ptr = NULL; - goto end; - } - } else { - *is_new = TRUE; - fprintf(stderr, - "InnoDB: A new shared memory segment has been created.\n"); - } - - ptr = shmat(shmid, NULL, 0); - if (ptr == (void *)-1) { - fprintf(stderr, - "InnoDB: Warning: Failed to attach shared memory segment, errno %d\n", - errno); - ptr = NULL; - } - - if (ptr) { - *n = size; - os_fast_mutex_lock(&ut_list_mutex); - ut_total_allocated_memory += size; - os_fast_mutex_unlock(&ut_list_mutex); - UNIV_MEM_ALLOC(ptr, size); - } -end: -#else /* HAVE_SYS_IPC_H && HAVE_SYS_SHM_H */ - fprintf(stderr, "InnoDB: shared memory segment is not supported.\n"); - ptr = NULL; -#endif /* HAVE_SYS_IPC_H && HAVE_SYS_SHM_H */ - return(ptr); -} - -/****************************************************************//** -Detach shared memory segment. 
*/ -UNIV_INTERN -void -os_shm_free( -/*========*/ - void *ptr, /*!< in: pointer returned by - os_shm_alloc() */ - ulint size) /*!< in: size returned by - os_shm_alloc() */ -{ - os_fast_mutex_lock(&ut_list_mutex); - ut_a(ut_total_allocated_memory >= size); - os_fast_mutex_unlock(&ut_list_mutex); - -#if defined HAVE_SYS_IPC_H && HAVE_SYS_SHM_H - if (!shmdt(ptr)) { - os_fast_mutex_lock(&ut_list_mutex); - ut_a(ut_total_allocated_memory >= size); - ut_total_allocated_memory -= size; - os_fast_mutex_unlock(&ut_list_mutex); - UNIV_MEM_FREE(ptr, size); - } -#else /* HAVE_SYS_IPC_H && HAVE_SYS_SHM_H */ - fprintf(stderr, "InnoDB: shared memory segment is not supported.\n"); -#endif /* HAVE_SYS_IPC_H && HAVE_SYS_SHM_H */ -} diff --git a/storage/xtradb/os/os0sync.c b/storage/xtradb/os/os0sync.c index f9ab58c2ee4..41a19843812 100644 --- a/storage/xtradb/os/os0sync.c +++ b/storage/xtradb/os/os0sync.c @@ -31,13 +31,11 @@ Created 9/6/1995 Heikki Tuuri #ifdef __WIN__ #include <windows.h> -#else -#include <sys/time.h> -#include <time.h> #endif #include "ut0mem.h" #include "srv0start.h" +#include "srv0srv.h" /* Type definition for an operating system mutex struct */ struct os_mutex_struct{ @@ -74,11 +72,227 @@ UNIV_INTERN ulint os_event_count = 0; UNIV_INTERN ulint os_mutex_count = 0; UNIV_INTERN ulint os_fast_mutex_count = 0; +/* The number of microseconds in a second. */ +static const ulint MICROSECS_IN_A_SECOND = 1000000; + /* Because a mutex is embedded inside an event and there is an event embedded inside a mutex, on free, this generates a recursive call. This version of the free event function doesn't acquire the global lock */ static void os_event_free_internal(os_event_t event); + +/* On Windows (Vista and later), load function pointers for condition +variable handling. Those functions are not available in prior versions, +so we have to use them via runtime loading, as long as we support XP. */ +static void os_cond_module_init(void); + +#ifdef __WIN__ +/* Prototypes and function pointers for condition variable functions */ +typedef VOID (WINAPI* InitializeConditionVariableProc) + (PCONDITION_VARIABLE ConditionVariable); +static InitializeConditionVariableProc initialize_condition_variable; + +typedef BOOL (WINAPI* SleepConditionVariableCSProc) + (PCONDITION_VARIABLE ConditionVariable, + PCRITICAL_SECTION CriticalSection, + DWORD dwMilliseconds); +static SleepConditionVariableCSProc sleep_condition_variable; + +typedef VOID (WINAPI* WakeAllConditionVariableProc) + (PCONDITION_VARIABLE ConditionVariable); +static WakeAllConditionVariableProc wake_all_condition_variable; + +typedef VOID (WINAPI* WakeConditionVariableProc) + (PCONDITION_VARIABLE ConditionVariable); +static WakeConditionVariableProc wake_condition_variable; +#endif + +/*********************************************************//** +Initialize condition variable */ +UNIV_INLINE +void +os_cond_init( +/*=========*/ + os_cond_t* cond) /*!< in: condition variable. */ +{ + ut_a(cond); + +#ifdef __WIN__ + ut_a(initialize_condition_variable != NULL); + initialize_condition_variable(cond); +#else + ut_a(pthread_cond_init(cond, NULL) == 0); +#endif +} + +/*********************************************************//** +Do a timed wait on condition variable. +@return TRUE if timed out, FALSE otherwise */ +UNIV_INLINE +ibool +os_cond_wait_timed( +/*===============*/ + os_cond_t* cond, /*!< in: condition variable. 
*/ + os_fast_mutex_t* mutex, /*!< in: fast mutex */ +#ifndef __WIN__ + const struct timespec* abstime /*!< in: timeout */ +#else + DWORD time_in_ms /*!< in: timeout in + milliseconds*/ +#endif /* !__WIN__ */ +) +{ +#ifdef __WIN__ + BOOL ret; + DWORD err; + + ut_a(sleep_condition_variable != NULL); + + ret = sleep_condition_variable(cond, mutex, time_in_ms); + + if (!ret) { + err = GetLastError(); + /* From http://msdn.microsoft.com/en-us/library/ms686301%28VS.85%29.aspx, + "Condition variables are subject to spurious wakeups + (those not associated with an explicit wake) and stolen wakeups + (another thread manages to run before the woken thread)." + Check for both types of timeouts. + Conditions are checked by the caller.*/ + if ((err == WAIT_TIMEOUT) || (err == ERROR_TIMEOUT)) { + return(TRUE); + } + } + + ut_a(ret); + + return(FALSE); +#else + int ret; + + ret = pthread_cond_timedwait(cond, mutex, abstime); + + switch (ret) { + case 0: + case ETIMEDOUT: + /* We play it safe by checking for EINTR even though + according to the POSIX documentation it can't return EINTR. */ + case EINTR: + break; + + default: + fprintf(stderr, " InnoDB: pthread_cond_timedwait() returned: " + "%d: abstime={%lu,%lu}\n", + ret, (ulong) abstime->tv_sec, (ulong) abstime->tv_nsec); + ut_error; + } + + return(ret == ETIMEDOUT); +#endif +} +/*********************************************************//** +Wait on condition variable */ +UNIV_INLINE +void +os_cond_wait( +/*=========*/ + os_cond_t* cond, /*!< in: condition variable. */ + os_fast_mutex_t* mutex) /*!< in: fast mutex */ +{ + ut_a(cond); + ut_a(mutex); + +#ifdef __WIN__ + ut_a(sleep_condition_variable != NULL); + ut_a(sleep_condition_variable(cond, mutex, INFINITE)); +#else + ut_a(pthread_cond_wait(cond, mutex) == 0); +#endif +} + +/*********************************************************//** +Wakes all threads waiting for condition variable */ +UNIV_INLINE +void +os_cond_broadcast( +/*==============*/ + os_cond_t* cond) /*!< in: condition variable. */ +{ + ut_a(cond); + +#ifdef __WIN__ + ut_a(wake_all_condition_variable != NULL); + wake_all_condition_variable(cond); +#else + ut_a(pthread_cond_broadcast(cond) == 0); +#endif +} + +/*********************************************************//** +Wakes one thread waiting for condition variable */ +UNIV_INLINE +void +os_cond_signal( +/*==========*/ + os_cond_t* cond) /*!< in: condition variable. */ +{ + ut_a(cond); + +#ifdef __WIN__ + ut_a(wake_condition_variable != NULL); + wake_condition_variable(cond); +#else + ut_a(pthread_cond_signal(cond) == 0); +#endif +} + +/*********************************************************//** +Destroys condition variable */ +UNIV_INLINE +void +os_cond_destroy( +/*============*/ + os_cond_t* cond) /*!< in: condition variable. */ +{ +#ifdef __WIN__ + /* Do nothing */ +#else + ut_a(pthread_cond_destroy(cond) == 0); +#endif +} + +/*********************************************************//** +On Windows (Vista and later), load function pointers for condition variable +handling. Those functions are not available in prior versions, so we have to +use them via runtime loading, as long as we support XP. 
*/ +static +void +os_cond_module_init(void) +/*=====================*/ +{ +#ifdef __WIN__ + HMODULE h_dll; + + if (!srv_use_native_conditions) + return; + + h_dll = GetModuleHandle("kernel32"); + + initialize_condition_variable = (InitializeConditionVariableProc) + GetProcAddress(h_dll, "InitializeConditionVariable"); + sleep_condition_variable = (SleepConditionVariableCSProc) + GetProcAddress(h_dll, "SleepConditionVariableCS"); + wake_all_condition_variable = (WakeAllConditionVariableProc) + GetProcAddress(h_dll, "WakeAllConditionVariable"); + wake_condition_variable = (WakeConditionVariableProc) + GetProcAddress(h_dll, "WakeConditionVariable"); + + /* When using native condition variables, check function pointers */ + ut_a(initialize_condition_variable); + ut_a(sleep_condition_variable); + ut_a(wake_all_condition_variable); + ut_a(wake_condition_variable); +#endif +} + /*********************************************************//** Initializes global event and OS 'slow' mutex lists. */ UNIV_INTERN @@ -92,7 +306,10 @@ os_sync_init(void) os_sync_mutex = NULL; os_sync_mutex_inited = FALSE; - os_sync_mutex = os_mutex_create(NULL); + /* Now for Windows only */ + os_cond_module_init(); + + os_sync_mutex = os_mutex_create(); os_sync_mutex_inited = TRUE; } @@ -146,42 +363,45 @@ os_event_create( const char* name) /*!< in: the name of the event, if NULL the event is created without a name */ { -#ifdef __WIN__ - os_event_t event; - - event = ut_malloc(sizeof(struct os_event_struct)); - - event->handle = CreateEvent(NULL, /* No security attributes */ - TRUE, /* Manual reset */ - FALSE, /* Initial state nonsignaled */ - (LPCTSTR) name); - if (!event->handle) { - fprintf(stderr, - "InnoDB: Could not create a Windows event semaphore;" - " Windows error %lu\n", - (ulong) GetLastError()); - } -#else /* Unix */ os_event_t event; - UT_NOT_USED(name); +#ifdef __WIN__ + if(!srv_use_native_conditions) { + + event = ut_malloc(sizeof(struct os_event_struct)); + + event->handle = CreateEvent(NULL, + TRUE, + FALSE, + (LPCTSTR) name); + if (!event->handle) { + fprintf(stderr, + "InnoDB: Could not create a Windows event" + " semaphore; Windows error %lu\n", + (ulong) GetLastError()); + } + } else /* Windows with condition variables */ +#endif + + { + UT_NOT_USED(name); - event = ut_malloc(sizeof(struct os_event_struct)); + event = ut_malloc(sizeof(struct os_event_struct)); - os_fast_mutex_init(&(event->os_mutex)); + os_fast_mutex_init(&(event->os_mutex)); - ut_a(0 == pthread_cond_init(&(event->cond_var), NULL)); + os_cond_init(&(event->cond_var)); - event->is_set = FALSE; + event->is_set = FALSE; - /* We return this value in os_event_reset(), which can then be - be used to pass to the os_event_wait_low(). The value of zero - is reserved in os_event_wait_low() for the case when the - caller does not want to pass any signal_count value. To - distinguish between the two cases we initialize signal_count - to 1 here. */ - event->signal_count = 1; -#endif /* __WIN__ */ + /* We return this value in os_event_reset(), which can then be + be used to pass to the os_event_wait_low(). The value of zero + is reserved in os_event_wait_low() for the case when the + caller does not want to pass any signal_count value. To + distinguish between the two cases we initialize signal_count + to 1 here. 
*/ + event->signal_count = 1; + } /* The os_sync_mutex can be NULL because during startup an event can be created [ because it's embedded in the mutex/rwlock ] before @@ -211,10 +431,15 @@ os_event_set( /*=========*/ os_event_t event) /*!< in: event to set */ { -#ifdef __WIN__ ut_a(event); - ut_a(SetEvent(event->handle)); -#else + +#ifdef __WIN__ + if (!srv_use_native_conditions) { + ut_a(SetEvent(event->handle)); + return; + } +#endif + ut_a(event); os_fast_mutex_lock(&(event->os_mutex)); @@ -224,11 +449,10 @@ os_event_set( } else { event->is_set = TRUE; event->signal_count += 1; - ut_a(0 == pthread_cond_broadcast(&(event->cond_var))); + os_cond_broadcast(&(event->cond_var)); } os_fast_mutex_unlock(&(event->os_mutex)); -#endif } /**********************************************************//** @@ -247,12 +471,14 @@ os_event_reset( { ib_int64_t ret = 0; -#ifdef __WIN__ ut_a(event); - ut_a(ResetEvent(event->handle)); -#else - ut_a(event); +#ifdef __WIN__ + if(!srv_use_native_conditions) { + ut_a(ResetEvent(event->handle)); + return(0); + } +#endif os_fast_mutex_lock(&(event->os_mutex)); @@ -264,7 +490,6 @@ os_event_reset( ret = event->signal_count; os_fast_mutex_unlock(&(event->os_mutex)); -#endif return(ret); } @@ -277,19 +502,21 @@ os_event_free_internal( os_event_t event) /*!< in: event to free */ { #ifdef __WIN__ - ut_a(event); + if(!srv_use_native_conditions) { + ut_a(event); + ut_a(CloseHandle(event->handle)); + } else +#endif + { + ut_a(event); - ut_a(CloseHandle(event->handle)); -#else - ut_a(event); + /* This is to avoid freeing the mutex twice */ + os_fast_mutex_free(&(event->os_mutex)); - /* This is to avoid freeing the mutex twice */ - os_fast_mutex_free(&(event->os_mutex)); + os_cond_destroy(&(event->cond_var)); + } - ut_a(0 == pthread_cond_destroy(&(event->cond_var))); -#endif /* Remove from the list of events */ - UT_LIST_REMOVE(os_event_list, os_event_list, event); os_event_count--; @@ -306,18 +533,19 @@ os_event_free( os_event_t event) /*!< in: event to free */ { -#ifdef __WIN__ ut_a(event); +#ifdef __WIN__ + if(!srv_use_native_conditions){ + ut_a(CloseHandle(event->handle)); + } else /*Windows with condition variables */ +#endif + { + os_fast_mutex_free(&(event->os_mutex)); - ut_a(CloseHandle(event->handle)); -#else - ut_a(event); + os_cond_destroy(&(event->cond_var)); + } - os_fast_mutex_free(&(event->os_mutex)); - ut_a(0 == pthread_cond_destroy(&(event->cond_var))); -#endif /* Remove from the list of events */ - os_mutex_enter(os_sync_mutex); UT_LIST_REMOVE(os_event_list, os_event_list, event); @@ -330,10 +558,7 @@ os_event_free( } /**********************************************************//** -Waits for an event object until it is in the signaled state. If -srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS this also exits the -waiting thread when the event becomes signaled (or immediately if the -event is already in the signaled state). +Waits for an event object until it is in the signaled state. Typically, if the event has been signalled after the os_event_reset() we'll return immediately because event->is_set == TRUE. @@ -359,53 +584,36 @@ os_event_wait_low( os_event_reset(). 
*/ { #ifdef __WIN__ - DWORD err; - - ut_a(event); + if(!srv_use_native_conditions) { + DWORD err; - UT_NOT_USED(reset_sig_count); + ut_a(event); - /* Specify an infinite time limit for waiting */ - err = WaitForSingleObject(event->handle, INFINITE); + UT_NOT_USED(reset_sig_count); - ut_a(err == WAIT_OBJECT_0); + /* Specify an infinite wait */ + err = WaitForSingleObject(event->handle, INFINITE); - if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) { - os_thread_exit(NULL); + ut_a(err == WAIT_OBJECT_0); + return; } -#else - ib_int64_t old_signal_count; +#endif - os_fast_mutex_lock(&(event->os_mutex)); + os_fast_mutex_lock(&event->os_mutex); - if (reset_sig_count) { - old_signal_count = reset_sig_count; - } else { - old_signal_count = event->signal_count; + if (!reset_sig_count) { + reset_sig_count = event->signal_count; } - for (;;) { - if (event->is_set == TRUE - || event->signal_count != old_signal_count) { - - os_fast_mutex_unlock(&(event->os_mutex)); - - if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) { - - os_thread_exit(NULL); - } - /* Ok, we may return */ - - return; - } - - pthread_cond_wait(&(event->cond_var), &(event->os_mutex)); + while (!event->is_set && event->signal_count == reset_sig_count) { + os_cond_wait(&(event->cond_var), &(event->os_mutex)); /* Solaris manual said that spurious wakeups may occur: we have to check if the event really has been signaled after we came here to wait */ } -#endif + + os_fast_mutex_unlock(&event->os_mutex); } /**********************************************************//** @@ -414,112 +622,112 @@ a timeout is exceeded. @return 0 if success, OS_SYNC_TIME_EXCEEDED if timeout was exceeded */ UNIV_INTERN ulint -os_event_wait_time( -/*===============*/ - os_event_t event, /*!< in: event to wait */ - ulint wtime) /*!< in: timeout in microseconds, or - OS_SYNC_INFINITE_TIME */ +os_event_wait_time_low( +/*===================*/ + os_event_t event, /*!< in: event to wait */ + ulint time_in_usec, /*!< in: timeout in + microseconds, or + OS_SYNC_INFINITE_TIME */ + ib_int64_t reset_sig_count) /*!< in: zero or the value + returned by previous call of + os_event_reset(). */ + { + ibool timed_out = FALSE; + #ifdef __WIN__ - DWORD err; + DWORD time_in_ms; - ut_a(event); + if (!srv_use_native_conditions) { + DWORD err; - if (wtime != OS_SYNC_INFINITE_TIME) { - err = WaitForSingleObject(event->handle, (DWORD) wtime / 1000); - } else { - err = WaitForSingleObject(event->handle, INFINITE); - } + ut_a(event); - if (err == WAIT_OBJECT_0) { + if (time_in_usec != OS_SYNC_INFINITE_TIME) { + time_in_ms = time_in_usec / 1000; + err = WaitForSingleObject(event->handle, time_in_ms); + } else { + err = WaitForSingleObject(event->handle, INFINITE); + } - return(0); - } else if (err == WAIT_TIMEOUT) { + if (err == WAIT_OBJECT_0) { + return(0); + } else if ((err == WAIT_TIMEOUT) || (err == ERROR_TIMEOUT)) { + return(OS_SYNC_TIME_EXCEEDED); + } - return(OS_SYNC_TIME_EXCEEDED); - } else { ut_error; - return(1000000); /* dummy value to eliminate compiler warn. */ + /* Dummy value to eliminate compiler warning. 
*/ + return(42); + } else { + ut_a(sleep_condition_variable != NULL); + + if (time_in_usec != OS_SYNC_INFINITE_TIME) { + time_in_ms = time_in_usec / 1000; + } else { + time_in_ms = INFINITE; + } } #else - int err; - int ret = 0; - ulint tmp; - ib_int64_t old_count; - struct timeval tv_start; - struct timespec timeout; - - if (wtime == OS_SYNC_INFINITE_TIME) { - os_event_wait(event); - return 0; - } + struct timespec abstime; - /* Compute the absolute point in time at which to time out. */ - gettimeofday(&tv_start, NULL); - tmp = tv_start.tv_usec + wtime; - timeout.tv_sec = tv_start.tv_sec + (tmp / 1000000); - timeout.tv_nsec = (tmp % 1000000) * 1000; + if (time_in_usec != OS_SYNC_INFINITE_TIME) { + struct timeval tv; + int ret; + ulint sec; + ulint usec; - os_fast_mutex_lock(&(event->os_mutex)); - old_count = event->signal_count; + ret = ut_usectime(&sec, &usec); + ut_a(ret == 0); - for (;;) { - if (event->is_set == TRUE || event->signal_count != old_count) - break; + tv.tv_sec = sec; + tv.tv_usec = usec; - err = pthread_cond_timedwait(&(event->cond_var), - &(event->os_mutex), &timeout); - if (err == ETIMEDOUT) { - ret = OS_SYNC_TIME_EXCEEDED; - break; + tv.tv_usec += time_in_usec; + + if ((ulint) tv.tv_usec >= MICROSECS_IN_A_SECOND) { + tv.tv_sec += time_in_usec / MICROSECS_IN_A_SECOND; + tv.tv_usec %= MICROSECS_IN_A_SECOND; } + + abstime.tv_sec = tv.tv_sec; + abstime.tv_nsec = tv.tv_usec * 1000; + } else { + abstime.tv_nsec = 999999999; + abstime.tv_sec = (time_t) ULINT_MAX; } - os_fast_mutex_unlock(&(event->os_mutex)); + ut_a(abstime.tv_nsec <= 999999999); - if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) { +#endif /* __WIN__ */ + + os_fast_mutex_lock(&event->os_mutex); - os_thread_exit(NULL); + if (!reset_sig_count) { + reset_sig_count = event->signal_count; } - return ret; -#endif -} + do { + if (event->is_set || event->signal_count != reset_sig_count) { -#ifdef __WIN__ -/**********************************************************//** -Waits for any event in an OS native event array. Returns if even a single -one is signaled or becomes signaled. -@return index of the event which was signaled */ -UNIV_INTERN -ulint -os_event_wait_multiple( -/*===================*/ - ulint n, /*!< in: number of events in the - array */ - os_native_event_t* native_event_array) - /*!< in: pointer to an array of event - handles */ -{ - DWORD index; + break; + } - ut_a(native_event_array); - ut_a(n > 0); + timed_out = os_cond_wait_timed( + &event->cond_var, &event->os_mutex, +#ifndef __WIN__ + &abstime +#else + time_in_ms +#endif /* !__WIN__ */ + ); - index = WaitForMultipleObjects((DWORD) n, native_event_array, - FALSE, /* Wait for any 1 event */ - INFINITE); /* Infinite wait time - limit */ - ut_a(index >= WAIT_OBJECT_0); /* NOTE: Pointless comparison */ - ut_a(index < WAIT_OBJECT_0 + n); + } while (!timed_out); - if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) { - os_thread_exit(NULL); - } + os_fast_mutex_unlock(&event->os_mutex); - return(index - WAIT_OBJECT_0); + return(timed_out ? OS_SYNC_TIME_EXCEEDED : 0); } -#endif /*********************************************************//** Creates an operating system mutex semaphore. Because these are slow, the @@ -527,29 +735,15 @@ mutex semaphore of InnoDB itself (mutex_t) should be used where possible. 
@return the mutex handle */ UNIV_INTERN os_mutex_t -os_mutex_create( -/*============*/ - const char* name) /*!< in: the name of the mutex, if NULL - the mutex is created without a name */ +os_mutex_create(void) +/*=================*/ { -#ifdef __WIN__ - HANDLE mutex; - os_mutex_t mutex_str; - - mutex = CreateMutex(NULL, /* No security attributes */ - FALSE, /* Initial state: no owner */ - (LPCTSTR) name); - ut_a(mutex); -#else os_fast_mutex_t* mutex; os_mutex_t mutex_str; - UT_NOT_USED(name); - mutex = ut_malloc(sizeof(os_fast_mutex_t)); os_fast_mutex_init(mutex); -#endif mutex_str = ut_malloc(sizeof(os_mutex_str_t)); mutex_str->handle = mutex; @@ -580,25 +774,11 @@ os_mutex_enter( /*===========*/ os_mutex_t mutex) /*!< in: mutex to acquire */ { -#ifdef __WIN__ - DWORD err; - - ut_a(mutex); - - /* Specify infinite time limit for waiting */ - err = WaitForSingleObject(mutex->handle, INFINITE); - - ut_a(err == WAIT_OBJECT_0); - - (mutex->count)++; - ut_a(mutex->count == 1); -#else os_fast_mutex_lock(mutex->handle); (mutex->count)++; ut_a(mutex->count == 1); -#endif } /**********************************************************//** @@ -614,11 +794,7 @@ os_mutex_exit( ut_a(mutex->count == 1); (mutex->count)--; -#ifdef __WIN__ - ut_a(ReleaseMutex(mutex->handle)); -#else os_fast_mutex_unlock(mutex->handle); -#endif } /**********************************************************//** @@ -647,15 +823,9 @@ os_mutex_free( os_mutex_exit(os_sync_mutex); } -#ifdef __WIN__ - ut_a(CloseHandle(mutex->handle)); - - ut_free(mutex); -#else os_fast_mutex_free(mutex->handle); ut_free(mutex->handle); ut_free(mutex); -#endif } /*********************************************************//** diff --git a/storage/xtradb/os/os0thread.c b/storage/xtradb/os/os0thread.c index e41c1163371..12b6805d98e 100644 --- a/storage/xtradb/os/os0thread.c +++ b/storage/xtradb/os/os0thread.c @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 1995, 2009, Innobase Oy. All Rights Reserved. +Copyright (c) 1995, 2010, Innobase Oy. All Rights Reserved. 
This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -133,15 +133,6 @@ os_thread_create( 0, /* thread runs immediately */ &win_thread_id); - if (srv_set_thread_priorities) { - - /* Set created thread priority the same as a normal query - in MYSQL: we try to prevent starvation of threads by - assigning same priority QUERY_PRIOR to all */ - - ut_a(SetThreadPriority(thread, srv_query_thread_priority)); - } - if (thread_id) { *thread_id = win_thread_id; } @@ -172,16 +163,6 @@ os_thread_create( exit(1); } #endif -#ifdef __NETWARE__ - ret = pthread_attr_setstacksize(&attr, - (size_t) NW_THD_STACKSIZE); - if (ret) { - fprintf(stderr, - "InnoDB: Error: pthread_attr_setstacksize" - " returned %d\n", ret); - exit(1); - } -#endif os_mutex_enter(os_sync_mutex); os_thread_count++; os_mutex_exit(os_sync_mutex); @@ -200,7 +181,6 @@ os_thread_create( #ifndef UNIV_HPUX10 pthread_attr_destroy(&attr); #endif - if (thread_id) { *thread_id = pthread; } @@ -222,6 +202,11 @@ os_thread_exit( fprintf(stderr, "Thread exits, id %lu\n", os_thread_pf(os_thread_get_curr_id())); #endif + +#ifdef UNIV_PFS_THREAD + pfs_delete_thread(); +#endif + os_mutex_enter(os_sync_mutex); os_thread_count--; os_mutex_exit(os_sync_mutex); @@ -235,21 +220,6 @@ os_thread_exit( } /*****************************************************************//** -Returns handle to the current thread. -@return current thread handle */ -UNIV_INTERN -os_thread_t -os_thread_get_curr(void) -/*====================*/ -{ -#ifdef __WIN__ - return(GetCurrentThread()); -#else - return(pthread_self()); -#endif -} - -/*****************************************************************//** Advises the os to give up remainder of the thread's time slice. */ UNIV_INTERN void @@ -257,7 +227,7 @@ os_thread_yield(void) /*=================*/ { #if defined(__WIN__) - Sleep(0); + SwitchToThread(); #elif (defined(HAVE_SCHED_YIELD) && defined(HAVE_SCHED_H)) sched_yield(); #elif defined(HAVE_PTHREAD_YIELD_ZERO_ARG) @@ -280,8 +250,6 @@ os_thread_sleep( { #ifdef __WIN__ Sleep((DWORD) tm / 1000); -#elif defined(__NETWARE__) - delay(tm / 1000); #else struct timeval t; @@ -291,81 +259,3 @@ os_thread_sleep( select(0, NULL, NULL, NULL, &t); #endif } - -#ifndef UNIV_HOTBACKUP -/******************************************************************//** -Sets a thread priority. */ -UNIV_INTERN -void -os_thread_set_priority( -/*===================*/ - os_thread_t handle, /*!< in: OS handle to the thread */ - ulint pri) /*!< in: priority */ -{ -#ifdef __WIN__ - int os_pri; - - if (pri == OS_THREAD_PRIORITY_BACKGROUND) { - os_pri = THREAD_PRIORITY_BELOW_NORMAL; - } else if (pri == OS_THREAD_PRIORITY_NORMAL) { - os_pri = THREAD_PRIORITY_NORMAL; - } else if (pri == OS_THREAD_PRIORITY_ABOVE_NORMAL) { - os_pri = THREAD_PRIORITY_HIGHEST; - } else { - ut_error; - } - - ut_a(SetThreadPriority(handle, os_pri)); -#else - UT_NOT_USED(handle); - UT_NOT_USED(pri); -#endif -} - -/******************************************************************//** -Gets a thread priority. 
-@return priority */ -UNIV_INTERN -ulint -os_thread_get_priority( -/*===================*/ - os_thread_t handle __attribute__((unused))) - /*!< in: OS handle to the thread */ -{ -#ifdef __WIN__ - int os_pri; - ulint pri; - - os_pri = GetThreadPriority(handle); - - if (os_pri == THREAD_PRIORITY_BELOW_NORMAL) { - pri = OS_THREAD_PRIORITY_BACKGROUND; - } else if (os_pri == THREAD_PRIORITY_NORMAL) { - pri = OS_THREAD_PRIORITY_NORMAL; - } else if (os_pri == THREAD_PRIORITY_HIGHEST) { - pri = OS_THREAD_PRIORITY_ABOVE_NORMAL; - } else { - ut_error; - } - - return(pri); -#else - return(0); -#endif -} - -/******************************************************************//** -Gets the last operating system error code for the calling thread. -@return last error on Windows, 0 otherwise */ -UNIV_INTERN -ulint -os_thread_get_last_error(void) -/*==========================*/ -{ -#ifdef __WIN__ - return(GetLastError()); -#else - return(0); -#endif -} -#endif /* !UNIV_HOTBACKUP */
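Finally, returning to the os_event_wait_time_low() rewrite in the os0sync.c hunk above: the two ideas it combines, turning a relative timeout in microseconds into the absolute deadline that pthread_cond_timedwait() expects, and re-checking a generation counter so a signal that raced with the reset is not lost, can be sketched in isolation. The event_t below is a hypothetical stand-in, not InnoDB's os_event_struct:

#include <pthread.h>
#include <sys/time.h>
#include <errno.h>

typedef struct {
	pthread_mutex_t	mutex;
	pthread_cond_t	cond;
	int		is_set;
	long long	signal_count;	/* bumped on every signal */
} event_t;

/* Returns 0 if the event was signalled, ETIMEDOUT otherwise. seen_count
is the signal_count value observed when the event was last reset. */
int
event_wait_time(event_t* ev, unsigned long usec, long long seen_count)
{
	struct timeval	tv;
	struct timespec	abstime;
	int		timed_out;

	/* Absolute deadline = current wall-clock time + relative timeout. */
	gettimeofday(&tv, NULL);
	tv.tv_usec += usec;
	tv.tv_sec += tv.tv_usec / 1000000;
	tv.tv_usec %= 1000000;
	abstime.tv_sec = tv.tv_sec;
	abstime.tv_nsec = tv.tv_usec * 1000;

	pthread_mutex_lock(&ev->mutex);

	while (!ev->is_set && ev->signal_count == seen_count) {
		if (pthread_cond_timedwait(&ev->cond, &ev->mutex, &abstime)
		    == ETIMEDOUT) {
			break;
		}
		/* Spurious wakeups fall through and re-check the flag. */
	}

	timed_out = (!ev->is_set && ev->signal_count == seen_count);

	pthread_mutex_unlock(&ev->mutex);

	return(timed_out ? ETIMEDOUT : 0);
}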