path: root/storage/xtradb/os
Diffstat (limited to 'storage/xtradb/os')
-rw-r--r--  storage/xtradb/os/os0file.c    1458
-rw-r--r--  storage/xtradb/os/os0proc.c     174
-rw-r--r--  storage/xtradb/os/os0sync.c     602
-rw-r--r--  storage/xtradb/os/os0thread.c   124
4 files changed, 1526 insertions, 832 deletions
diff --git a/storage/xtradb/os/os0file.c b/storage/xtradb/os/os0file.c
index 48d796c38e1..835210140f8 100644
--- a/storage/xtradb/os/os0file.c
+++ b/storage/xtradb/os/os0file.c
@@ -33,6 +33,11 @@ Created 10/21/1995 Heikki Tuuri
*******************************************************/
#include "os0file.h"
+
+#ifdef UNIV_NONINL
+#include "os0file.ic"
+#endif
+
#include "ut0mem.h"
#include "srv0srv.h"
#include "srv0start.h"
@@ -53,6 +58,10 @@ Created 10/21/1995 Heikki Tuuri
# endif /* __WIN__ */
#endif /* !UNIV_HOTBACKUP */
+#if defined(LINUX_NATIVE_AIO)
+#include <libaio.h>
+#endif
+
/* This specifies the file permissions InnoDB uses when it creates files in
Unix; the value of os_innodb_umask is initialized in ha_innodb.cc to
my_umask */
@@ -74,9 +83,7 @@ UNIV_INTERN ibool os_do_not_call_flush_at_each_write = FALSE;
/* We do not call os_file_flush in every os_file_write. */
#endif /* UNIV_DO_FLUSH */
-#ifdef UNIV_HOTBACKUP
-# define os_aio_use_native_aio FALSE
-#else /* UNIV_HOTBACKUP */
+#ifndef UNIV_HOTBACKUP
/* We use these mutexes to protect lseek + file i/o operation, if the
OS does not provide an atomic pread or pwrite, or similar */
#define OS_FILE_N_SEEK_MUTEXES 16
@@ -85,36 +92,69 @@ UNIV_INTERN os_mutex_t os_file_seek_mutexes[OS_FILE_N_SEEK_MUTEXES];
/* In simulated aio, merge at most this many consecutive i/os */
#define OS_AIO_MERGE_N_CONSECUTIVE 64
-/** If this flag is TRUE, then we will use the native aio of the
-OS (provided we compiled Innobase with it in), otherwise we will
-use simulated aio we build below with threads */
-
-UNIV_INTERN ibool os_aio_use_native_aio = FALSE;
+/**********************************************************************
+
+InnoDB AIO Implementation:
+=========================
+
+We support native AIO for Windows and Linux. For the rest of the platforms
+we simulate AIO with special io-threads servicing the IO requests.
+
+Simulated AIO:
+==============
+
+On platforms where we 'simulate' AIO, the following is a rough explanation
+of the high-level design.
+There are four io-threads (for ibuf, log, read, write).
+All synchronous IO requests are serviced by the calling thread using
+os_file_write/os_file_read. Asynchronous requests are queued up
+in an array (there are four such arrays) by the calling thread.
+Later these requests are picked up by an io-thread and serviced
+synchronously.
+
+Windows native AIO:
+==================
+
+If srv_use_native_aio is not set then Windows follows the same
+code path as simulated AIO. If the flag is set then the native AIO
+interface is used. On Windows, one of the limitations is that if a file
+is opened for AIO no synchronous IO can be done on it. Therefore we have
+an extra fifth array to queue up synchronous IO requests.
+There are innodb_file_io_threads helper threads. These threads work
+on the four arrays mentioned above in Simulated AIO. No thread is
+required for the sync array.
+If a synchronous IO request is made, it is first queued in the sync
+array. Then the calling thread itself waits on the request, thus
+making the call synchronous.
+If an AIO request is made, the calling thread not only queues it in the
+array but also submits the request. The helper thread then collects
+the completed IO request and calls the completion routine on it.
+
+Linux native AIO:
+=================
+
+If we have libaio installed on the system and innodb_use_native_aio
+is set to TRUE, we follow the code path of native AIO; otherwise we
+do simulated AIO.
+There are innodb_file_io_threads helper threads. These threads work
+on the four arrays mentioned above in Simulated AIO.
+If a synchronous IO request is made, it is handled by calling
+os_file_write/os_file_read.
+If an AIO request is made, the calling thread not only queues it in the
+array but also submits the request. The helper thread then collects
+the completed IO request and calls the completion routine on it.
+
+**********************************************************************/
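For readers new to the libaio interface that the Linux native AIO path above builds on, here is a minimal standalone sketch, not part of this patch, of the io_setup / io_prep_pread / io_submit / io_getevents sequence that the patch wraps in os_aio_linux_create_io_ctx(), os_aio_linux_dispatch() and os_aio_linux_collect(). The file name, buffer size and event count are arbitrary examples; build with "gcc demo.c -laio" on a system with libaio installed.

/* Minimal libaio sketch (NOT InnoDB code): submit one read, reap it. */
#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
	io_context_t	ctx;
	struct iocb	cb;
	struct iocb*	cbs[1];
	struct io_event	ev;
	static char	buf[4096];
	int		fd;
	int		ret;

	/* Like os_aio_linux_create_io_ctx(): reserve a queue for pending IOs.
	Note that io_setup() returns a negative error code, not errno. */
	memset(&ctx, 0, sizeof(ctx));
	ret = io_setup(8, &ctx);
	if (ret != 0) {
		fprintf(stderr, "io_setup returned %d\n", ret);
		return 1;
	}

	fd = open("/tmp/aio_demo", O_RDONLY);	/* example file */
	if (fd < 0) {
		perror("open");
		return 1;
	}

	/* Like the io_prep_pread() call in os_aio_array_reserve_slot(). */
	io_prep_pread(&cb, fd, buf, sizeof(buf), 0);
	cb.data = buf;	/* opaque per-request pointer, as with the slot */

	/* Like os_aio_linux_dispatch(): hand the request to the kernel. */
	cbs[0] = &cb;
	ret = io_submit(ctx, 1, cbs);
	if (ret != 1) {
		fprintf(stderr, "io_submit returned %d\n", ret);
		return 1;
	}

	/* Like os_aio_linux_collect(): wait for the completion event. */
	ret = io_getevents(ctx, 1, 1, &ev, NULL);
	if (ret == 1) {
		printf("read %ld bytes\n", (long) ev.res);
	}

	io_destroy(ctx);
	close(fd);
	return 0;
}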
/** Flag: enable debug printout for asynchronous i/o */
UNIV_INTERN ibool os_aio_print_debug = FALSE;
-/* State for the state of an IO request in simulated AIO.
- Protocol for simulated aio:
- client requests IO: find slot with reserved = FALSE. Add entry with
- status = OS_AIO_NOT_ISSUED.
- IO thread wakes: find adjacent slots with reserved = TRUE and status =
- OS_AIO_NOT_ISSUED. Change status for slots to
- OS_AIO_ISSUED.
- IO operation completes: set status for slots to OS_AIO_DONE. set status
- for the first slot to OS_AIO_CLAIMED and return
- result for that slot.
- When there are multiple read and write threads, they all compete to execute
- the requests in the array (os_aio_array_t). This avoids the need to load
- balance requests at the time the request is made at the cost of waking all
- threads when a request is available.
-*/
-typedef enum {
- OS_AIO_NOT_ISSUED, /* Available to be processed by an IO thread. */
- OS_AIO_ISSUED, /* Being processed by an IO thread. */
- OS_AIO_DONE, /* Request processed. */
- OS_AIO_CLAIMED /* Result being returned to client. */
-} os_aio_status;
+#ifdef UNIV_PFS_IO
+/* Keys to register InnoDB I/O with performance schema */
+UNIV_INTERN mysql_pfs_key_t innodb_file_data_key;
+UNIV_INTERN mysql_pfs_key_t innodb_file_log_key;
+UNIV_INTERN mysql_pfs_key_t innodb_file_temp_key;
+#endif /* UNIV_PFS_IO */
/** The asynchronous i/o array slot structure */
typedef struct os_aio_slot_struct os_aio_slot_t;
@@ -125,8 +165,6 @@ struct os_aio_slot_struct{
ulint pos; /*!< index of the slot in the aio
array */
ibool reserved; /*!< TRUE if this slot is reserved */
- os_aio_status status; /* Status for current request. Valid when reserved
- is TRUE. Used only in simulated aio. */
time_t reservation_time;/*!< time when reserved */
ulint len; /*!< length of the block to read or
write */
@@ -137,21 +175,26 @@ struct os_aio_slot_struct{
ulint offset_high; /*!< 32 high bits of file offset */
os_file_t file; /*!< file where to read or write */
const char* name; /*!< file name or path */
-// ibool io_already_done;/*!< used only in simulated aio:
-// TRUE if the physical i/o already
-// made and only the slot message
-// needs to be passed to the caller
-// of os_aio_simulated_handle */
+ ibool io_already_done;/*!< used only in simulated aio:
+ TRUE if the physical i/o already
+ made and only the slot message
+ needs to be passed to the caller
+ of os_aio_simulated_handle */
+ ulint space_id; /*!< tablespace id of the i/o request */
fil_node_t* message1; /*!< message which is given by the */
void* message2; /*!< the requester of an aio operation
and which can be used to identify
which pending aio operation was
completed */
#ifdef WIN_ASYNC_IO
- os_event_t event; /*!< event object we need in the
+ HANDLE handle; /*!< handle object we need in the
OVERLAPPED struct */
OVERLAPPED control; /*!< Windows control block for the
aio request */
+#elif defined(LINUX_NATIVE_AIO)
+ struct iocb control; /* Linux control block for aio */
+ int n_bytes; /* bytes written/read. */
+ int ret; /* AIO return code */
#endif
};
@@ -177,12 +220,16 @@ struct os_aio_array_struct{
array of pending aio requests. A
thread can wait separately for any one
of the segments. */
+ ulint cur_seg;/*!< We reserve IO requests in round
+ robin fashion to different segments.
+ This points to the segment that is to
+ be used to service next IO request. */
ulint n_reserved;
/*!< Number of reserved slots in the
aio array outside the ibuf segment */
os_aio_slot_t* slots; /*!< Pointer to the slots in the array */
#ifdef __WIN__
- os_native_event_t* native_events;
+ HANDLE* handles;
/*!< Pointer to an array of OS native
event handles where we copied the
handles from slots, in the same
@@ -190,17 +237,33 @@ struct os_aio_array_struct{
WaitForMultipleObjects; used only in
Windows */
#endif
+
+#if defined(LINUX_NATIVE_AIO)
+ io_context_t* aio_ctx;
+ /* completion queue for IO. There is
+ one such queue per segment. Each thread
+ will work on one ctx exclusively. */
+ struct io_event* aio_events;
+ /* The array to collect completed IOs.
+ There is one such event for each
+ possible pending IO. The size of the
+ array is equal to n_slots. */
+#endif
};
-/** Array of events used in simulated aio */
-static os_event_t* os_aio_segment_wait_events = NULL;
+#if defined(LINUX_NATIVE_AIO)
+/** timeout for each io_getevents() call = 500ms. */
+#define OS_AIO_REAP_TIMEOUT (500000000UL)
+
+/** time to sleep, in microseconds if io_setup() returns EAGAIN. */
+#define OS_AIO_IO_SETUP_RETRY_SLEEP (500000UL)
-/* Number for the first global segment for reading. */
-const ulint os_aio_first_read_segment = 2;
+/** number of attempts before giving up on io_setup(). */
+#define OS_AIO_IO_SETUP_RETRY_ATTEMPTS 5
+#endif
-/* Number for the first global segment for writing. Set to
-2 + os_aio_read_write_threads. */
-ulint os_aio_first_write_segment = 0;
+/** Array of events used in simulated aio */
+static os_event_t* os_aio_segment_wait_events = NULL;
/** The aio arrays for non-ibuf i/o and ibuf i/o, as well as sync aio. These
are NULL when the module has not yet been initialized. @{ */
@@ -211,19 +274,13 @@ static os_aio_array_t* os_aio_log_array = NULL; /*!< Redo log */
static os_aio_array_t* os_aio_sync_array = NULL; /*!< Synchronous I/O */
/* @} */
-/* Per thread buffer used for merged IO requests. Used by
-os_aio_simulated_handle so that a buffer doesn't have to be allocated
-for each request. */
-static byte* os_aio_thread_buffer[SRV_MAX_N_IO_THREADS];
-static ulint os_aio_thread_buffer_size[SRV_MAX_N_IO_THREADS];
-
/** Number of asynchronous I/O segments. Set by os_aio_init(). */
static ulint os_aio_n_segments = ULINT_UNDEFINED;
/** If the following is TRUE, read i/o handler threads try to
wait until a batch of new read requests have been posted */
-static volatile ibool os_aio_recommend_sleep_for_read_threads = FALSE;
-#endif /* UNIV_HOTBACKUP */
+static ibool os_aio_recommend_sleep_for_read_threads = FALSE;
+#endif /* !UNIV_HOTBACKUP */
UNIV_INTERN ulint os_n_file_reads = 0;
UNIV_INTERN ulint os_bytes_read_since_printout = 0;
@@ -249,15 +306,45 @@ UNIV_INTERN ulint os_n_pending_writes = 0;
/** Number of pending read operations */
UNIV_INTERN ulint os_n_pending_reads = 0;
+#ifdef UNIV_DEBUG
+/**********************************************************************//**
+Validates the consistency of the aio system some of the time.
+@return TRUE if ok or the check was skipped */
+UNIV_INTERN
+ibool
+os_aio_validate_skip(void)
+/*======================*/
+{
+/** Try os_aio_validate() every this many times */
+# define OS_AIO_VALIDATE_SKIP 13
+
+ /** The os_aio_validate() call skip counter.
+ Use a signed type because of the race condition below. */
+ static int os_aio_validate_count = OS_AIO_VALIDATE_SKIP;
+
+ /* There is a race condition below, but it does not matter,
+ because this call is only for heuristic purposes. We want to
+ reduce the call frequency of the costly os_aio_validate()
+ check in debug builds. */
+ if (--os_aio_validate_count > 0) {
+ return(TRUE);
+ }
+
+ os_aio_validate_count = OS_AIO_VALIDATE_SKIP;
+ return(os_aio_validate());
+}
+#endif /* UNIV_DEBUG */
+
+#ifdef __WIN__
/***********************************************************************//**
Gets the operating system version. Currently works only on Windows.
-@return OS_WIN95, OS_WIN31, OS_WINNT, OS_WIN2000 */
+@return OS_WIN95, OS_WIN31, OS_WINNT, OS_WIN2000, OS_WINXP, OS_WINVISTA,
+OS_WIN7. */
UNIV_INTERN
ulint
os_get_os_version(void)
/*===================*/
{
-#ifdef __WIN__
OSVERSIONINFO os_info;
os_info.dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
@@ -269,21 +356,25 @@ os_get_os_version(void)
} else if (os_info.dwPlatformId == VER_PLATFORM_WIN32_WINDOWS) {
return(OS_WIN95);
} else if (os_info.dwPlatformId == VER_PLATFORM_WIN32_NT) {
- if (os_info.dwMajorVersion <= 4) {
- return(OS_WINNT);
- } else {
- return(OS_WIN2000);
+ switch (os_info.dwMajorVersion) {
+ case 3:
+ case 4:
+ return OS_WINNT;
+ case 5:
+ return (os_info.dwMinorVersion == 0) ? OS_WIN2000
+ : OS_WINXP;
+ case 6:
+ return (os_info.dwMinorVersion == 0) ? OS_WINVISTA
+ : OS_WIN7;
+ default:
+ return OS_WIN7;
}
} else {
ut_error;
return(0);
}
-#else
- ut_error;
-
- return(0);
-#endif
}
+#endif /* __WIN__ */
/***********************************************************************//**
Retrieves the last error number if an error occurs in a file io function.
@@ -429,17 +520,29 @@ os_file_get_last_error(
fflush(stderr);
- if (err == ENOSPC) {
+ switch (err) {
+ case ENOSPC:
return(OS_FILE_DISK_FULL);
- } else if (err == ENOENT) {
+ case ENOENT:
return(OS_FILE_NOT_FOUND);
- } else if (err == EEXIST) {
+ case EEXIST:
return(OS_FILE_ALREADY_EXISTS);
- } else if (err == EXDEV || err == ENOTDIR || err == EISDIR) {
+ case EXDEV:
+ case ENOTDIR:
+ case EISDIR:
return(OS_FILE_PATH_ERROR);
- } else {
- return(100 + err);
+ case EAGAIN:
+ if (srv_use_native_aio) {
+ return(OS_FILE_AIO_RESOURCES_RESERVED);
+ }
+ break;
+ case EINTR:
+ if (srv_use_native_aio) {
+ return(OS_FILE_AIO_INTERRUPTED);
+ }
+ break;
}
+ return(100 + err);
#endif
}
@@ -489,6 +592,9 @@ os_file_handle_error_cond_exit(
} else if (err == OS_FILE_AIO_RESOURCES_RESERVED) {
return(TRUE);
+ } else if (err == OS_FILE_AIO_INTERRUPTED) {
+
+ return(TRUE);
} else if (err == OS_FILE_ALREADY_EXISTS
|| err == OS_FILE_PATH_ERROR) {
@@ -555,7 +661,7 @@ os_file_handle_error_no_exit(
#undef USE_FILE_LOCK
#define USE_FILE_LOCK
-#if defined(UNIV_HOTBACKUP) || defined(__WIN__) || defined(__NETWARE__)
+#if defined(UNIV_HOTBACKUP) || defined(__WIN__)
/* InnoDB Hot Backup does not lock the data files.
* On Windows, mandatory locking is used.
*/
@@ -605,45 +711,37 @@ os_io_init_simple(void)
{
ulint i;
- os_file_count_mutex = os_mutex_create(NULL);
+ os_file_count_mutex = os_mutex_create();
for (i = 0; i < OS_FILE_N_SEEK_MUTEXES; i++) {
- os_file_seek_mutexes[i] = os_mutex_create(NULL);
+ os_file_seek_mutexes[i] = os_mutex_create();
}
}
/***********************************************************************//**
Creates a temporary file. This function is like tmpfile(3), but
the temporary file is created in the MySQL temporary directory.
-On Netware, this function is like tmpfile(3), because the C run-time
-library of Netware does not expose the delete-on-close flag.
@return temporary file handle, or NULL on error */
UNIV_INTERN
FILE*
os_file_create_tmpfile(void)
/*========================*/
{
-#ifdef __NETWARE__
- FILE* file = tmpfile();
-#else /* __NETWARE__ */
FILE* file = NULL;
int fd = innobase_mysql_tmpfile();
if (fd >= 0) {
file = fdopen(fd, "w+b");
}
-#endif /* __NETWARE__ */
if (!file) {
ut_print_timestamp(stderr);
fprintf(stderr,
" InnoDB: Error: unable to create temporary file;"
" errno: %d\n", errno);
-#ifndef __NETWARE__
if (fd >= 0) {
close(fd);
}
-#endif /* !__NETWARE__ */
}
return(file);
@@ -964,13 +1062,15 @@ os_file_create_directory(
}
/****************************************************************//**
+NOTE! Use the corresponding macro os_file_create_simple(), not directly
+this function!
A simple function to open or create a file.
@return own: handle to the file, not defined if error, error number
can be retrieved with os_file_get_last_error */
UNIV_INTERN
os_file_t
-os_file_create_simple(
-/*==================*/
+os_file_create_simple_func(
+/*=======================*/
const char* name, /*!< in: name of the file or path as a
null-terminated string */
ulint create_mode,/*!< in: OS_FILE_OPEN if an existing file is
@@ -1105,13 +1205,15 @@ try_again:
}
/****************************************************************//**
+NOTE! Use the corresponding macro
+os_file_create_simple_no_error_handling(), not directly this function!
A simple function to open or create a file.
@return own: handle to the file, not defined if error, error number
can be retrieved with os_file_get_last_error */
UNIV_INTERN
os_file_t
-os_file_create_simple_no_error_handling(
-/*====================================*/
+os_file_create_simple_no_error_handling_func(
+/*=========================================*/
const char* name, /*!< in: name of the file or path as a
null-terminated string */
ulint create_mode,/*!< in: OS_FILE_OPEN if an existing file
@@ -1222,10 +1324,12 @@ UNIV_INTERN
void
os_file_set_nocache(
/*================*/
- int fd, /*!< in: file descriptor to alter */
- const char* file_name, /*!< in: file name, used in the
- diagnostic message */
- const char* operation_name) /*!< in: "open" or "create"; used in the
+ int fd /*!< in: file descriptor to alter */
+ __attribute__((unused)),
+ const char* file_name /*!< in: used in the diagnostic message */
+ __attribute__((unused)),
+ const char* operation_name __attribute__((unused)))
+ /*!< in: "open" or "create"; used in the
diagnostic message */
{
/* some versions of Solaris may not have DIRECTIO_ON */
@@ -1260,13 +1364,15 @@ os_file_set_nocache(
}
/****************************************************************//**
+NOTE! Use the corresponding macro os_file_create(), not directly
+this function!
Opens an existing file or creates a new.
@return own: handle to the file, not defined if error, error number
can be retrieved with os_file_get_last_error */
UNIV_INTERN
os_file_t
-os_file_create(
-/*===========*/
+os_file_create_func(
+/*================*/
const char* name, /*!< in: name of the file or path as a
null-terminated string */
ulint create_mode,/*!< in: OS_FILE_OPEN if an existing file
@@ -1316,13 +1422,13 @@ try_again:
buffering of writes in the OS */
attributes = 0;
#ifdef WIN_ASYNC_IO
- if (os_aio_use_native_aio) {
+ if (srv_use_native_aio) {
attributes = attributes | FILE_FLAG_OVERLAPPED;
}
#endif
#ifdef UNIV_NON_BUFFERED_IO
# ifndef UNIV_HOTBACKUP
- if (type == OS_LOG_FILE && srv_flush_log_at_trx_commit == 2) {
+ if (type == OS_LOG_FILE && thd_flush_log_at_trx_commit(NULL) == 2) {
/* Do not use unbuffered i/o to log files because
value 2 denotes that we do not flush the log at every
commit, but only once per second */
@@ -1338,7 +1444,7 @@ try_again:
attributes = 0;
#ifdef UNIV_NON_BUFFERED_IO
# ifndef UNIV_HOTBACKUP
- if (type == OS_LOG_FILE && srv_flush_log_at_trx_commit == 2) {
+ if (type == OS_LOG_FILE && thd_flush_log_at_trx_commit(NULL) == 2) {
/* Do not use unbuffered i/o to log files because
value 2 denotes that we do not flush the log at every
commit, but only once per second */
@@ -1407,8 +1513,6 @@ try_again:
int create_flag;
ibool retry;
const char* mode_str = NULL;
- const char* type_str = NULL;
- const char* purpose_str = NULL;
try_again:
ut_a(name);
@@ -1428,26 +1532,9 @@ try_again:
ut_error;
}
- if (type == OS_LOG_FILE) {
- type_str = "LOG";
- } else if (type == OS_DATA_FILE) {
- type_str = "DATA";
- } else {
- ut_error;
- }
+ ut_a(type == OS_LOG_FILE || type == OS_DATA_FILE);
+ ut_a(purpose == OS_FILE_AIO || purpose == OS_FILE_NORMAL);
- if (purpose == OS_FILE_AIO) {
- purpose_str = "AIO";
- } else if (purpose == OS_FILE_NORMAL) {
- purpose_str = "NORMAL";
- } else {
- ut_error;
- }
-
-#if 0
- fprintf(stderr, "Opening file %s, mode %s, type %s, purpose %s\n",
- name, mode_str, type_str, purpose_str);
-#endif
#ifdef O_SYNC
/* We let O_SYNC only affect log files; note that we map O_DSYNC to
O_SYNC because the datasync options seemed to corrupt files in 2001
@@ -1664,13 +1751,14 @@ loop:
}
/***********************************************************************//**
+NOTE! Use the corresponding macro os_file_rename(), not directly this function!
Renames a file (can also move it to another directory). It is safest that the
file is closed before calling this function.
@return TRUE if success */
UNIV_INTERN
ibool
-os_file_rename(
-/*===========*/
+os_file_rename_func(
+/*================*/
const char* oldpath,/*!< in: old file path as a null-terminated
string */
const char* newpath)/*!< in: new file path */
@@ -1703,13 +1791,14 @@ os_file_rename(
}
/***********************************************************************//**
+NOTE! Use the corresponding macro os_file_close(), not directly this function!
Closes a file handle. In case of error, error number can be retrieved with
os_file_get_last_error.
@return TRUE if success */
UNIV_INTERN
ibool
-os_file_close(
-/*==========*/
+os_file_close_func(
+/*===============*/
os_file_t file) /*!< in, own: handle to a file */
{
#ifdef __WIN__
@@ -1928,7 +2017,7 @@ os_file_set_size(
ut_free(buf2);
- ret = os_file_flush(file);
+ ret = os_file_flush(file, TRUE);
if (ret) {
return(TRUE);
@@ -1966,7 +2055,8 @@ static
int
os_file_fsync(
/*==========*/
- os_file_t file) /*!< in: handle to a file */
+ os_file_t file, /*!< in: handle to a file */
+ ibool metadata) /*!< in: TRUE if the file metadata should also be synced */
{
int ret;
int failures;
@@ -1975,7 +2065,16 @@ os_file_fsync(
failures = 0;
do {
+#if defined(HAVE_FDATASYNC) && HAVE_DECL_FDATASYNC
+ if (metadata) {
+ ret = fsync(file);
+ } else {
+ ret = fdatasync(file);
+ }
+#else
+ (void) metadata;
ret = fsync(file);
+#endif
os_n_fsyncs++;
@@ -1994,6 +2093,9 @@ os_file_fsync(
failures++;
retry = TRUE;
+ } else if (ret == -1 && errno == EINTR) {
+ /* Handle signal interruptions correctly */
+ retry = TRUE;
} else {
retry = FALSE;
@@ -2005,13 +2107,15 @@ os_file_fsync(
#endif /* !__WIN__ */
/***********************************************************************//**
+NOTE! Use the corresponding macro os_file_flush(), not directly this function!
Flushes the write buffers of a given file to the disk.
@return TRUE if success */
UNIV_INTERN
ibool
-os_file_flush(
-/*==========*/
- os_file_t file) /*!< in, own: handle to a file */
+os_file_flush_func(
+/*===============*/
+ os_file_t file, /*!< in, own: handle to a file */
+ ibool metadata) /*!< in: TRUE if the file metadata should also be flushed */
{
#ifdef __WIN__
BOOL ret;
@@ -2061,18 +2165,18 @@ os_file_flush(
/* If we are not on an operating system that supports this,
then fall back to a plain fsync. */
- ret = os_file_fsync(file);
+ ret = os_file_fsync(file, metadata);
} else {
ret = fcntl(file, F_FULLFSYNC, NULL);
if (ret) {
/* If we are not on a file system that supports this,
then fall back to a plain fsync. */
- ret = os_file_fsync(file);
+ ret = os_file_fsync(file, metadata);
}
}
#else
- ret = os_file_fsync(file);
+ ret = os_file_fsync(file, metadata);
#endif
if (ret == 0) {
@@ -2106,12 +2210,9 @@ os_file_flush(
/*******************************************************************//**
Does a synchronous read operation in Posix.
@return number of bytes read, -1 if error */
-#define os_file_pread(file, buf, n, offset, offset_high) \
- _os_file_pread(file, buf, n, offset, offset_high, NULL);
-
static
ssize_t
-_os_file_pread(
+os_file_pread(
/*==========*/
os_file_t file, /*!< in: handle to a file */
void* buf, /*!< in: buffer where to read */
@@ -2125,6 +2226,7 @@ _os_file_pread(
off_t offs;
#if defined(HAVE_PREAD) && !defined(HAVE_BROKEN_PREAD)
ssize_t n_bytes;
+ ssize_t n_read;
#endif /* HAVE_PREAD && !HAVE_BROKEN_PREAD */
ulint sec;
ulint ms;
@@ -2165,7 +2267,18 @@ _os_file_pread(
os_n_pending_reads++;
os_mutex_exit(os_file_count_mutex);
- n_bytes = pread(file, buf, (ssize_t)n, offs);
+ /* Handle signal interruptions correctly */
+ for (n_bytes = 0; n_bytes < (ssize_t) n; ) {
+ n_read = pread(file, buf, (ssize_t)n, offs);
+ if (n_read > 0) {
+ n_bytes += n_read;
+ offs += n_read;
+ } else if (n_read == -1 && errno == EINTR) {
+ continue;
+ } else {
+ break;
+ }
+ }
os_mutex_enter(os_file_count_mutex);
os_file_n_pending_preads--;
@@ -2184,6 +2297,7 @@ _os_file_pread(
{
off_t ret_offset;
ssize_t ret;
+ ssize_t n_read;
#ifndef UNIV_HOTBACKUP
ulint i;
#endif /* !UNIV_HOTBACKUP */
@@ -2204,7 +2318,17 @@ _os_file_pread(
if (ret_offset < 0) {
ret = -1;
} else {
- ret = read(file, buf, (ssize_t)n);
+ /* Handle signal interruptions correctly */
+ for (ret = 0; ret < (ssize_t) n; ) {
+ n_read = read(file, buf, (ssize_t)n);
+ if (n_read > 0) {
+ ret += n_read;
+ } else if (n_read == -1 && errno == EINTR) {
+ continue;
+ } else {
+ break;
+ }
+ }
}
#ifndef UNIV_HOTBACKUP
@@ -2243,6 +2367,7 @@ os_file_pwrite(
offset */
{
ssize_t ret;
+ ssize_t n_written;
off_t offs;
ut_a((offset & 0xFFFFFFFFUL) == offset);
@@ -2270,7 +2395,18 @@ os_file_pwrite(
os_n_pending_writes++;
os_mutex_exit(os_file_count_mutex);
- ret = pwrite(file, buf, (ssize_t)n, offs);
+ /* Handle signal interruptions correctly */
+ for (ret = 0; ret < (ssize_t) n; ) {
+ n_written = pwrite(file, buf, (ssize_t)n, offs);
+ if (n_written > 0) {
+ ret += n_written;
+ offs += n_written;
+ } else if (n_written == -1 && errno == EINTR) {
+ continue;
+ } else {
+ break;
+ }
+ }
os_mutex_enter(os_file_count_mutex);
os_file_n_pending_pwrites--;
@@ -2286,7 +2422,7 @@ os_file_pwrite(
the OS crashes, a database page is only partially
physically written to disk. */
- ut_a(TRUE == os_file_flush(file));
+ ut_a(TRUE == os_file_flush(file, TRUE));
}
# endif /* UNIV_DO_FLUSH */
@@ -2317,7 +2453,17 @@ os_file_pwrite(
goto func_exit;
}
- ret = write(file, buf, (ssize_t)n);
+ /* Handle signal interruptions correctly */
+ for (ret = 0; ret < (ssize_t) n; ) {
+ n_written = write(file, buf, (ssize_t)n);
+ if (n_written > 0) {
+ ret += n_written;
+ } else if (n_written == -1 && errno == EINTR) {
+ continue;
+ } else {
+ break;
+ }
+ }
# ifdef UNIV_DO_FLUSH
if (srv_unix_file_flush_method != SRV_UNIX_LITTLESYNC
@@ -2328,7 +2474,7 @@ os_file_pwrite(
the OS crashes, a database page is only partially
physically written to disk. */
- ut_a(TRUE == os_file_flush(file));
+ ut_a(TRUE == os_file_flush(file, TRUE));
}
# endif /* UNIV_DO_FLUSH */
@@ -2348,12 +2494,14 @@ func_exit:
#endif
/*******************************************************************//**
+NOTE! Use the corresponding macro os_file_read(), not directly this
+function!
Requests a synchronous positioned read operation.
@return TRUE if request was successful, FALSE if fail */
UNIV_INTERN
ibool
-_os_file_read(
-/*=========*/
+os_file_read_func(
+/*==============*/
os_file_t file, /*!< in: handle to a file */
void* buf, /*!< in: buffer where to read */
ulint offset, /*!< in: least significant 32 bits of file
@@ -2374,7 +2522,10 @@ _os_file_read(
ulint i;
#endif /* !UNIV_HOTBACKUP */
+ /* On 64-bit Windows, ulint is 64 bits. But offset and n should be
+ no more than 32 bits. */
ut_a((offset & 0xFFFFFFFFUL) == offset);
+ ut_a((n & 0xFFFFFFFFUL) == n);
os_n_file_reads++;
os_bytes_read_since_printout += n;
@@ -2433,7 +2584,7 @@ try_again:
os_bytes_read_since_printout += n;
try_again:
- ret = _os_file_pread(file, buf, n, offset, offset_high, trx);
+ ret = os_file_pread(file, buf, n, offset, offset_high, trx);
if ((ulint)ret == n) {
@@ -2472,13 +2623,15 @@ error_handling:
}
/*******************************************************************//**
+NOTE! Use the corresponding macro os_file_read_no_error_handling(),
+not directly this function!
Requests a synchronous positioned read operation. This function does not do
any error handling. In case of error it returns FALSE.
@return TRUE if request was successful, FALSE if fail */
UNIV_INTERN
ibool
-os_file_read_no_error_handling(
-/*===========================*/
+os_file_read_no_error_handling_func(
+/*================================*/
os_file_t file, /*!< in: handle to a file */
void* buf, /*!< in: buffer where to read */
ulint offset, /*!< in: least significant 32 bits of file
@@ -2498,7 +2651,10 @@ os_file_read_no_error_handling(
ulint i;
#endif /* !UNIV_HOTBACKUP */
+ /* On 64-bit Windows, ulint is 64 bits. But offset and n should be
+ no more than 32 bits. */
ut_a((offset & 0xFFFFFFFFUL) == offset);
+ ut_a((n & 0xFFFFFFFFUL) == n);
os_n_file_reads++;
os_bytes_read_since_printout += n;
@@ -2557,7 +2713,7 @@ try_again:
os_bytes_read_since_printout += n;
try_again:
- ret = os_file_pread(file, buf, n, offset, offset_high);
+ ret = os_file_pread(file, buf, n, offset, offset_high, NULL);
if ((ulint)ret == n) {
@@ -2600,12 +2756,14 @@ os_file_read_string(
}
/*******************************************************************//**
+NOTE! Use the corresponding macro os_file_write(), not directly
+this function!
Requests a synchronous write operation.
@return TRUE if request was successful, FALSE if fail */
UNIV_INTERN
ibool
-os_file_write(
-/*==========*/
+os_file_write_func(
+/*===============*/
const char* name, /*!< in: name of the file or path as a
null-terminated string */
os_file_t file, /*!< in: handle to a file */
@@ -2628,7 +2786,10 @@ os_file_write(
ulint i;
#endif /* !UNIV_HOTBACKUP */
- ut_a((offset & 0xFFFFFFFF) == offset);
+ /* On 64-bit Windows, ulint is 64 bits. But offset and n should be
+ no more than 32 bits. */
+ ut_a((offset & 0xFFFFFFFFUL) == offset);
+ ut_a((n & 0xFFFFFFFFUL) == n);
os_n_file_writes++;
@@ -2686,7 +2847,7 @@ retry:
# ifdef UNIV_DO_FLUSH
if (!os_do_not_call_flush_at_each_write) {
- ut_a(TRUE == os_file_flush(file));
+ ut_a(TRUE == os_file_flush(file, TRUE));
}
# endif /* UNIV_DO_FLUSH */
@@ -3064,15 +3225,106 @@ os_aio_array_get_nth_slot(
return((array->slots) + index);
}
-/************************************************************************//**
-Creates an aio wait array.
-@return own: aio array */
+#if defined(LINUX_NATIVE_AIO)
+/******************************************************************//**
+Creates an io_context for native linux AIO.
+@return TRUE on success. */
+static
+ibool
+os_aio_linux_create_io_ctx(
+/*=======================*/
+ ulint max_events, /*!< in: number of events. */
+ io_context_t* io_ctx) /*!< out: io_ctx to initialize. */
+{
+ int ret;
+ ulint retries = 0;
+
+retry:
+ memset(io_ctx, 0x0, sizeof(*io_ctx));
+
+ /* Initialize the io_ctx. Tell it how many pending
+ IO requests this context will handle. */
+
+ ret = io_setup(max_events, io_ctx);
+ if (ret == 0) {
+#if defined(UNIV_AIO_DEBUG)
+ fprintf(stderr,
+ "InnoDB: Linux native AIO:"
+ " initialized io_ctx for segment\n");
+#endif
+ /* Success. Return now. */
+ return(TRUE);
+ }
+
+ /* If we hit EAGAIN we'll make a few attempts before failing. */
+
+ switch (ret) {
+ case -EAGAIN:
+ if (retries == 0) {
+ /* First time around. */
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+ " InnoDB: Warning: io_setup() failed"
+ " with EAGAIN. Will make %d attempts"
+ " before giving up.\n",
+ OS_AIO_IO_SETUP_RETRY_ATTEMPTS);
+ }
+
+ if (retries < OS_AIO_IO_SETUP_RETRY_ATTEMPTS) {
+ ++retries;
+ fprintf(stderr,
+ "InnoDB: Warning: io_setup() attempt"
+ " %lu failed.\n",
+ retries);
+ os_thread_sleep(OS_AIO_IO_SETUP_RETRY_SLEEP);
+ goto retry;
+ }
+
+ /* Have tried enough. Better call it a day. */
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+ " InnoDB: Error: io_setup() failed"
+ " with EAGAIN after %d attempts.\n",
+ OS_AIO_IO_SETUP_RETRY_ATTEMPTS);
+ break;
+
+ case -ENOSYS:
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+ " InnoDB: Error: Linux Native AIO interface"
+ " is not supported on this platform. Please"
+ " check your OS documentation and install"
+ " appropriate binary of InnoDB.\n");
+
+ break;
+
+ default:
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+ " InnoDB: Error: Linux Native AIO setup"
+ " returned following error[%d]\n", -ret);
+ break;
+ }
+
+ fprintf(stderr,
+ "InnoDB: You can disable Linux Native AIO by"
+ " setting innodb_native_aio = off in my.cnf\n");
+ return(FALSE);
+}
+#endif /* LINUX_NATIVE_AIO */
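A practical note on the EAGAIN case handled above: io_setup(2) fails with EAGAIN when the requested number of events would exceed the kernel-wide limit in /proc/sys/fs/aio-max-nr, so besides the retries shown here an administrator can raise that limit, for example with "sysctl -w fs.aio-max-nr=1048576" (the value is only an illustration), or disable native AIO as the message suggests.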
+
+/******************************************************************//**
+Creates an aio wait array. Note that we return NULL in case of failure.
+We don't care about freeing memory here because we assume that a
+failure will result in server refusing to start up.
+@return own: aio array, NULL on failure */
static
os_aio_array_t*
os_aio_array_create(
/*================*/
- ulint n, /*!< in: maximum number of pending aio operations
- allowed; n must be divisible by n_segments */
+ ulint n, /*!< in: maximum number of pending aio
+ operations allowed; n must be
+ divisible by n_segments */
ulint n_segments) /*!< in: number of segments in the aio array */
{
os_aio_array_t* array;
@@ -3080,13 +3332,15 @@ os_aio_array_create(
os_aio_slot_t* slot;
#ifdef WIN_ASYNC_IO
OVERLAPPED* over;
+#elif defined(LINUX_NATIVE_AIO)
+ struct io_event* io_event = NULL;
#endif
ut_a(n > 0);
ut_a(n_segments > 0);
array = ut_malloc(sizeof(os_aio_array_t));
- array->mutex = os_mutex_create(NULL);
+ array->mutex = os_mutex_create();
array->not_full = os_event_create(NULL);
array->is_empty = os_event_create(NULL);
@@ -3095,23 +3349,66 @@ os_aio_array_create(
array->n_slots = n;
array->n_segments = n_segments;
array->n_reserved = 0;
+ array->cur_seg = 0;
array->slots = ut_malloc(n * sizeof(os_aio_slot_t));
#ifdef __WIN__
- array->native_events = ut_malloc(n * sizeof(os_native_event_t));
+ array->handles = ut_malloc(n * sizeof(HANDLE));
#endif
+
+#if defined(LINUX_NATIVE_AIO)
+ array->aio_ctx = NULL;
+ array->aio_events = NULL;
+
+ /* If we are not using native aio interface then skip this
+ part of initialization. */
+ if (!srv_use_native_aio) {
+ goto skip_native_aio;
+ }
+
+ /* Initialize the io_context array. One io_context
+ per segment in the array. */
+
+ array->aio_ctx = ut_malloc(n_segments *
+ sizeof(*array->aio_ctx));
+ for (i = 0; i < n_segments; ++i) {
+ if (!os_aio_linux_create_io_ctx(n/n_segments,
+ &array->aio_ctx[i])) {
+ /* If something bad happened during aio setup
+ we should call it a day and return right away.
+ We don't care about any leaks because a failure
+ to initialize the io subsystem means that the
+ server (or at least the innodb storage engine)
+ is not going to start up. */
+ return(NULL);
+ }
+ }
+
+ /* Initialize the event array. One event per slot. */
+ io_event = ut_malloc(n * sizeof(*io_event));
+ memset(io_event, 0x0, sizeof(*io_event) * n);
+ array->aio_events = io_event;
+
+skip_native_aio:
+#endif /* LINUX_NATIVE_AIO */
for (i = 0; i < n; i++) {
slot = os_aio_array_get_nth_slot(array, i);
slot->pos = i;
slot->reserved = FALSE;
#ifdef WIN_ASYNC_IO
- slot->event = os_event_create(NULL);
+ slot->handle = CreateEvent(NULL,TRUE, FALSE, NULL);
over = &(slot->control);
- over->hEvent = slot->event->handle;
+ over->hEvent = slot->handle;
+
+ *((array->handles) + i) = over->hEvent;
- *((array->native_events) + i) = over->hEvent;
+#elif defined(LINUX_NATIVE_AIO)
+
+ memset(&slot->control, 0x0, sizeof(slot->control));
+ slot->n_bytes = 0;
+ slot->ret = 0;
#endif
}
@@ -3131,17 +3428,24 @@ os_aio_array_free(
for (i = 0; i < array->n_slots; i++) {
os_aio_slot_t* slot = os_aio_array_get_nth_slot(array, i);
- os_event_free(slot->event);
+ CloseHandle(slot->handle);
}
#endif /* WIN_ASYNC_IO */
#ifdef __WIN__
- ut_free(array->native_events);
+ ut_free(array->handles);
#endif /* __WIN__ */
os_mutex_free(array->mutex);
os_event_free(array->not_full);
os_event_free(array->is_empty);
+#if defined(LINUX_NATIVE_AIO)
+ if (srv_use_native_aio) {
+ ut_free(array->aio_events);
+ ut_free(array->aio_ctx);
+ }
+#endif /* LINUX_NATIVE_AIO */
+
ut_free(array->slots);
ut_free(array);
}
@@ -3154,7 +3458,7 @@ respectively. The caller must create an i/o handler thread for each
segment in these arrays. This function also creates the sync array.
No i/o handler thread needs to be created for that */
UNIV_INTERN
-void
+ibool
os_aio_init(
/*========*/
ulint n_per_seg, /*<! in: maximum number of pending aio
@@ -3173,37 +3477,52 @@ os_aio_init(
for (i = 0; i < n_segments; i++) {
srv_set_io_thread_op_info(i, "not started yet");
- os_aio_thread_buffer[i] = 0;
- os_aio_thread_buffer_size[i] = 0;
}
/* fprintf(stderr, "Array n per seg %lu\n", n_per_seg); */
- os_aio_first_write_segment = os_aio_first_read_segment + n_read_segs;
os_aio_ibuf_array = os_aio_array_create(n_per_seg, 1);
+ if (os_aio_ibuf_array == NULL) {
+ goto err_exit;
+ }
srv_io_thread_function[0] = "insert buffer thread";
os_aio_log_array = os_aio_array_create(n_per_seg, 1);
+ if (os_aio_log_array == NULL) {
+ goto err_exit;
+ }
srv_io_thread_function[1] = "log thread";
- os_aio_read_array = os_aio_array_create(n_per_seg,
+ os_aio_read_array = os_aio_array_create(n_read_segs * n_per_seg,
n_read_segs);
+ if (os_aio_read_array == NULL) {
+ goto err_exit;
+ }
+
for (i = 2; i < 2 + n_read_segs; i++) {
ut_a(i < SRV_MAX_N_IO_THREADS);
srv_io_thread_function[i] = "read thread";
}
- os_aio_write_array = os_aio_array_create(n_per_seg,
+ os_aio_write_array = os_aio_array_create(n_write_segs * n_per_seg,
n_write_segs);
+ if (os_aio_write_array == NULL) {
+ goto err_exit;
+ }
+
for (i = 2 + n_read_segs; i < n_segments; i++) {
ut_a(i < SRV_MAX_N_IO_THREADS);
srv_io_thread_function[i] = "write thread";
}
os_aio_sync_array = os_aio_array_create(n_slots_sync, 1);
+ if (os_aio_sync_array == NULL) {
+ goto err_exit;
+ }
+
os_aio_n_segments = n_segments;
@@ -3217,6 +3536,11 @@ os_aio_init(
os_last_printout = time(NULL);
+ return(TRUE);
+
+err_exit:
+ return(FALSE);
+
}
/***********************************************************************
@@ -3262,7 +3586,7 @@ os_aio_array_wake_win_aio_at_shutdown(
for (i = 0; i < array->n_slots; i++) {
- os_event_set((array->slots + i)->event);
+ SetEvent((array->slots + i)->handle);
}
}
#endif
@@ -3283,6 +3607,19 @@ os_aio_wake_all_threads_at_shutdown(void)
os_aio_array_wake_win_aio_at_shutdown(os_aio_write_array);
os_aio_array_wake_win_aio_at_shutdown(os_aio_ibuf_array);
os_aio_array_wake_win_aio_at_shutdown(os_aio_log_array);
+
+#elif defined(LINUX_NATIVE_AIO)
+
+ /* When using native AIO interface the io helper threads
+ wait on io_getevents with a timeout value of 500ms. At
+ each wake up these threads check the server status.
+ No need to do anything to wake them up. */
+
+ if (srv_use_native_aio) {
+ return;
+ }
+ /* Fall through to simulated AIO handler wakeup if we are
+ not using native AIO. */
#endif
/* This loop wakes up all simulated ai/o threads */
@@ -3399,16 +3736,27 @@ os_aio_array_reserve_slot(
ulint offset_high, /*!< in: most significant 32 bits of
offset */
ulint len, /*!< in: length of the block to read or write */
- trx_t* trx)
+ ulint space_id)
{
- os_aio_slot_t* slot;
+ os_aio_slot_t* slot = NULL;
#ifdef WIN_ASYNC_IO
OVERLAPPED* control;
+
+#elif defined(LINUX_NATIVE_AIO)
+
+ struct iocb* iocb;
+ off_t aio_offset;
+
#endif
ulint i;
+ ulint counter;
ulint slots_per_seg;
ulint local_seg;
+#ifdef WIN_ASYNC_IO
+ ut_a((len & 0xFFFFFFFFUL) == len);
+#endif
+
/* No need of a mutex. Only reading constant fields */
slots_per_seg = array->n_slots / array->n_segments;
@@ -3424,7 +3772,7 @@ loop:
if (array->n_reserved == array->n_slots) {
os_mutex_exit(array->mutex);
- if (!os_aio_use_native_aio) {
+ if (!srv_use_native_aio) {
/* If the handler threads are suspended, wake them
so that we get more slots */
@@ -3436,17 +3784,13 @@ loop:
goto loop;
}
- /* First try to find a slot in the preferred local segment */
- for (i = local_seg * slots_per_seg; i < array->n_slots; i++) {
- slot = os_aio_array_get_nth_slot(array, i);
-
- if (slot->reserved == FALSE) {
- goto found;
- }
- }
+ /* We start our search for an available slot from our preferred
+ local segment and do a full scan of the array. We are
+ guaranteed to find a slot in full scan. */
+ for (i = local_seg * slots_per_seg, counter = 0;
+ counter < array->n_slots; i++, counter++) {
- /* Fall back to a full scan. We are guaranteed to find a slot */
- for (i = 0;; i++) {
+ i %= array->n_slots;
slot = os_aio_array_get_nth_slot(array, i);
if (slot->reserved == FALSE) {
@@ -3454,6 +3798,9 @@ loop:
}
}
+ /* We MUST always be able to get hold of a reserved slot. */
+ ut_error;
+
found:
ut_a(slot->reserved == FALSE);
array->n_reserved++;
@@ -3477,16 +3824,50 @@ found:
slot->buf = buf;
slot->offset = offset;
slot->offset_high = offset_high;
-// slot->io_already_done = FALSE;
- slot->status = OS_AIO_NOT_ISSUED;
+ slot->io_already_done = FALSE;
+ slot->space_id = space_id;
#ifdef WIN_ASYNC_IO
control = &(slot->control);
control->Offset = (DWORD)offset;
control->OffsetHigh = (DWORD)offset_high;
- os_event_reset(slot->event);
-#endif
+ ResetEvent(slot->handle);
+
+#elif defined(LINUX_NATIVE_AIO)
+
+ /* If we are not using native AIO skip this part. */
+ if (!srv_use_native_aio) {
+ goto skip_native_aio;
+ }
+
+ /* Check if we are dealing with 64 bit arch.
+ If not then make sure that offset fits in 32 bits. */
+ if (sizeof(aio_offset) == 8) {
+ aio_offset = offset_high;
+ aio_offset <<= 32;
+ aio_offset += offset;
+ } else {
+ ut_a(offset_high == 0);
+ aio_offset = offset;
+ }
+
+ iocb = &slot->control;
+
+ if (type == OS_FILE_READ) {
+ io_prep_pread(iocb, file, buf, len, aio_offset);
+ } else {
+ ut_a(type == OS_FILE_WRITE);
+ io_prep_pwrite(iocb, file, buf, len, aio_offset);
+ }
+
+ iocb->data = (void*)slot;
+ slot->n_bytes = 0;
+ slot->ret = 0;
+ /*fprintf(stderr, "Filled up Linux native iocb.\n");*/
+
+skip_native_aio:
+#endif /* LINUX_NATIVE_AIO */
os_mutex_exit(array->mutex);
return(slot);
@@ -3509,7 +3890,6 @@ os_aio_array_free_slot(
ut_ad(slot->reserved);
slot->reserved = FALSE;
- slot->status = OS_AIO_NOT_ISSUED;
array->n_reserved--;
@@ -3522,7 +3902,23 @@ os_aio_array_free_slot(
}
#ifdef WIN_ASYNC_IO
- os_event_reset(slot->event);
+
+ ResetEvent(slot->handle);
+
+#elif defined(LINUX_NATIVE_AIO)
+
+ if (srv_use_native_aio) {
+ memset(&slot->control, 0x0, sizeof(slot->control));
+ slot->n_bytes = 0;
+ slot->ret = 0;
+ /*fprintf(stderr, "Freed up Linux native slot.\n");*/
+ } else {
+ /* These fields should not be used if we are not
+ using native AIO. */
+ ut_ad(slot->n_bytes == 0);
+ ut_ad(slot->ret == 0);
+ }
+
#endif
os_mutex_exit(array->mutex);
}
@@ -3542,22 +3938,20 @@ os_aio_simulated_wake_handler_thread(
ulint n;
ulint i;
- ut_ad(!os_aio_use_native_aio);
+ ut_ad(!srv_use_native_aio);
segment = os_aio_get_array_and_local_segment(&array, global_segment);
- n = array->n_slots;
+ n = array->n_slots / array->n_segments;
/* Look through n slots after the segment * n'th slot */
os_mutex_enter(array->mutex);
for (i = 0; i < n; i++) {
- slot = os_aio_array_get_nth_slot(array, i);
+ slot = os_aio_array_get_nth_slot(array, i + segment * n);
- if (slot->reserved &&
- (slot->status == OS_AIO_NOT_ISSUED ||
- slot->status == OS_AIO_DONE)) {
+ if (slot->reserved) {
/* Found an i/o request */
break;
@@ -3567,25 +3961,7 @@ os_aio_simulated_wake_handler_thread(
os_mutex_exit(array->mutex);
if (i < n) {
- if (array == os_aio_ibuf_array) {
- os_event_set(os_aio_segment_wait_events[0]);
-
- } else if (array == os_aio_log_array) {
- os_event_set(os_aio_segment_wait_events[1]);
-
- } else if (array == os_aio_read_array) {
- ulint x;
- for (x = os_aio_first_read_segment; x < os_aio_first_write_segment; x++)
- os_event_set(os_aio_segment_wait_events[x]);
-
- } else if (array == os_aio_write_array) {
- ulint x;
- for (x = os_aio_first_write_segment; x < os_aio_n_segments; x++)
- os_event_set(os_aio_segment_wait_events[x]);
-
- } else {
- ut_a(0);
- }
+ os_event_set(os_aio_segment_wait_events[global_segment]);
}
}
@@ -3596,7 +3972,9 @@ void
os_aio_simulated_wake_handler_threads(void)
/*=======================================*/
{
- if (os_aio_use_native_aio) {
+ ulint i;
+
+ if (srv_use_native_aio) {
/* We do not use simulated aio: do nothing */
return;
@@ -3604,10 +3982,9 @@ os_aio_simulated_wake_handler_threads(void)
os_aio_recommend_sleep_for_read_threads = FALSE;
- os_aio_simulated_wake_handler_thread(0);
- os_aio_simulated_wake_handler_thread(1);
- os_aio_simulated_wake_handler_thread(os_aio_first_read_segment);
- os_aio_simulated_wake_handler_thread(os_aio_first_write_segment);
+ for (i = 0; i < os_aio_n_segments; i++) {
+ os_aio_simulated_wake_handler_thread(i);
+ }
}
/**********************************************************************//**
@@ -3629,7 +4006,7 @@ readahead requests. */
os_aio_array_t* array;
ulint g;
- if (os_aio_use_native_aio) {
+ if (srv_use_native_aio) {
/* We do not use simulated aio: do nothing */
return;
@@ -3648,13 +4025,62 @@ readahead requests. */
#endif /* __WIN__ */
}
+#if defined(LINUX_NATIVE_AIO)
/*******************************************************************//**
+Dispatch an AIO request to the kernel.
+@return TRUE on success. */
+static
+ibool
+os_aio_linux_dispatch(
+/*==================*/
+ os_aio_array_t* array, /*!< in: io request array. */
+ os_aio_slot_t* slot) /*!< in: an already reserved slot. */
+{
+ int ret;
+ ulint io_ctx_index;
+ struct iocb* iocb;
+
+ ut_ad(slot != NULL);
+ ut_ad(array);
+
+ ut_a(slot->reserved);
+
+ /* Find out what we are going to work with.
+ The iocb struct is directly in the slot.
+ The io_context is one per segment. */
+
+ iocb = &slot->control;
+ io_ctx_index = (slot->pos * array->n_segments) / array->n_slots;
+
+ ret = io_submit(array->aio_ctx[io_ctx_index], 1, &iocb);
+
+#if defined(UNIV_AIO_DEBUG)
+ fprintf(stderr,
+ "io_submit[%c] ret[%d]: slot[%p] ctx[%p] seg[%lu]\n",
+ (slot->type == OS_FILE_WRITE) ? 'w' : 'r', ret, slot,
+ array->aio_ctx[io_ctx_index], (ulong)io_ctx_index);
+#endif
+
+ /* io_submit returns number of successfully
+ queued requests or -errno. */
+ if (UNIV_UNLIKELY(ret != 1)) {
+ errno = -ret;
+ return(FALSE);
+ }
+
+ return(TRUE);
+}
+#endif /* LINUX_NATIVE_AIO */
+
+
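To make the io_ctx_index arithmetic above concrete (the numbers are only an example, not taken from the patch): with n_slots = 256 and n_segments = 4, each segment owns 64 consecutive slots, and a slot at pos = 130 is dispatched on aio_ctx[(130 * 4) / 256] = aio_ctx[2], i.e. the io_context of the same segment (slots 128..191) whose helper thread will later reap the completion in os_aio_linux_collect().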
+/*******************************************************************//**
+NOTE! Use the corresponding macro os_aio(), not directly this function!
Requests an asynchronous i/o operation.
@return TRUE if request was queued successfully, FALSE if fail */
UNIV_INTERN
ibool
-os_aio(
-/*===*/
+os_aio_func(
+/*========*/
ulint type, /*!< in: OS_FILE_READ or OS_FILE_WRITE */
ulint mode, /*!< in: OS_AIO_NORMAL, ..., possibly ORed
to OS_AIO_SIMULATED_WAKE_LATER: the
@@ -3687,6 +4113,7 @@ os_aio(
(can be used to identify a completed
aio operation); ignored if mode is
OS_AIO_SYNC */
+ ulint space_id,
trx_t* trx)
{
os_aio_array_t* array;
@@ -3698,8 +4125,7 @@ os_aio(
struct fil_node_struct * dummy_mess1;
void* dummy_mess2;
ulint dummy_type;
-#endif
- ulint err = 0;
+#endif /* WIN_ASYNC_IO */
ibool retry;
ulint wake_later;
@@ -3708,41 +4134,51 @@ os_aio(
ut_ad(n > 0);
ut_ad(n % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_ad(offset % OS_FILE_LOG_BLOCK_SIZE == 0);
- ut_ad(os_aio_validate());
+ ut_ad(os_aio_validate_skip());
+#ifdef WIN_ASYNC_IO
+ ut_ad((n & 0xFFFFFFFFUL) == n);
+#endif
wake_later = mode & OS_AIO_SIMULATED_WAKE_LATER;
mode = mode & (~OS_AIO_SIMULATED_WAKE_LATER);
if (mode == OS_AIO_SYNC
#ifdef WIN_ASYNC_IO
- && !os_aio_use_native_aio
-#endif
+ && !srv_use_native_aio
+#endif /* WIN_ASYNC_IO */
) {
/* This is actually an ordinary synchronous read or write:
no need to use an i/o-handler thread. NOTE that if we use
Windows async i/o, Windows does not allow us to use
ordinary synchronous os_file_read etc. on the same file,
therefore we have built a special mechanism for synchronous
- wait in the Windows case. */
+ wait in the Windows case.
+ Also note that the Performance Schema instrumentation has
+ been performed by current os_aio_func()'s wrapper function
+ pfs_os_aio_func(). So we would no longer need to call
+ Performance Schema instrumented os_file_read() and
+ os_file_write(). Instead, we should use os_file_read_func()
+ and os_file_write_func() */
if (type == OS_FILE_READ) {
- return(_os_file_read(file, buf, offset,
+ return(os_file_read_trx(file, buf, offset,
offset_high, n, trx));
}
ut_a(type == OS_FILE_WRITE);
- return(os_file_write(name, file, buf, offset, offset_high, n));
+ return(os_file_write_func(name, file, buf, offset,
+ offset_high, n));
}
try_again:
- if (mode == OS_AIO_NORMAL) {
- if (type == OS_FILE_READ) {
- array = os_aio_read_array;
- } else {
- array = os_aio_write_array;
- }
- } else if (mode == OS_AIO_IBUF) {
+ switch (mode) {
+ case OS_AIO_NORMAL:
+ array = (type == OS_FILE_READ)
+ ? os_aio_read_array
+ : os_aio_write_array;
+ break;
+ case OS_AIO_IBUF:
ut_ad(type == OS_FILE_READ);
/* Reduce probability of deadlock bugs in connection with ibuf:
do not let the ibuf i/o handler sleep */
@@ -3750,14 +4186,21 @@ try_again:
wake_later = FALSE;
array = os_aio_ibuf_array;
- } else if (mode == OS_AIO_LOG) {
-
+ break;
+ case OS_AIO_LOG:
array = os_aio_log_array;
- } else if (mode == OS_AIO_SYNC) {
+ break;
+ case OS_AIO_SYNC:
array = os_aio_sync_array;
- } else {
- array = NULL; /* Eliminate compiler warning */
+
+#if defined(LINUX_NATIVE_AIO)
+ /* In Linux native AIO we don't use sync IO array. */
+ ut_a(!srv_use_native_aio);
+#endif /* LINUX_NATIVE_AIO */
+ break;
+ default:
ut_error;
+ array = NULL; /* Eliminate compiler warning */
}
if (trx && type == OS_FILE_READ)
@@ -3766,15 +4209,19 @@ try_again:
trx->io_read += n;
}
slot = os_aio_array_reserve_slot(type, array, message1, message2, file,
- name, buf, offset, offset_high, n, trx);
+ name, buf, offset, offset_high, n, space_id);
if (type == OS_FILE_READ) {
- if (os_aio_use_native_aio) {
-#ifdef WIN_ASYNC_IO
+ if (srv_use_native_aio) {
os_n_file_reads++;
- os_bytes_read_since_printout += len;
-
+ os_bytes_read_since_printout += n;
+#ifdef WIN_ASYNC_IO
ret = ReadFile(file, buf, (DWORD)n, &len,
&(slot->control));
+
+#elif defined(LINUX_NATIVE_AIO)
+ if (!os_aio_linux_dispatch(array, slot)) {
+ goto err_exit;
+ }
#endif
} else {
if (!wake_later) {
@@ -3784,11 +4231,16 @@ try_again:
}
}
} else if (type == OS_FILE_WRITE) {
- if (os_aio_use_native_aio) {
-#ifdef WIN_ASYNC_IO
+ if (srv_use_native_aio) {
os_n_file_writes++;
+#ifdef WIN_ASYNC_IO
ret = WriteFile(file, buf, (DWORD)n, &len,
&(slot->control));
+
+#elif defined(LINUX_NATIVE_AIO)
+ if (!os_aio_linux_dispatch(array, slot)) {
+ goto err_exit;
+ }
#endif
} else {
if (!wake_later) {
@@ -3802,7 +4254,7 @@ try_again:
}
#ifdef WIN_ASYNC_IO
- if (os_aio_use_native_aio) {
+ if (srv_use_native_aio) {
if ((ret && len == n)
|| (!ret && GetLastError() == ERROR_IO_PENDING)) {
/* aio was queued successfully! */
@@ -3825,15 +4277,15 @@ try_again:
return(TRUE);
}
- err = 1; /* Fall through the next if */
- }
-#endif
- if (err == 0) {
- /* aio was queued successfully! */
-
- return(TRUE);
+ goto err_exit;
}
+#endif /* WIN_ASYNC_IO */
+ /* aio was queued successfully! */
+ return(TRUE);
+#if defined LINUX_NATIVE_AIO || defined WIN_ASYNC_IO
+err_exit:
+#endif /* LINUX_NATIVE_AIO || WIN_ASYNC_IO */
os_aio_array_free_slot(array, slot);
retry = os_file_handle_error(name,
@@ -3876,7 +4328,8 @@ os_aio_windows_handle(
parameters are valid and can be used to
restart the operation, for example */
void** message2,
- ulint* type) /*!< out: OS_FILE_WRITE or ..._READ */
+ ulint* type, /*!< out: OS_FILE_WRITE or ..._READ */
+ ulint* space_id)
{
ulint orig_seg = segment;
os_aio_array_t* array;
@@ -3898,24 +4351,49 @@ os_aio_windows_handle(
/* NOTE! We only access constant fields in os_aio_array. Therefore
we do not have to acquire the protecting mutex yet */
- ut_ad(os_aio_validate());
+ ut_ad(os_aio_validate_skip());
ut_ad(segment < array->n_segments);
- n = array->n_slots;
+ n = array->n_slots / array->n_segments;
if (array == os_aio_sync_array) {
- os_event_wait(os_aio_array_get_nth_slot(array, pos)->event);
+ WaitForSingleObject(
+ os_aio_array_get_nth_slot(array, pos)->handle,
+ INFINITE);
i = pos;
} else {
srv_set_io_thread_op_info(orig_seg, "wait Windows aio");
- i = os_event_wait_multiple(n,
- (array->native_events)
- );
+ i = WaitForMultipleObjects((DWORD) n,
+ array->handles + segment * n,
+ FALSE,
+ INFINITE);
+ }
+
+ if (srv_recovery_stats && recv_recovery_is_on() && n_consecutive) {
+ mutex_enter(&(recv_sys->mutex));
+ if (slot->type == OS_FILE_READ) {
+ recv_sys->stats_read_io_pages += n_consecutive;
+ recv_sys->stats_read_io_consecutive[n_consecutive - 1]++;
+ } else if (slot->type == OS_FILE_WRITE) {
+ recv_sys->stats_write_io_pages += n_consecutive;
+ recv_sys->stats_write_io_consecutive[n_consecutive - 1]++;
+ }
+ mutex_exit(&(recv_sys->mutex));
}
os_mutex_enter(array->mutex);
- slot = os_aio_array_get_nth_slot(array, i);
+ if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS
+ && array->n_reserved == 0) {
+ *message1 = NULL;
+ *message2 = NULL;
+ os_mutex_exit(array->mutex);
+ return(TRUE);
+ }
+
+ ut_a(i >= WAIT_OBJECT_0 && i <= WAIT_OBJECT_0 + n);
+
+ slot = os_aio_array_get_nth_slot(array, i + segment * n);
ut_a(slot->reserved);
@@ -3930,6 +4408,7 @@ os_aio_windows_handle(
*message2 = slot->message2;
*type = slot->type;
+ *space_id = slot->space_id;
if (ret && len == slot->len) {
ret_val = TRUE;
@@ -3937,7 +4416,9 @@ os_aio_windows_handle(
#ifdef UNIV_DO_FLUSH
if (slot->type == OS_FILE_WRITE
&& !os_do_not_call_flush_at_each_write) {
- ut_a(TRUE == os_file_flush(slot->file));
+ if (!os_file_flush(slot->file, TRUE)) {
+ ut_error;
+ }
}
#endif /* UNIV_DO_FLUSH */
} else if (os_file_handle_error(slot->name, "Windows aio")) {
@@ -3954,16 +4435,30 @@ os_aio_windows_handle(
/* retry failed read/write operation synchronously.
No need to hold array->mutex. */
+#ifdef UNIV_PFS_IO
+ /* This read/write does not go through os_file_read
+ and os_file_write APIs, need to register with
+ performance schema explicitly here. */
+ struct PSI_file_locker* locker = NULL;
+ register_pfs_file_io_begin(locker, slot->file, slot->len,
+ (slot->type == OS_FILE_WRITE)
+ ? PSI_FILE_WRITE
+ : PSI_FILE_READ,
+ __FILE__, __LINE__);
+#endif
+
+ ut_a((slot->len & 0xFFFFFFFFUL) == slot->len);
+
switch (slot->type) {
case OS_FILE_WRITE:
ret = WriteFile(slot->file, slot->buf,
- slot->len, &len,
+ (DWORD) slot->len, &len,
&(slot->control));
break;
case OS_FILE_READ:
ret = ReadFile(slot->file, slot->buf,
- slot->len, &len,
+ (DWORD) slot->len, &len,
&(slot->control));
break;
@@ -3971,6 +4466,10 @@ os_aio_windows_handle(
ut_error;
}
+#ifdef UNIV_PFS_IO
+ register_pfs_file_io_end(locker, len);
+#endif
+
if (!ret && GetLastError() == ERROR_IO_PENDING) {
/* aio was queued successfully!
We want a synchronous i/o operation on a
@@ -3992,6 +4491,261 @@ os_aio_windows_handle(
}
#endif
+#if defined(LINUX_NATIVE_AIO)
+/******************************************************************//**
+This function is only used in Linux native asynchronous i/o. This is
+called from within the io-thread. If there are no completed IO requests
+in the slot array, the thread calls this function to collect more
+requests from the kernel.
+The io-thread waits on io_getevents(), which is a blocking call, with
+a timeout value. Unless the system is very heavy loaded, keeping the
+io-thread very busy, the io-thread will spend most of its time waiting
+in this function.
+The io-thread also exits in this function. It checks server status at
+each wakeup and that is why we use timed wait in io_getevents(). */
+static
+void
+os_aio_linux_collect(
+/*=================*/
+ os_aio_array_t* array, /*!< in/out: slot array. */
+ ulint segment, /*!< in: local segment no. */
+ ulint seg_size) /*!< in: segment size. */
+{
+ int i;
+ int ret;
+ ulint start_pos;
+ ulint end_pos;
+ struct timespec timeout;
+ struct io_event* events;
+ struct io_context* io_ctx;
+
+ /* sanity checks. */
+ ut_ad(array != NULL);
+ ut_ad(seg_size > 0);
+ ut_ad(segment < array->n_segments);
+
+ /* Which part of event array we are going to work on. */
+ events = &array->aio_events[segment * seg_size];
+
+ /* Which io_context we are going to use. */
+ io_ctx = array->aio_ctx[segment];
+
+ /* Starting point of the segment we will be working on. */
+ start_pos = segment * seg_size;
+
+ /* End point. */
+ end_pos = start_pos + seg_size;
+
+retry:
+
+ /* Initialize the events. The timeout value is arbitrary.
+ We probably need to experiment with it a little. */
+ memset(events, 0, sizeof(*events) * seg_size);
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = OS_AIO_REAP_TIMEOUT;
+
+ ret = io_getevents(io_ctx, 1, seg_size, events, &timeout);
+
+ if (ret > 0) {
+ for (i = 0; i < ret; i++) {
+ os_aio_slot_t* slot;
+ struct iocb* control;
+
+ control = (struct iocb *)events[i].obj;
+ ut_a(control != NULL);
+
+ slot = (os_aio_slot_t *) control->data;
+
+ /* Some sanity checks. */
+ ut_a(slot != NULL);
+ ut_a(slot->reserved);
+
+#if defined(UNIV_AIO_DEBUG)
+ fprintf(stderr,
+ "io_getevents[%c]: slot[%p] ctx[%p]"
+ " seg[%lu]\n",
+ (slot->type == OS_FILE_WRITE) ? 'w' : 'r',
+ slot, io_ctx, segment);
+#endif
+
+ /* We are not scribbling previous segment. */
+ ut_a(slot->pos >= start_pos);
+
+ /* We have not overstepped to next segment. */
+ ut_a(slot->pos < end_pos);
+
+ /* Mark this request as completed. The error handling
+ will be done in the calling function. */
+ os_mutex_enter(array->mutex);
+ slot->n_bytes = events[i].res;
+ slot->ret = events[i].res2;
+ slot->io_already_done = TRUE;
+ os_mutex_exit(array->mutex);
+ }
+ return;
+ }
+
+ if (UNIV_UNLIKELY(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS)) {
+ return;
+ }
+
+ /* This error handling is for any error in collecting the
+ IO requests. The errors, if any, for any particular IO
+ request are simply passed on to the calling routine. */
+
+ switch (ret) {
+ case -EAGAIN:
+ /* Not enough resources! Try again. */
+ case -EINTR:
+ /* Interrupted! I have tested the behaviour in case of an
+ interrupt. If we have some completed IOs available then
+ the return code will be the number of IOs. We get EINTR only
+ if there are no completed IOs and we have been interrupted. */
+ case 0:
+ /* No pending request! Go back and check again. */
+ goto retry;
+ }
+
+ /* All other errors should cause a trap for now. */
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+ " InnoDB: unexpected ret_code[%d] from io_getevents()!\n",
+ ret);
+ ut_error;
+}
+
+/**********************************************************************//**
+This function is only used in Linux native asynchronous i/o.
+Waits for an aio operation to complete. This function is used to wait for
+the completed requests. The aio array of pending requests is divided
+into segments. The thread specifies which segment or slot it wants to wait
+for. NOTE: this function will also take care of freeing the aio slot,
+therefore no other thread is allowed to do the freeing!
+@return TRUE if the IO was successful */
+UNIV_INTERN
+ibool
+os_aio_linux_handle(
+/*================*/
+ ulint global_seg, /*!< in: segment number in the aio array
+ to wait for; segment 0 is the ibuf
+ i/o thread, segment 1 is log i/o thread,
+ then follow the non-ibuf read threads,
+ and the last are the non-ibuf write
+ threads. */
+ fil_node_t**message1, /*!< out: the messages passed with the */
+ void** message2, /*!< aio request; note that in case the
+ aio operation failed, these output
+ parameters are valid and can be used to
+ restart the operation. */
+ ulint* type, /*!< out: OS_FILE_WRITE or ..._READ */
+ ulint* space_id)
+{
+ ulint segment;
+ os_aio_array_t* array;
+ os_aio_slot_t* slot;
+ ulint n;
+ ulint i;
+ ibool ret = FALSE;
+
+ /* Should never be doing Sync IO here. */
+ ut_a(global_seg != ULINT_UNDEFINED);
+
+ /* Find the array and the local segment. */
+ segment = os_aio_get_array_and_local_segment(&array, global_seg);
+ n = array->n_slots / array->n_segments;
+
+ /* Loop until we have found a completed request. */
+ for (;;) {
+ ibool any_reserved = FALSE;
+ os_mutex_enter(array->mutex);
+ for (i = 0; i < n; ++i) {
+ slot = os_aio_array_get_nth_slot(
+ array, i + segment * n);
+ if (!slot->reserved) {
+ continue;
+ } else if (slot->io_already_done) {
+ /* Something for us to work on. */
+ goto found;
+ } else {
+ any_reserved = TRUE;
+ }
+ }
+
+ os_mutex_exit(array->mutex);
+
+ /* There is no completed request.
+ If there is no pending request at all,
+ and the system is being shut down, exit. */
+ if (UNIV_UNLIKELY
+ (!any_reserved
+ && srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS)) {
+ *message1 = NULL;
+ *message2 = NULL;
+ return(TRUE);
+ }
+
+ /* Wait for some request. Note that we return
+ from wait iff we have found a request. */
+
+ srv_set_io_thread_op_info(global_seg,
+ "waiting for completed aio requests");
+ os_aio_linux_collect(array, segment, n);
+ }
+
+found:
+	/* Note that there may be more than one completed IO request.
+	We process them one at a time. We could improve performance
+	slightly here by dealing with all the completed requests
+	in one sweep. */
+ srv_set_io_thread_op_info(global_seg,
+ "processing completed aio requests");
+
+	/* Ensure that we are scribbling only within our own segment. */
+ ut_a(i < n);
+
+ ut_ad(slot != NULL);
+ ut_ad(slot->reserved);
+ ut_ad(slot->io_already_done);
+
+ *message1 = slot->message1;
+ *message2 = slot->message2;
+
+ *type = slot->type;
+ *space_id = slot->space_id;
+
+ if ((slot->ret == 0) && (slot->n_bytes == (long)slot->len)) {
+ ret = TRUE;
+
+#ifdef UNIV_DO_FLUSH
+ if (slot->type == OS_FILE_WRITE
+		    && !os_do_not_call_flush_at_each_write
+		    && !os_file_flush(slot->file, TRUE)) {
+ ut_error;
+ }
+#endif /* UNIV_DO_FLUSH */
+ } else {
+ errno = -slot->ret;
+
+		/* os_file_handle_error tells us whether we should retry
+		this IO. As it stands now, we do not retry when reaping
+		requests from a different context than the dispatcher.
+		This non-retry logic is the same for Windows and Linux
+		native AIO.
+		We should probably look into transparently re-submitting
+		the IO. */
+ os_file_handle_error(slot->name, "Linux aio");
+
+ ret = FALSE;
+ }
+
+ os_mutex_exit(array->mutex);
+
+ os_aio_array_free_slot(array, slot);
+
+ return(ret);
+}
+#endif /* LINUX_NATIVE_AIO */
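For readers less familiar with libaio, the following standalone sketch (not part of this patch; the file name, buffer sizes and the 0.5 s timeout are illustrative assumptions) shows the submit/reap cycle that os_aio_linux_collect() and os_aio_linux_handle() build on: a request is queued with io_submit() and later reaped with a timed io_getevents(), with the caller's own handle carried in iocb.data. Build with -laio.

#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <libaio.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

int
main(void)
{
	io_context_t	ctx;
	struct iocb	cb;
	struct iocb*	cbs[1];
	struct io_event	ev;
	struct timespec	timeout;
	void*		buf;
	int		fd;
	int		ret;

	memset(&ctx, 0, sizeof(ctx));
	if (io_setup(8, &ctx) < 0) {		/* up to 8 in-flight requests */
		fprintf(stderr, "io_setup failed\n");
		return(1);
	}

	fd = open("ibdata1", O_RDONLY | O_DIRECT);	/* illustrative file */
	if (fd < 0) {
		perror("open");
		return(1);
	}

	if (posix_memalign(&buf, 4096, 16384) != 0) {	/* O_DIRECT alignment */
		return(1);
	}

	/* Prepare a 16 KB read at offset 0; remember our own handle in
	iocb.data, much as InnoDB stores the aio slot pointer there. */
	io_prep_pread(&cb, fd, buf, 16384, 0);
	cb.data = buf;

	cbs[0] = &cb;
	if (io_submit(ctx, 1, cbs) != 1) {
		fprintf(stderr, "io_submit failed\n");
		return(1);
	}

	/* Reap with a timed wait, retrying on "nothing yet" (0) and on
	-EINTR, as the collector loop above does. */
	do {
		timeout.tv_sec = 0;
		timeout.tv_nsec = 500000000;	/* 0.5 s */
		ret = io_getevents(ctx, 1, 1, &ev, &timeout);
	} while (ret == 0 || ret == -EINTR);

	if (ret == 1) {
		printf("read %ld bytes for iocb %p\n",
		       (long) ev.res, (void*) ev.obj);
	}

	io_destroy(ctx);
	return(0);
}

Note that libaio calls return a negative errno value instead of setting errno, which is why the retry check above compares against -EINTR, mirroring the switch on ret in os_aio_linux_collect().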
+
/**********************************************************************//**
Does simulated aio. This function should be called by an i/o-handler
thread.
@@ -4011,28 +4765,26 @@ os_aio_simulated_handle(
parameters are valid and can be used to
restart the operation, for example */
void** message2,
- ulint* type) /*!< out: OS_FILE_WRITE or ..._READ */
+ ulint* type, /*!< out: OS_FILE_WRITE or ..._READ */
+ ulint* space_id)
{
os_aio_array_t* array;
ulint segment;
os_aio_slot_t* slot;
os_aio_slot_t* slot2;
os_aio_slot_t* consecutive_ios[OS_AIO_MERGE_N_CONSECUTIVE];
- os_aio_slot_t* lowest_request;
- os_aio_slot_t* oldest_request;
ulint n_consecutive;
ulint total_len;
ulint offs;
ulint lowest_offset;
- ulint oldest_offset;
ulint biggest_age;
ulint age;
byte* combined_buf;
byte* combined_buf2;
ibool ret;
+ ibool any_reserved;
ulint n;
ulint i;
- time_t now;
/* Fix compiler warning */
*consecutive_ios = NULL;
@@ -4045,10 +4797,10 @@ restart:
srv_set_io_thread_op_info(global_segment,
"looking for i/o requests (a)");
- ut_ad(os_aio_validate());
+ ut_ad(os_aio_validate_skip());
ut_ad(segment < array->n_segments);
- n = array->n_slots;
+ n = array->n_slots / array->n_segments;
/* Look through n slots after the segment * n'th slot */
@@ -4061,18 +4813,21 @@ restart:
goto recommended_sleep;
}
- os_mutex_enter(array->mutex);
-
srv_set_io_thread_op_info(global_segment,
"looking for i/o requests (b)");
/* Check if there is a slot for which the i/o has already been
done */
+ any_reserved = FALSE;
+
+ os_mutex_enter(array->mutex);
for (i = 0; i < n; i++) {
- slot = os_aio_array_get_nth_slot(array, i);
+ slot = os_aio_array_get_nth_slot(array, i + segment * n);
- if (slot->reserved && slot->status == OS_AIO_DONE) {
+ if (!slot->reserved) {
+ continue;
+ } else if (slot->io_already_done) {
if (os_aio_print_debug) {
fprintf(stderr,
@@ -4084,9 +4839,23 @@ restart:
ret = TRUE;
goto slot_io_done;
+ } else {
+ any_reserved = TRUE;
}
}
+ /* There is no completed request.
+ If there is no pending request at all,
+ and the system is being shut down, exit. */
+ if (UNIV_UNLIKELY
+ (!any_reserved
+ && srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS)) {
+ os_mutex_exit(array->mutex);
+ *message1 = NULL;
+ *message2 = NULL;
+ return(TRUE);
+ }
+
n_consecutive = 0;
/* If there are at least 2 seconds old requests, then pick the oldest
@@ -4094,57 +4863,72 @@ restart:
then pick the one at the lowest offset. */
biggest_age = 0;
- now = time(NULL);
- oldest_request = lowest_request = NULL;
- oldest_offset = lowest_offset = ULINT_MAX;
+ lowest_offset = ULINT_MAX;
- /* Find the oldest request and the request with the smallest offset */
for (i = 0; i < n; i++) {
- slot = os_aio_array_get_nth_slot(array, i);
+ slot = os_aio_array_get_nth_slot(array, i + segment * n);
- if (slot->reserved && slot->status == OS_AIO_NOT_ISSUED) {
- age = (ulint)difftime(now, slot->reservation_time);
+ if (slot->reserved) {
+ age = (ulint)difftime(time(NULL),
+ slot->reservation_time);
if ((age >= 2 && age > biggest_age)
|| (age >= 2 && age == biggest_age
- && slot->offset < oldest_offset)) {
+ && slot->offset < lowest_offset)) {
/* Found an i/o request */
+ consecutive_ios[0] = slot;
+
+ n_consecutive = 1;
+
biggest_age = age;
- oldest_request = slot;
- oldest_offset = slot->offset;
+ lowest_offset = slot->offset;
}
+ }
+ }
+
+ if (n_consecutive == 0) {
+ /* There were no old requests. Look for an i/o request at the
+ lowest offset in the array (we ignore the high 32 bits of the
+ offset in these heuristics) */
+
+ lowest_offset = ULINT_MAX;
+
+ for (i = 0; i < n; i++) {
+ slot = os_aio_array_get_nth_slot(array,
+ i + segment * n);
+
+ if (slot->reserved && slot->offset < lowest_offset) {
- /* Look for an i/o request at the lowest offset in the array
- * (we ignore the high 32 bits of the offset) */
- if (slot->offset < lowest_offset) {
/* Found an i/o request */
- lowest_request = slot;
+ consecutive_ios[0] = slot;
+
+ n_consecutive = 1;
+
lowest_offset = slot->offset;
}
}
}
- if (!lowest_request && !oldest_request) {
+ if (n_consecutive == 0) {
/* No i/o requested at the moment */
goto wait_for_io;
}
- if (oldest_request) {
- slot = oldest_request;
- } else {
- slot = lowest_request;
- }
- consecutive_ios[0] = slot;
- n_consecutive = 1;
+ /* if n_consecutive != 0, then we have assigned
+ something valid to consecutive_ios[0] */
+ ut_ad(n_consecutive != 0);
+ ut_ad(consecutive_ios[0] != NULL);
+
+ slot = consecutive_ios[0];
/* Check if there are several consecutive blocks to read or write */
consecutive_loop:
for (i = 0; i < n; i++) {
- slot2 = os_aio_array_get_nth_slot(array, i);
+ slot2 = os_aio_array_get_nth_slot(array, i + segment * n);
if (slot2->reserved && slot2 != slot
&& slot2->offset == slot->offset + slot->len
@@ -4152,8 +4936,7 @@ consecutive_loop:
&& slot->offset + slot->len > slot->offset
&& slot2->offset_high == slot->offset_high
&& slot2->type == slot->type
- && slot2->file == slot->file
- && slot2->status == OS_AIO_NOT_ISSUED) {
+ && slot2->file == slot->file) {
/* Found a consecutive i/o request */
@@ -4182,8 +4965,6 @@ consecutive_loop:
for (i = 0; i < n_consecutive; i++) {
total_len += consecutive_ios[i]->len;
- ut_a(consecutive_ios[i]->status == OS_AIO_NOT_ISSUED);
- consecutive_ios[i]->status = OS_AIO_ISSUED;
}
if (n_consecutive == 1) {
@@ -4191,14 +4972,7 @@ consecutive_loop:
combined_buf = slot->buf;
combined_buf2 = NULL;
} else {
- if ((total_len + UNIV_PAGE_SIZE) > os_aio_thread_buffer_size[global_segment]) {
- if (os_aio_thread_buffer[global_segment])
- ut_free(os_aio_thread_buffer[global_segment]);
-
- os_aio_thread_buffer[global_segment] = ut_malloc(total_len + UNIV_PAGE_SIZE);
- os_aio_thread_buffer_size[global_segment] = total_len + UNIV_PAGE_SIZE;
- }
- combined_buf2 = os_aio_thread_buffer[global_segment];
+ combined_buf2 = ut_malloc(total_len + UNIV_PAGE_SIZE);
ut_a(combined_buf2);
@@ -4209,9 +4983,6 @@ consecutive_loop:
this assumes that there is just one i/o-handler thread serving
a single segment of slots! */
- ut_a(slot->reserved);
- ut_a(slot->status == OS_AIO_ISSUED);
-
os_mutex_exit(array->mutex);
if (slot->type == OS_FILE_WRITE && n_consecutive > 1) {
@@ -4267,16 +5038,8 @@ consecutive_loop:
}
}
- if (srv_recovery_stats && recv_recovery_is_on() && n_consecutive) {
- mutex_enter(&(recv_sys->mutex));
- if (slot->type == OS_FILE_READ) {
- recv_sys->stats_read_io_pages += n_consecutive;
- recv_sys->stats_read_io_consecutive[n_consecutive - 1]++;
- } else if (slot->type == OS_FILE_WRITE) {
- recv_sys->stats_write_io_pages += n_consecutive;
- recv_sys->stats_write_io_consecutive[n_consecutive - 1]++;
- }
- mutex_exit(&(recv_sys->mutex));
+ if (combined_buf2) {
+ ut_free(combined_buf2);
}
os_mutex_enter(array->mutex);
@@ -4284,8 +5047,7 @@ consecutive_loop:
/* Mark the i/os done in slots */
for (i = 0; i < n_consecutive; i++) {
- ut_a(consecutive_ios[i]->status == OS_AIO_ISSUED);
- consecutive_ios[i]->status = OS_AIO_DONE;
+ consecutive_ios[i]->io_already_done = TRUE;
}
/* We return the messages for the first slot now, and if there were
@@ -4295,13 +5057,12 @@ consecutive_loop:
slot_io_done:
ut_a(slot->reserved);
- ut_a(slot->status == OS_AIO_DONE);
- slot->status = OS_AIO_CLAIMED;
*message1 = slot->message1;
*message2 = slot->message2;
*type = slot->type;
+ *space_id = slot->space_id;
os_mutex_exit(array->mutex);
@@ -4388,6 +5149,40 @@ os_aio_validate(void)
}
/**********************************************************************//**
+Prints pending IO requests per segment of an aio array.
+We probably do not need per-segment statistics, but they can help us
+during the development phase to see whether the IO requests are being
+distributed as expected. */
+static
+void
+os_aio_print_segment_info(
+/*======================*/
+ FILE* file, /*!< in: file where to print */
+ ulint* n_seg, /*!< in: pending IO array */
+ os_aio_array_t* array) /*!< in: array to process */
+{
+ ulint i;
+
+ ut_ad(array);
+ ut_ad(n_seg);
+ ut_ad(array->n_segments > 0);
+
+ if (array->n_segments == 1) {
+ return;
+ }
+
+ fprintf(file, " [");
+ for (i = 0; i < array->n_segments; i++) {
+ if (i != 0) {
+ fprintf(file, ", ");
+ }
+
+ fprintf(file, "%lu", n_seg[i]);
+ }
+ fprintf(file, "] ");
+}
+
+/**********************************************************************//**
Prints info of the aio arrays. */
UNIV_INTERN
void
@@ -4398,6 +5193,7 @@ os_aio_print(
os_aio_array_t* array;
os_aio_slot_t* slot;
ulint n_reserved;
+ ulint n_res_seg[SRV_MAX_N_IO_THREADS];
time_t current_time;
double time_elapsed;
double avg_bytes_read;
@@ -4430,11 +5226,17 @@ loop:
n_reserved = 0;
+ memset(n_res_seg, 0x0, sizeof(n_res_seg));
+
for (i = 0; i < array->n_slots; i++) {
+ ulint seg_no;
+
slot = os_aio_array_get_nth_slot(array, i);
+ seg_no = (i * array->n_segments) / array->n_slots;
if (slot->reserved) {
n_reserved++;
+ n_res_seg[seg_no]++;
#if 0
fprintf(stderr, "Reserved slot, messages %p %p\n",
(void*) slot->message1,
@@ -4448,6 +5250,8 @@ loop:
fprintf(file, " %lu", (ulong) n_reserved);
+ os_aio_print_segment_info(file, n_res_seg, array);
+
os_mutex_exit(array->mutex);
if (array == os_aio_read_array) {
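As an aside on the new per-segment statistics above: the mapping seg_no = (i * n_segments) / n_slots gives each segment an equal, contiguous run of slots. A tiny standalone illustration (the 256-slot / 4-segment values are assumptions for the example, not taken from this patch):

#include <stdio.h>

int
main(void)
{
	unsigned long	n_slots = 256;		/* assumed array size */
	unsigned long	n_segments = 4;		/* assumed segment count */
	unsigned long	count[4] = {0, 0, 0, 0};
	unsigned long	i;

	for (i = 0; i < n_slots; i++) {
		/* Same mapping as in os_aio_print() above. */
		unsigned long seg_no = (i * n_segments) / n_slots;
		count[seg_no]++;
	}

	/* Prints "[64, 64, 64, 64]": slots 0..63 belong to segment 0,
	slots 64..127 to segment 1, and so on. */
	printf("[%lu, %lu, %lu, %lu]\n",
	       count[0], count[1], count[2], count[3]);
	return(0);
}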
diff --git a/storage/xtradb/os/os0proc.c b/storage/xtradb/os/os0proc.c
index 4567d96b6f4..0f56a608f38 100644
--- a/storage/xtradb/os/os0proc.c
+++ b/storage/xtradb/os/os0proc.c
@@ -145,7 +145,7 @@ skip:
os_fast_mutex_unlock(&ut_list_mutex);
UNIV_MEM_ALLOC(ptr, size);
}
-#elif defined __NETWARE__ || !defined OS_MAP_ANON
+#elif !defined OS_MAP_ANON
size = *n;
ptr = ut_malloc_low(size, TRUE, FALSE);
#else
@@ -213,7 +213,7 @@ os_mem_free_large(
os_fast_mutex_unlock(&ut_list_mutex);
UNIV_MEM_FREE(ptr, size);
}
-#elif defined __NETWARE__ || !defined OS_MAP_ANON
+#elif !defined OS_MAP_ANON
ut_free(ptr);
#else
if (munmap(ptr, size)) {
@@ -229,173 +229,3 @@ os_mem_free_large(
}
#endif
}
-
-/****************************************************************//**
-Allocates or attaches and reuses shared memory segment.
-The content is not cleared automatically.
-@return allocated memory */
-UNIV_INTERN
-void*
-os_shm_alloc(
-/*=========*/
- ulint* n, /*!< in/out: number of bytes */
- uint key,
- ibool* is_new)
-{
- void* ptr;
-#if defined HAVE_SYS_IPC_H && HAVE_SYS_SHM_H
- ulint size;
- int shmid;
-
- *is_new = FALSE;
- fprintf(stderr,
- "InnoDB: The shared memory segment containing the buffer pool is: key %#x (%d).\n",
- key, key);
-# if defined HAVE_LARGE_PAGES && defined UNIV_LINUX
- if (!os_use_large_pages || !os_large_page_size) {
- goto skip;
- }
-
- /* Align block size to os_large_page_size */
- ut_ad(ut_is_2pow(os_large_page_size));
- size = ut_2pow_round(*n + (os_large_page_size - 1),
- os_large_page_size);
-
- shmid = shmget((key_t)key, (size_t)size,
- IPC_CREAT | IPC_EXCL | SHM_HUGETLB | SHM_R | SHM_W);
- if (shmid < 0) {
- if (errno == EEXIST) {
- fprintf(stderr,
- "InnoDB: HugeTLB: The shared memory segment exists.\n");
- shmid = shmget((key_t)key, (size_t)size,
- SHM_HUGETLB | SHM_R | SHM_W);
- if (shmid < 0) {
- fprintf(stderr,
- "InnoDB: HugeTLB: Warning: Failed to allocate %lu bytes. (reuse) errno %d\n",
- size, errno);
- goto skip;
- } else {
- fprintf(stderr,
- "InnoDB: HugeTLB: The existent shared memory segment is used.\n");
- }
- } else {
- fprintf(stderr,
- "InnoDB: HugeTLB: Warning: Failed to allocate %lu bytes. (new) errno %d\n",
- size, errno);
- goto skip;
- }
- } else {
- *is_new = TRUE;
- fprintf(stderr,
- "InnoDB: HugeTLB: A new shared memory segment has been created .\n");
- }
-
- ptr = shmat(shmid, NULL, 0);
- if (ptr == (void *)-1) {
- fprintf(stderr,
- "InnoDB: HugeTLB: Warning: Failed to attach shared memory segment, errno %d\n",
- errno);
- ptr = NULL;
- }
-
- if (ptr) {
- *n = size;
- os_fast_mutex_lock(&ut_list_mutex);
- ut_total_allocated_memory += size;
- os_fast_mutex_unlock(&ut_list_mutex);
- UNIV_MEM_ALLOC(ptr, size);
- return(ptr);
- }
-skip:
- *is_new = FALSE;
-# endif /* HAVE_LARGE_PAGES && defined UNIV_LINUX */
-# ifdef HAVE_GETPAGESIZE
- size = getpagesize();
-# else
- size = UNIV_PAGE_SIZE;
-# endif
- /* Align block size to system page size */
- ut_ad(ut_is_2pow(size));
- size = *n = ut_2pow_round(*n + (size - 1), size);
-
- shmid = shmget((key_t)key, (size_t)size,
- IPC_CREAT | IPC_EXCL | SHM_R | SHM_W);
- if (shmid < 0) {
- if (errno == EEXIST) {
- fprintf(stderr,
- "InnoDB: A shared memory segment containing the buffer pool seems to already exist.\n");
- shmid = shmget((key_t)key, (size_t)size,
- SHM_R | SHM_W);
- if (shmid < 0) {
- fprintf(stderr,
- "InnoDB: Warning: Failed to allocate %lu bytes. (reuse) errno %d\n",
- size, errno);
- ptr = NULL;
- goto end;
- } else {
- fprintf(stderr,
- "InnoDB: The existent shared memory segment is used.\n");
- }
- } else {
- fprintf(stderr,
- "InnoDB: Warning: Failed to allocate %lu bytes. (new) errno %d\n",
- size, errno);
- ptr = NULL;
- goto end;
- }
- } else {
- *is_new = TRUE;
- fprintf(stderr,
- "InnoDB: A new shared memory segment has been created.\n");
- }
-
- ptr = shmat(shmid, NULL, 0);
- if (ptr == (void *)-1) {
- fprintf(stderr,
- "InnoDB: Warning: Failed to attach shared memory segment, errno %d\n",
- errno);
- ptr = NULL;
- }
-
- if (ptr) {
- *n = size;
- os_fast_mutex_lock(&ut_list_mutex);
- ut_total_allocated_memory += size;
- os_fast_mutex_unlock(&ut_list_mutex);
- UNIV_MEM_ALLOC(ptr, size);
- }
-end:
-#else /* HAVE_SYS_IPC_H && HAVE_SYS_SHM_H */
- fprintf(stderr, "InnoDB: shared memory segment is not supported.\n");
- ptr = NULL;
-#endif /* HAVE_SYS_IPC_H && HAVE_SYS_SHM_H */
- return(ptr);
-}
-
-/****************************************************************//**
-Detach shared memory segment. */
-UNIV_INTERN
-void
-os_shm_free(
-/*========*/
- void *ptr, /*!< in: pointer returned by
- os_shm_alloc() */
- ulint size) /*!< in: size returned by
- os_shm_alloc() */
-{
- os_fast_mutex_lock(&ut_list_mutex);
- ut_a(ut_total_allocated_memory >= size);
- os_fast_mutex_unlock(&ut_list_mutex);
-
-#if defined HAVE_SYS_IPC_H && HAVE_SYS_SHM_H
- if (!shmdt(ptr)) {
- os_fast_mutex_lock(&ut_list_mutex);
- ut_a(ut_total_allocated_memory >= size);
- ut_total_allocated_memory -= size;
- os_fast_mutex_unlock(&ut_list_mutex);
- UNIV_MEM_FREE(ptr, size);
- }
-#else /* HAVE_SYS_IPC_H && HAVE_SYS_SHM_H */
- fprintf(stderr, "InnoDB: shared memory segment is not supported.\n");
-#endif /* HAVE_SYS_IPC_H && HAVE_SYS_SHM_H */
-}
diff --git a/storage/xtradb/os/os0sync.c b/storage/xtradb/os/os0sync.c
index f9ab58c2ee4..41a19843812 100644
--- a/storage/xtradb/os/os0sync.c
+++ b/storage/xtradb/os/os0sync.c
@@ -31,13 +31,11 @@ Created 9/6/1995 Heikki Tuuri
#ifdef __WIN__
#include <windows.h>
-#else
-#include <sys/time.h>
-#include <time.h>
#endif
#include "ut0mem.h"
#include "srv0start.h"
+#include "srv0srv.h"
/* Type definition for an operating system mutex struct */
struct os_mutex_struct{
@@ -74,11 +72,227 @@ UNIV_INTERN ulint os_event_count = 0;
UNIV_INTERN ulint os_mutex_count = 0;
UNIV_INTERN ulint os_fast_mutex_count = 0;
+/* The number of microseconds in a second. */
+static const ulint MICROSECS_IN_A_SECOND = 1000000;
+
/* Because a mutex is embedded inside an event and there is an
event embedded inside a mutex, on free, this generates a recursive call.
This version of the free event function doesn't acquire the global lock */
static void os_event_free_internal(os_event_t event);
+/* On Windows (Vista and later), load function pointers for condition
+variable handling. Those functions are not available in prior versions,
+so we have to use them via runtime loading, as long as we support XP. */
+static void os_cond_module_init(void);
+
+#ifdef __WIN__
+/* Prototypes and function pointers for condition variable functions */
+typedef VOID (WINAPI* InitializeConditionVariableProc)
+ (PCONDITION_VARIABLE ConditionVariable);
+static InitializeConditionVariableProc initialize_condition_variable;
+
+typedef BOOL (WINAPI* SleepConditionVariableCSProc)
+ (PCONDITION_VARIABLE ConditionVariable,
+ PCRITICAL_SECTION CriticalSection,
+ DWORD dwMilliseconds);
+static SleepConditionVariableCSProc sleep_condition_variable;
+
+typedef VOID (WINAPI* WakeAllConditionVariableProc)
+ (PCONDITION_VARIABLE ConditionVariable);
+static WakeAllConditionVariableProc wake_all_condition_variable;
+
+typedef VOID (WINAPI* WakeConditionVariableProc)
+ (PCONDITION_VARIABLE ConditionVariable);
+static WakeConditionVariableProc wake_condition_variable;
+#endif
+
+/*********************************************************//**
+Initializes a condition variable */
+UNIV_INLINE
+void
+os_cond_init(
+/*=========*/
+ os_cond_t* cond) /*!< in: condition variable. */
+{
+ ut_a(cond);
+
+#ifdef __WIN__
+ ut_a(initialize_condition_variable != NULL);
+ initialize_condition_variable(cond);
+#else
+ ut_a(pthread_cond_init(cond, NULL) == 0);
+#endif
+}
+
+/*********************************************************//**
+Does a timed wait on a condition variable.
+@return TRUE if timed out, FALSE otherwise */
+UNIV_INLINE
+ibool
+os_cond_wait_timed(
+/*===============*/
+ os_cond_t* cond, /*!< in: condition variable. */
+ os_fast_mutex_t* mutex, /*!< in: fast mutex */
+#ifndef __WIN__
+ const struct timespec* abstime /*!< in: timeout */
+#else
+ DWORD time_in_ms /*!< in: timeout in
+ milliseconds*/
+#endif /* !__WIN__ */
+)
+{
+#ifdef __WIN__
+ BOOL ret;
+ DWORD err;
+
+ ut_a(sleep_condition_variable != NULL);
+
+ ret = sleep_condition_variable(cond, mutex, time_in_ms);
+
+ if (!ret) {
+ err = GetLastError();
+ /* From http://msdn.microsoft.com/en-us/library/ms686301%28VS.85%29.aspx,
+ "Condition variables are subject to spurious wakeups
+ (those not associated with an explicit wake) and stolen wakeups
+ (another thread manages to run before the woken thread)."
+ Check for both types of timeouts.
+ Conditions are checked by the caller.*/
+ if ((err == WAIT_TIMEOUT) || (err == ERROR_TIMEOUT)) {
+ return(TRUE);
+ }
+ }
+
+ ut_a(ret);
+
+ return(FALSE);
+#else
+ int ret;
+
+ ret = pthread_cond_timedwait(cond, mutex, abstime);
+
+ switch (ret) {
+ case 0:
+ case ETIMEDOUT:
+ /* We play it safe by checking for EINTR even though
+ according to the POSIX documentation it can't return EINTR. */
+ case EINTR:
+ break;
+
+ default:
+ fprintf(stderr, " InnoDB: pthread_cond_timedwait() returned: "
+ "%d: abstime={%lu,%lu}\n",
+ ret, (ulong) abstime->tv_sec, (ulong) abstime->tv_nsec);
+ ut_error;
+ }
+
+ return(ret == ETIMEDOUT);
+#endif
+}
+/*********************************************************//**
+Waits on a condition variable */
+UNIV_INLINE
+void
+os_cond_wait(
+/*=========*/
+ os_cond_t* cond, /*!< in: condition variable. */
+ os_fast_mutex_t* mutex) /*!< in: fast mutex */
+{
+ ut_a(cond);
+ ut_a(mutex);
+
+#ifdef __WIN__
+ ut_a(sleep_condition_variable != NULL);
+ ut_a(sleep_condition_variable(cond, mutex, INFINITE));
+#else
+ ut_a(pthread_cond_wait(cond, mutex) == 0);
+#endif
+}
+
+/*********************************************************//**
+Wakes all threads waiting for condition variable */
+UNIV_INLINE
+void
+os_cond_broadcast(
+/*==============*/
+ os_cond_t* cond) /*!< in: condition variable. */
+{
+ ut_a(cond);
+
+#ifdef __WIN__
+ ut_a(wake_all_condition_variable != NULL);
+ wake_all_condition_variable(cond);
+#else
+ ut_a(pthread_cond_broadcast(cond) == 0);
+#endif
+}
+
+/*********************************************************//**
+Wakes one thread waiting for condition variable */
+UNIV_INLINE
+void
+os_cond_signal(
+/*==========*/
+ os_cond_t* cond) /*!< in: condition variable. */
+{
+ ut_a(cond);
+
+#ifdef __WIN__
+ ut_a(wake_condition_variable != NULL);
+ wake_condition_variable(cond);
+#else
+ ut_a(pthread_cond_signal(cond) == 0);
+#endif
+}
+
+/*********************************************************//**
+Destroys condition variable */
+UNIV_INLINE
+void
+os_cond_destroy(
+/*============*/
+ os_cond_t* cond) /*!< in: condition variable. */
+{
+#ifdef __WIN__
+ /* Do nothing */
+#else
+ ut_a(pthread_cond_destroy(cond) == 0);
+#endif
+}
+
+/*********************************************************//**
+On Windows (Vista and later), load function pointers for condition variable
+handling. Those functions are not available in prior versions, so we have to
+use them via runtime loading, as long as we support XP. */
+static
+void
+os_cond_module_init(void)
+/*=====================*/
+{
+#ifdef __WIN__
+ HMODULE h_dll;
+
+ if (!srv_use_native_conditions)
+ return;
+
+ h_dll = GetModuleHandle("kernel32");
+
+ initialize_condition_variable = (InitializeConditionVariableProc)
+ GetProcAddress(h_dll, "InitializeConditionVariable");
+ sleep_condition_variable = (SleepConditionVariableCSProc)
+ GetProcAddress(h_dll, "SleepConditionVariableCS");
+ wake_all_condition_variable = (WakeAllConditionVariableProc)
+ GetProcAddress(h_dll, "WakeAllConditionVariable");
+ wake_condition_variable = (WakeConditionVariableProc)
+ GetProcAddress(h_dll, "WakeConditionVariable");
+
+ /* When using native condition variables, check function pointers */
+ ut_a(initialize_condition_variable);
+ ut_a(sleep_condition_variable);
+ ut_a(wake_all_condition_variable);
+ ut_a(wake_condition_variable);
+#endif
+}
+
/*********************************************************//**
Initializes global event and OS 'slow' mutex lists. */
UNIV_INTERN
@@ -92,7 +306,10 @@ os_sync_init(void)
os_sync_mutex = NULL;
os_sync_mutex_inited = FALSE;
- os_sync_mutex = os_mutex_create(NULL);
+	/* Currently this does something only on Windows */
+ os_cond_module_init();
+
+ os_sync_mutex = os_mutex_create();
os_sync_mutex_inited = TRUE;
}
@@ -146,42 +363,45 @@ os_event_create(
const char* name) /*!< in: the name of the event, if NULL
the event is created without a name */
{
-#ifdef __WIN__
- os_event_t event;
-
- event = ut_malloc(sizeof(struct os_event_struct));
-
- event->handle = CreateEvent(NULL, /* No security attributes */
- TRUE, /* Manual reset */
- FALSE, /* Initial state nonsignaled */
- (LPCTSTR) name);
- if (!event->handle) {
- fprintf(stderr,
- "InnoDB: Could not create a Windows event semaphore;"
- " Windows error %lu\n",
- (ulong) GetLastError());
- }
-#else /* Unix */
os_event_t event;
- UT_NOT_USED(name);
+#ifdef __WIN__
+	if (!srv_use_native_conditions) {
+
+ event = ut_malloc(sizeof(struct os_event_struct));
+
+ event->handle = CreateEvent(NULL,
+ TRUE,
+ FALSE,
+ (LPCTSTR) name);
+ if (!event->handle) {
+ fprintf(stderr,
+ "InnoDB: Could not create a Windows event"
+ " semaphore; Windows error %lu\n",
+ (ulong) GetLastError());
+ }
+ } else /* Windows with condition variables */
+#endif
+
+ {
+ UT_NOT_USED(name);
- event = ut_malloc(sizeof(struct os_event_struct));
+ event = ut_malloc(sizeof(struct os_event_struct));
- os_fast_mutex_init(&(event->os_mutex));
+ os_fast_mutex_init(&(event->os_mutex));
- ut_a(0 == pthread_cond_init(&(event->cond_var), NULL));
+ os_cond_init(&(event->cond_var));
- event->is_set = FALSE;
+ event->is_set = FALSE;
- /* We return this value in os_event_reset(), which can then be
- be used to pass to the os_event_wait_low(). The value of zero
- is reserved in os_event_wait_low() for the case when the
- caller does not want to pass any signal_count value. To
- distinguish between the two cases we initialize signal_count
- to 1 here. */
- event->signal_count = 1;
-#endif /* __WIN__ */
+	/* We return this value in os_event_reset(), which can then
+	be passed to os_event_wait_low(). The value of zero
+ is reserved in os_event_wait_low() for the case when the
+ caller does not want to pass any signal_count value. To
+ distinguish between the two cases we initialize signal_count
+ to 1 here. */
+ event->signal_count = 1;
+ }
/* The os_sync_mutex can be NULL because during startup an event
can be created [ because it's embedded in the mutex/rwlock ] before
@@ -211,10 +431,15 @@ os_event_set(
/*=========*/
os_event_t event) /*!< in: event to set */
{
-#ifdef __WIN__
ut_a(event);
- ut_a(SetEvent(event->handle));
-#else
+
+#ifdef __WIN__
+ if (!srv_use_native_conditions) {
+ ut_a(SetEvent(event->handle));
+ return;
+ }
+#endif
+
ut_a(event);
os_fast_mutex_lock(&(event->os_mutex));
@@ -224,11 +449,10 @@ os_event_set(
} else {
event->is_set = TRUE;
event->signal_count += 1;
- ut_a(0 == pthread_cond_broadcast(&(event->cond_var)));
+ os_cond_broadcast(&(event->cond_var));
}
os_fast_mutex_unlock(&(event->os_mutex));
-#endif
}
/**********************************************************//**
@@ -247,12 +471,14 @@ os_event_reset(
{
ib_int64_t ret = 0;
-#ifdef __WIN__
ut_a(event);
- ut_a(ResetEvent(event->handle));
-#else
- ut_a(event);
+#ifdef __WIN__
+	if (!srv_use_native_conditions) {
+ ut_a(ResetEvent(event->handle));
+ return(0);
+ }
+#endif
os_fast_mutex_lock(&(event->os_mutex));
@@ -264,7 +490,6 @@ os_event_reset(
ret = event->signal_count;
os_fast_mutex_unlock(&(event->os_mutex));
-#endif
return(ret);
}
@@ -277,19 +502,21 @@ os_event_free_internal(
os_event_t event) /*!< in: event to free */
{
#ifdef __WIN__
- ut_a(event);
+	if (!srv_use_native_conditions) {
+ ut_a(event);
+ ut_a(CloseHandle(event->handle));
+ } else
+#endif
+ {
+ ut_a(event);
- ut_a(CloseHandle(event->handle));
-#else
- ut_a(event);
+ /* This is to avoid freeing the mutex twice */
+ os_fast_mutex_free(&(event->os_mutex));
- /* This is to avoid freeing the mutex twice */
- os_fast_mutex_free(&(event->os_mutex));
+ os_cond_destroy(&(event->cond_var));
+ }
- ut_a(0 == pthread_cond_destroy(&(event->cond_var)));
-#endif
/* Remove from the list of events */
-
UT_LIST_REMOVE(os_event_list, os_event_list, event);
os_event_count--;
@@ -306,18 +533,19 @@ os_event_free(
os_event_t event) /*!< in: event to free */
{
-#ifdef __WIN__
ut_a(event);
+#ifdef __WIN__
+	if (!srv_use_native_conditions) {
+		ut_a(CloseHandle(event->handle));
+	} else /* Windows with condition variables */
+#endif
+ {
+ os_fast_mutex_free(&(event->os_mutex));
- ut_a(CloseHandle(event->handle));
-#else
- ut_a(event);
+ os_cond_destroy(&(event->cond_var));
+ }
- os_fast_mutex_free(&(event->os_mutex));
- ut_a(0 == pthread_cond_destroy(&(event->cond_var)));
-#endif
/* Remove from the list of events */
-
os_mutex_enter(os_sync_mutex);
UT_LIST_REMOVE(os_event_list, os_event_list, event);
@@ -330,10 +558,7 @@ os_event_free(
}
/**********************************************************//**
-Waits for an event object until it is in the signaled state. If
-srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS this also exits the
-waiting thread when the event becomes signaled (or immediately if the
-event is already in the signaled state).
+Waits for an event object until it is in the signaled state.
Typically, if the event has been signalled after the os_event_reset()
we'll return immediately because event->is_set == TRUE.
@@ -359,53 +584,36 @@ os_event_wait_low(
os_event_reset(). */
{
#ifdef __WIN__
- DWORD err;
-
- ut_a(event);
+	if (!srv_use_native_conditions) {
+ DWORD err;
- UT_NOT_USED(reset_sig_count);
+ ut_a(event);
- /* Specify an infinite time limit for waiting */
- err = WaitForSingleObject(event->handle, INFINITE);
+ UT_NOT_USED(reset_sig_count);
- ut_a(err == WAIT_OBJECT_0);
+ /* Specify an infinite wait */
+ err = WaitForSingleObject(event->handle, INFINITE);
- if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
- os_thread_exit(NULL);
+ ut_a(err == WAIT_OBJECT_0);
+ return;
}
-#else
- ib_int64_t old_signal_count;
+#endif
- os_fast_mutex_lock(&(event->os_mutex));
+ os_fast_mutex_lock(&event->os_mutex);
- if (reset_sig_count) {
- old_signal_count = reset_sig_count;
- } else {
- old_signal_count = event->signal_count;
+ if (!reset_sig_count) {
+ reset_sig_count = event->signal_count;
}
- for (;;) {
- if (event->is_set == TRUE
- || event->signal_count != old_signal_count) {
-
- os_fast_mutex_unlock(&(event->os_mutex));
-
- if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
-
- os_thread_exit(NULL);
- }
- /* Ok, we may return */
-
- return;
- }
-
- pthread_cond_wait(&(event->cond_var), &(event->os_mutex));
+ while (!event->is_set && event->signal_count == reset_sig_count) {
+ os_cond_wait(&(event->cond_var), &(event->os_mutex));
/* Solaris manual said that spurious wakeups may occur: we
have to check if the event really has been signaled after
we came here to wait */
}
-#endif
+
+ os_fast_mutex_unlock(&event->os_mutex);
}
/**********************************************************//**
@@ -414,112 +622,112 @@ a timeout is exceeded.
@return 0 if success, OS_SYNC_TIME_EXCEEDED if timeout was exceeded */
UNIV_INTERN
ulint
-os_event_wait_time(
-/*===============*/
- os_event_t event, /*!< in: event to wait */
- ulint wtime) /*!< in: timeout in microseconds, or
- OS_SYNC_INFINITE_TIME */
+os_event_wait_time_low(
+/*===================*/
+ os_event_t event, /*!< in: event to wait */
+ ulint time_in_usec, /*!< in: timeout in
+ microseconds, or
+ OS_SYNC_INFINITE_TIME */
+ ib_int64_t reset_sig_count) /*!< in: zero or the value
+ returned by previous call of
+ os_event_reset(). */
+
{
+ ibool timed_out = FALSE;
+
#ifdef __WIN__
- DWORD err;
+ DWORD time_in_ms;
- ut_a(event);
+ if (!srv_use_native_conditions) {
+ DWORD err;
- if (wtime != OS_SYNC_INFINITE_TIME) {
- err = WaitForSingleObject(event->handle, (DWORD) wtime / 1000);
- } else {
- err = WaitForSingleObject(event->handle, INFINITE);
- }
+ ut_a(event);
- if (err == WAIT_OBJECT_0) {
+ if (time_in_usec != OS_SYNC_INFINITE_TIME) {
+ time_in_ms = time_in_usec / 1000;
+ err = WaitForSingleObject(event->handle, time_in_ms);
+ } else {
+ err = WaitForSingleObject(event->handle, INFINITE);
+ }
- return(0);
- } else if (err == WAIT_TIMEOUT) {
+ if (err == WAIT_OBJECT_0) {
+ return(0);
+ } else if ((err == WAIT_TIMEOUT) || (err == ERROR_TIMEOUT)) {
+ return(OS_SYNC_TIME_EXCEEDED);
+ }
- return(OS_SYNC_TIME_EXCEEDED);
- } else {
ut_error;
- return(1000000); /* dummy value to eliminate compiler warn. */
+ /* Dummy value to eliminate compiler warning. */
+ return(42);
+ } else {
+ ut_a(sleep_condition_variable != NULL);
+
+ if (time_in_usec != OS_SYNC_INFINITE_TIME) {
+ time_in_ms = time_in_usec / 1000;
+ } else {
+ time_in_ms = INFINITE;
+ }
}
#else
- int err;
- int ret = 0;
- ulint tmp;
- ib_int64_t old_count;
- struct timeval tv_start;
- struct timespec timeout;
-
- if (wtime == OS_SYNC_INFINITE_TIME) {
- os_event_wait(event);
- return 0;
- }
+ struct timespec abstime;
- /* Compute the absolute point in time at which to time out. */
- gettimeofday(&tv_start, NULL);
- tmp = tv_start.tv_usec + wtime;
- timeout.tv_sec = tv_start.tv_sec + (tmp / 1000000);
- timeout.tv_nsec = (tmp % 1000000) * 1000;
+ if (time_in_usec != OS_SYNC_INFINITE_TIME) {
+ struct timeval tv;
+ int ret;
+ ulint sec;
+ ulint usec;
- os_fast_mutex_lock(&(event->os_mutex));
- old_count = event->signal_count;
+ ret = ut_usectime(&sec, &usec);
+ ut_a(ret == 0);
- for (;;) {
- if (event->is_set == TRUE || event->signal_count != old_count)
- break;
+ tv.tv_sec = sec;
+ tv.tv_usec = usec;
- err = pthread_cond_timedwait(&(event->cond_var),
- &(event->os_mutex), &timeout);
- if (err == ETIMEDOUT) {
- ret = OS_SYNC_TIME_EXCEEDED;
- break;
+ tv.tv_usec += time_in_usec;
+
+ if ((ulint) tv.tv_usec >= MICROSECS_IN_A_SECOND) {
+ tv.tv_sec += time_in_usec / MICROSECS_IN_A_SECOND;
+ tv.tv_usec %= MICROSECS_IN_A_SECOND;
}
+
+ abstime.tv_sec = tv.tv_sec;
+ abstime.tv_nsec = tv.tv_usec * 1000;
+ } else {
+ abstime.tv_nsec = 999999999;
+ abstime.tv_sec = (time_t) ULINT_MAX;
}
- os_fast_mutex_unlock(&(event->os_mutex));
+ ut_a(abstime.tv_nsec <= 999999999);
- if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
+#endif /* __WIN__ */
+
+ os_fast_mutex_lock(&event->os_mutex);
- os_thread_exit(NULL);
+ if (!reset_sig_count) {
+ reset_sig_count = event->signal_count;
}
- return ret;
-#endif
-}
+ do {
+ if (event->is_set || event->signal_count != reset_sig_count) {
-#ifdef __WIN__
-/**********************************************************//**
-Waits for any event in an OS native event array. Returns if even a single
-one is signaled or becomes signaled.
-@return index of the event which was signaled */
-UNIV_INTERN
-ulint
-os_event_wait_multiple(
-/*===================*/
- ulint n, /*!< in: number of events in the
- array */
- os_native_event_t* native_event_array)
- /*!< in: pointer to an array of event
- handles */
-{
- DWORD index;
+ break;
+ }
- ut_a(native_event_array);
- ut_a(n > 0);
+ timed_out = os_cond_wait_timed(
+ &event->cond_var, &event->os_mutex,
+#ifndef __WIN__
+ &abstime
+#else
+ time_in_ms
+#endif /* !__WIN__ */
+ );
- index = WaitForMultipleObjects((DWORD) n, native_event_array,
- FALSE, /* Wait for any 1 event */
- INFINITE); /* Infinite wait time
- limit */
- ut_a(index >= WAIT_OBJECT_0); /* NOTE: Pointless comparison */
- ut_a(index < WAIT_OBJECT_0 + n);
+ } while (!timed_out);
- if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
- os_thread_exit(NULL);
- }
+ os_fast_mutex_unlock(&event->os_mutex);
- return(index - WAIT_OBJECT_0);
+ return(timed_out ? OS_SYNC_TIME_EXCEEDED : 0);
}
-#endif
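To make the Unix branch of os_event_wait_time_low() above easier to follow, here is a minimal standalone POSIX sketch (assumed names, not InnoDB code) of the same pattern: convert a relative timeout in microseconds into an absolute timespec and loop on pthread_cond_timedwait() until the predicate holds or the deadline passes. Compile with -pthread.

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/time.h>

static pthread_mutex_t	mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t	cond = PTHREAD_COND_INITIALIZER;
static int		is_set = 0;	/* the "event" state */

/* Returns 0 if signalled, ETIMEDOUT on timeout. */
static int
wait_time(unsigned long time_in_usec)
{
	struct timeval	tv;
	struct timespec	abstime;
	int		ret = 0;

	/* Build the absolute deadline from the current time plus the
	relative timeout, normalizing the microsecond overflow. */
	gettimeofday(&tv, NULL);

	tv.tv_usec += time_in_usec;
	tv.tv_sec += tv.tv_usec / 1000000;
	tv.tv_usec %= 1000000;

	abstime.tv_sec = tv.tv_sec;
	abstime.tv_nsec = tv.tv_usec * 1000;

	pthread_mutex_lock(&mtx);
	while (!is_set && ret != ETIMEDOUT) {
		ret = pthread_cond_timedwait(&cond, &mtx, &abstime);
	}
	pthread_mutex_unlock(&mtx);

	return(is_set ? 0 : ETIMEDOUT);
}

int
main(void)
{
	/* Nothing ever signals the event here, so this times out
	after roughly 100 ms. */
	printf("wait_time() = %d\n", wait_time(100000));
	return(0);
}

pthread_cond_timedwait() takes an absolute deadline, so a spurious wakeup simply re-enters the wait without extending the total timeout, which is also why both the sketch and the function above re-check the predicate in a loop.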
/*********************************************************//**
Creates an operating system mutex semaphore. Because these are slow, the
@@ -527,29 +735,15 @@ mutex semaphore of InnoDB itself (mutex_t) should be used where possible.
@return the mutex handle */
UNIV_INTERN
os_mutex_t
-os_mutex_create(
-/*============*/
- const char* name) /*!< in: the name of the mutex, if NULL
- the mutex is created without a name */
+os_mutex_create(void)
+/*=================*/
{
-#ifdef __WIN__
- HANDLE mutex;
- os_mutex_t mutex_str;
-
- mutex = CreateMutex(NULL, /* No security attributes */
- FALSE, /* Initial state: no owner */
- (LPCTSTR) name);
- ut_a(mutex);
-#else
os_fast_mutex_t* mutex;
os_mutex_t mutex_str;
- UT_NOT_USED(name);
-
mutex = ut_malloc(sizeof(os_fast_mutex_t));
os_fast_mutex_init(mutex);
-#endif
mutex_str = ut_malloc(sizeof(os_mutex_str_t));
mutex_str->handle = mutex;
@@ -580,25 +774,11 @@ os_mutex_enter(
/*===========*/
os_mutex_t mutex) /*!< in: mutex to acquire */
{
-#ifdef __WIN__
- DWORD err;
-
- ut_a(mutex);
-
- /* Specify infinite time limit for waiting */
- err = WaitForSingleObject(mutex->handle, INFINITE);
-
- ut_a(err == WAIT_OBJECT_0);
-
- (mutex->count)++;
- ut_a(mutex->count == 1);
-#else
os_fast_mutex_lock(mutex->handle);
(mutex->count)++;
ut_a(mutex->count == 1);
-#endif
}
/**********************************************************//**
@@ -614,11 +794,7 @@ os_mutex_exit(
ut_a(mutex->count == 1);
(mutex->count)--;
-#ifdef __WIN__
- ut_a(ReleaseMutex(mutex->handle));
-#else
os_fast_mutex_unlock(mutex->handle);
-#endif
}
/**********************************************************//**
@@ -647,15 +823,9 @@ os_mutex_free(
os_mutex_exit(os_sync_mutex);
}
-#ifdef __WIN__
- ut_a(CloseHandle(mutex->handle));
-
- ut_free(mutex);
-#else
os_fast_mutex_free(mutex->handle);
ut_free(mutex->handle);
ut_free(mutex);
-#endif
}
/*********************************************************//**
diff --git a/storage/xtradb/os/os0thread.c b/storage/xtradb/os/os0thread.c
index e41c1163371..12b6805d98e 100644
--- a/storage/xtradb/os/os0thread.c
+++ b/storage/xtradb/os/os0thread.c
@@ -1,6 +1,6 @@
/*****************************************************************************
-Copyright (c) 1995, 2009, Innobase Oy. All Rights Reserved.
+Copyright (c) 1995, 2010, Innobase Oy. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -133,15 +133,6 @@ os_thread_create(
0, /* thread runs immediately */
&win_thread_id);
- if (srv_set_thread_priorities) {
-
- /* Set created thread priority the same as a normal query
- in MYSQL: we try to prevent starvation of threads by
- assigning same priority QUERY_PRIOR to all */
-
- ut_a(SetThreadPriority(thread, srv_query_thread_priority));
- }
-
if (thread_id) {
*thread_id = win_thread_id;
}
@@ -172,16 +163,6 @@ os_thread_create(
exit(1);
}
#endif
-#ifdef __NETWARE__
- ret = pthread_attr_setstacksize(&attr,
- (size_t) NW_THD_STACKSIZE);
- if (ret) {
- fprintf(stderr,
- "InnoDB: Error: pthread_attr_setstacksize"
- " returned %d\n", ret);
- exit(1);
- }
-#endif
os_mutex_enter(os_sync_mutex);
os_thread_count++;
os_mutex_exit(os_sync_mutex);
@@ -200,7 +181,6 @@ os_thread_create(
#ifndef UNIV_HPUX10
pthread_attr_destroy(&attr);
#endif
-
if (thread_id) {
*thread_id = pthread;
}
@@ -222,6 +202,11 @@ os_thread_exit(
fprintf(stderr, "Thread exits, id %lu\n",
os_thread_pf(os_thread_get_curr_id()));
#endif
+
+#ifdef UNIV_PFS_THREAD
+ pfs_delete_thread();
+#endif
+
os_mutex_enter(os_sync_mutex);
os_thread_count--;
os_mutex_exit(os_sync_mutex);
@@ -235,21 +220,6 @@ os_thread_exit(
}
/*****************************************************************//**
-Returns handle to the current thread.
-@return current thread handle */
-UNIV_INTERN
-os_thread_t
-os_thread_get_curr(void)
-/*====================*/
-{
-#ifdef __WIN__
- return(GetCurrentThread());
-#else
- return(pthread_self());
-#endif
-}
-
-/*****************************************************************//**
Advises the os to give up remainder of the thread's time slice. */
UNIV_INTERN
void
@@ -257,7 +227,7 @@ os_thread_yield(void)
/*=================*/
{
#if defined(__WIN__)
- Sleep(0);
+ SwitchToThread();
#elif (defined(HAVE_SCHED_YIELD) && defined(HAVE_SCHED_H))
sched_yield();
#elif defined(HAVE_PTHREAD_YIELD_ZERO_ARG)
@@ -280,8 +250,6 @@ os_thread_sleep(
{
#ifdef __WIN__
Sleep((DWORD) tm / 1000);
-#elif defined(__NETWARE__)
- delay(tm / 1000);
#else
struct timeval t;
@@ -291,81 +259,3 @@ os_thread_sleep(
select(0, NULL, NULL, NULL, &t);
#endif
}
-
-#ifndef UNIV_HOTBACKUP
-/******************************************************************//**
-Sets a thread priority. */
-UNIV_INTERN
-void
-os_thread_set_priority(
-/*===================*/
- os_thread_t handle, /*!< in: OS handle to the thread */
- ulint pri) /*!< in: priority */
-{
-#ifdef __WIN__
- int os_pri;
-
- if (pri == OS_THREAD_PRIORITY_BACKGROUND) {
- os_pri = THREAD_PRIORITY_BELOW_NORMAL;
- } else if (pri == OS_THREAD_PRIORITY_NORMAL) {
- os_pri = THREAD_PRIORITY_NORMAL;
- } else if (pri == OS_THREAD_PRIORITY_ABOVE_NORMAL) {
- os_pri = THREAD_PRIORITY_HIGHEST;
- } else {
- ut_error;
- }
-
- ut_a(SetThreadPriority(handle, os_pri));
-#else
- UT_NOT_USED(handle);
- UT_NOT_USED(pri);
-#endif
-}
-
-/******************************************************************//**
-Gets a thread priority.
-@return priority */
-UNIV_INTERN
-ulint
-os_thread_get_priority(
-/*===================*/
- os_thread_t handle __attribute__((unused)))
- /*!< in: OS handle to the thread */
-{
-#ifdef __WIN__
- int os_pri;
- ulint pri;
-
- os_pri = GetThreadPriority(handle);
-
- if (os_pri == THREAD_PRIORITY_BELOW_NORMAL) {
- pri = OS_THREAD_PRIORITY_BACKGROUND;
- } else if (os_pri == THREAD_PRIORITY_NORMAL) {
- pri = OS_THREAD_PRIORITY_NORMAL;
- } else if (os_pri == THREAD_PRIORITY_HIGHEST) {
- pri = OS_THREAD_PRIORITY_ABOVE_NORMAL;
- } else {
- ut_error;
- }
-
- return(pri);
-#else
- return(0);
-#endif
-}
-
-/******************************************************************//**
-Gets the last operating system error code for the calling thread.
-@return last error on Windows, 0 otherwise */
-UNIV_INTERN
-ulint
-os_thread_get_last_error(void)
-/*==========================*/
-{
-#ifdef __WIN__
- return(GetLastError());
-#else
- return(0);
-#endif
-}
-#endif /* !UNIV_HOTBACKUP */