diff options
Diffstat (limited to 'storage/tokudb/PerconaFT/portability/file.cc')
-rw-r--r-- | storage/tokudb/PerconaFT/portability/file.cc | 498 |
1 files changed, 397 insertions, 101 deletions
diff --git a/storage/tokudb/PerconaFT/portability/file.cc b/storage/tokudb/PerconaFT/portability/file.cc index 0e3efc1a12a..485bfac8514 100644 --- a/storage/tokudb/PerconaFT/portability/file.cc +++ b/storage/tokudb/PerconaFT/portability/file.cc @@ -52,9 +52,12 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #include "toku_path.h" #include <portability/toku_atomic.h> +toku_instr_key *tokudb_file_data_key; + static int toku_assert_on_write_enospc = 0; static const int toku_write_enospc_sleep = 1; -static uint64_t toku_write_enospc_last_report; // timestamp of most recent report to error log +static uint64_t toku_write_enospc_last_report; // timestamp of most recent + // report to error log static time_t toku_write_enospc_last_time; // timestamp of most recent ENOSPC static uint32_t toku_write_enospc_current; // number of threads currently blocked on ENOSPC static uint64_t toku_write_enospc_total; // total number of times ENOSPC was returned from an attempt to write @@ -142,11 +145,17 @@ static ssize_t (*t_full_pwrite)(int, const void *, size_t, off_t); static FILE * (*t_fdopen)(int, const char *); static FILE * (*t_fopen)(const char *, const char *); static int (*t_open)(const char *, int, int); -static int (*t_fclose)(FILE *); +static int (*t_fclose)(FILE *); static ssize_t (*t_read)(int, void *, size_t); static ssize_t (*t_pread)(int, void *, size_t, off_t); +static size_t (*os_fwrite_fun)(const void *, size_t, size_t, FILE *) = nullptr; + +void toku_set_func_fwrite( + size_t (*fwrite_fun)(const void *, size_t, size_t, FILE *)) { + os_fwrite_fun = fwrite_fun; +} -void toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) { +void toku_set_func_write(ssize_t (*write_fun)(int, const void *, size_t)) { t_write = write_fun; } @@ -186,9 +195,63 @@ void toku_set_func_pread (ssize_t (*pread_fun)(int, void *, size_t, off_t)) { t_pread = pread_fun; } -void -toku_os_full_write (int fd, const void *buf, size_t len) { - const char *bp = (const char *) buf; +int toku_os_delete_with_source_location(const char *name, + const char *src_file, + uint src_line) { + + toku_io_instrumentation io_annotation; + toku_instr_file_name_close_begin(io_annotation, + *tokudb_file_data_key, + toku_instr_file_op::file_delete, + name, + src_file, + src_line); + const int result = unlink(name); + + /* Register the result value with the instrumentation system */ + toku_instr_file_close_end(io_annotation, result); + + return result; +} + +int toku_os_rename_with_source_location(const char *old_name, + const char *new_name, + const char *src_file, + uint src_line) { + int result; + + toku_io_instrumentation io_annotation; + toku_instr_file_name_io_begin(io_annotation, + *tokudb_file_data_key, + toku_instr_file_op::file_rename, + new_name, + 0, + src_file, + src_line); + + result = rename(old_name, new_name); + /* Regsiter the result value with the instrumentation system */ + toku_instr_file_io_end(io_annotation, 0); + + return result; +} + +void toku_os_full_write_with_source_location(int fd, + const void *buf, + size_t len, + const char *src_file, + uint src_line) { + const char *bp = (const char *)buf; + size_t bytes_written = len; + + toku_io_instrumentation io_annotation; + toku_instr_file_io_begin(io_annotation, + toku_instr_file_op::file_write, + fd, + len, + src_file, + src_line); + while (len > 0) { ssize_t r; if (t_full_write) { @@ -205,14 +268,30 @@ toku_os_full_write (int fd, const void *buf, size_t len) { } } assert(len == 0); + + /* Register the result value with the instrumentaion system */ + toku_instr_file_io_end(io_annotation, bytes_written); } -int -toku_os_write (int fd, const void *buf, size_t len) { - const char *bp = (const char *) buf; +int toku_os_write_with_source_location(int fd, + const void *buf, + size_t len, + const char *src_file, + uint src_line) { + const char *bp = (const char *)buf; int result = 0; + ssize_t r; + + size_t bytes_written = len; + toku_io_instrumentation io_annotation; + toku_instr_file_io_begin(io_annotation, + toku_instr_file_op::file_write, + fd, + len, + src_file, + src_line); + while (len > 0) { - ssize_t r; if (t_write) { r = t_write(fd, bp, len); } else { @@ -222,17 +301,33 @@ toku_os_write (int fd, const void *buf, size_t len) { result = errno; break; } - len -= r; - bp += r; + len -= r; + bp += r; } + /* Register the result value with the instrumentation system */ + toku_instr_file_io_end(io_annotation, bytes_written - len); + return result; } -void -toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) { - assert(0==((long long)buf)%512); - assert((len%512 == 0) && (off%512)==0); // to make pwrite work. - const char *bp = (const char *) buf; +void toku_os_full_pwrite_with_source_location(int fd, + const void *buf, + size_t len, + toku_off_t off, + const char *src_file, + uint src_line) { + assert(0 == ((long long)buf) % 512); + assert((len % 512 == 0) && (off % 512) == 0); // to make pwrite work. + const char *bp = (const char *)buf; + + size_t bytes_written = len; + toku_io_instrumentation io_annotation; + toku_instr_file_io_begin(io_annotation, + toku_instr_file_op::file_write, + fd, + len, + src_file, + src_line); while (len > 0) { ssize_t r; if (t_full_pwrite) { @@ -250,71 +345,209 @@ toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) { } } assert(len == 0); -} -ssize_t -toku_os_pwrite (int fd, const void *buf, size_t len, toku_off_t off) { - assert(0==((long long)buf)%512); // these asserts are to ensure that direct I/O will work. - assert(0==len %512); - assert(0==off %512); - const char *bp = (const char *) buf; + /* Register the result value with the instrumentation system */ + toku_instr_file_io_end(io_annotation, bytes_written); +} + +ssize_t toku_os_pwrite_with_source_location(int fd, + const void *buf, + size_t len, + toku_off_t off, + const char *src_file, + uint src_line) { + assert(0 == + ((long long)buf) % + 512); // these asserts are to ensure that direct I/O will work. + assert(0 == len % 512); + assert(0 == off % 512); + const char *bp = (const char *)buf; ssize_t result = 0; + ssize_t r; + + size_t bytes_written = len; + toku_io_instrumentation io_annotation; + toku_instr_file_io_begin(io_annotation, + toku_instr_file_op::file_write, + fd, + len, + src_file, + src_line); while (len > 0) { - ssize_t r; - if (t_pwrite) { - r = t_pwrite(fd, bp, len, off); - } else { - r = pwrite(fd, bp, len, off); - } + r = (t_pwrite) ? t_pwrite(fd, bp, len, off) : pwrite(fd, bp, len, off); + if (r < 0) { result = errno; break; } len -= r; - bp += r; - off += r; + bp += r; + off += r; + } + /* Register the result value with the instrumentation system */ + toku_instr_file_io_end(io_annotation, bytes_written - len); + + return result; +} + +int toku_os_fwrite_with_source_location(const void *ptr, + size_t size, + size_t nmemb, + TOKU_FILE *stream, + const char *src_file, + uint src_line) { + int result = 0; + size_t bytes_written; + + toku_io_instrumentation io_annotation; + toku_instr_file_stream_io_begin(io_annotation, + toku_instr_file_op::file_write, + *stream, + nmemb, + src_file, + src_line); + + if (os_fwrite_fun) { + bytes_written = os_fwrite_fun(ptr, size, nmemb, stream->file); + } else { + bytes_written = fwrite(ptr, size, nmemb, stream->file); + } + + if (bytes_written != nmemb) { + if (os_fwrite_fun) // if using hook to induce artificial errors (for + // testing) ... + result = get_maybe_error_errno(); // ... then there is no error in + // the stream, but there is one + // in errno + else + result = ferror(stream->file); + invariant(result != 0); // Should we assert here? + } + /* Register the result value with the instrumentation system */ + toku_instr_file_io_end(io_annotation, bytes_written); + + return result; +} + +int toku_os_fread_with_source_location(void *ptr, + size_t size, + size_t nmemb, + TOKU_FILE *stream, + const char *src_file, + uint src_line) { + int result = 0; + size_t bytes_read; + + toku_io_instrumentation io_annotation; + toku_instr_file_stream_io_begin(io_annotation, + toku_instr_file_op::file_read, + *stream, + nmemb, + src_file, + src_line); + + if ((bytes_read = fread(ptr, size, nmemb, stream->file)) != nmemb) { + if ((feof(stream->file))) + result = EOF; + else + result = ferror(stream->file); + invariant(result != 0); // Should we assert here? } + /* Register the result value with the instrumentation system */ + toku_instr_file_io_end(io_annotation, bytes_read); + return result; } -FILE * -toku_os_fdopen(int fildes, const char *mode) { - FILE * rval; - if (t_fdopen) - rval = t_fdopen(fildes, mode); - else - rval = fdopen(fildes, mode); +TOKU_FILE *toku_os_fdopen_with_source_location(int fildes, + const char *mode, + const char *filename, + const toku_instr_key &instr_key, + const char *src_file, + uint src_line) { + TOKU_FILE *XMALLOC(rval); + if (FT_LIKELY(rval != nullptr)) { + toku_io_instrumentation io_annotation; + toku_instr_file_open_begin(io_annotation, + instr_key, + toku_instr_file_op::file_stream_open, + filename, + src_file, + src_line); + + rval->file = (t_fdopen) ? t_fdopen(fildes, mode) : fdopen(fildes, mode); + toku_instr_file_stream_open_end(io_annotation, *rval); + + if (FT_UNLIKELY(rval->file == nullptr)) { + toku_free(rval); + rval = nullptr; + } + } return rval; } - -FILE * -toku_os_fopen(const char *filename, const char *mode){ - FILE * rval; - if (t_fopen) - rval = t_fopen(filename, mode); - else - rval = fopen(filename, mode); +TOKU_FILE *toku_os_fopen_with_source_location(const char *filename, + const char *mode, + const toku_instr_key &instr_key, + const char *src_file, + uint src_line) { + TOKU_FILE *XMALLOC(rval); + if (FT_UNLIKELY(rval == nullptr)) + return nullptr; + + toku_io_instrumentation io_annotation; + toku_instr_file_open_begin(io_annotation, + instr_key, + toku_instr_file_op::file_stream_open, + filename, + src_file, + src_line); + rval->file = t_fopen ? t_fopen(filename, mode) : fopen(filename, mode); + /* Register the returning "file" value with the system */ + toku_instr_file_stream_open_end(io_annotation, *rval); + + if (FT_UNLIKELY(rval->file == nullptr)) { + toku_free(rval); + rval = nullptr; + } return rval; } -int -toku_os_open(const char *path, int oflag, int mode) { - int rval; +int toku_os_open_with_source_location(const char *path, + int oflag, + int mode, + const toku_instr_key &instr_key, + const char *src_file, + uint src_line) { + int fd; + toku_io_instrumentation io_annotation; + /* register a file open or creation depending on "oflag" */ + toku_instr_file_open_begin( + io_annotation, + instr_key, + ((oflag & O_CREAT) ? toku_instr_file_op::file_create + : toku_instr_file_op::file_open), + path, + src_file, + src_line); if (t_open) - rval = t_open(path, oflag, mode); + fd = t_open(path, oflag, mode); else - rval = open(path, oflag, mode); - return rval; + fd = open(path, oflag, mode); + + toku_instr_file_open_end(io_annotation, fd); + return fd; } -int -toku_os_open_direct(const char *path, int oflag, int mode) { +int toku_os_open_direct(const char *path, + int oflag, + int mode, + const toku_instr_key &instr_key) { int rval; #if defined(HAVE_O_DIRECT) - rval = toku_os_open(path, oflag | O_DIRECT, mode); + rval = toku_os_open(path, oflag | O_DIRECT, mode, instr_key); #elif defined(HAVE_F_NOCACHE) - rval = toku_os_open(path, oflag, mode); + rval = toku_os_open(path, oflag, mode, instr_key); if (rval >= 0) { int r = fcntl(rval, F_NOCACHE, 1); if (r == -1) { @@ -327,63 +560,112 @@ toku_os_open_direct(const char *path, int oflag, int mode) { return rval; } -int -toku_os_fclose(FILE * stream) { +int toku_os_fclose_with_source_location(TOKU_FILE *stream, + const char *src_file, + uint src_line) { int rval = -1; - if (t_fclose) - rval = t_fclose(stream); - else { // if EINTR, retry until success - while (rval != 0) { - rval = fclose(stream); - if (rval && (errno != EINTR)) - break; - } + if (FT_LIKELY(stream != nullptr)) { + /* register a file stream close " */ + toku_io_instrumentation io_annotation; + toku_instr_file_stream_close_begin( + io_annotation, + toku_instr_file_op::file_stream_close, + *stream, + src_file, + src_line); + + if (t_fclose) + rval = t_fclose(stream->file); + else { // if EINTR, retry until success + while (rval != 0) { + rval = fclose(stream->file); + if (rval && (errno != EINTR)) + break; + } + } + /* Register the returning "rval" value with the system */ + toku_instr_file_close_end(io_annotation, rval); + toku_free(stream); + stream = nullptr; } return rval; } -int -toku_os_close(int fd) { // if EINTR, retry until success +int toku_os_close_with_source_location( + int fd, + const char *src_file, + uint src_line) { // if EINTR, retry until success + /* register the file close */ int r = -1; + + /* register a file descriptor close " */ + toku_io_instrumentation io_annotation; + toku_instr_file_fd_close_begin( + io_annotation, toku_instr_file_op::file_close, fd, src_file, src_line); while (r != 0) { - r = close(fd); - if (r) { - int rr = errno; - if (rr!=EINTR) printf("rr=%d (%s)\n", rr, strerror(rr)); - assert(rr==EINTR); - } + r = close(fd); + if (r) { + int rr = errno; + if (rr != EINTR) + printf("rr=%d (%s)\n", rr, strerror(rr)); + assert(rr == EINTR); + } } - return r; -} - -int toku_os_rename(const char *old_name, const char *new_name) { - return rename(old_name, new_name); -} -int toku_os_unlink(const char *path) { return unlink(path); } + /* Regsiter the returning value with the system */ + toku_instr_file_close_end(io_annotation, r); -ssize_t -toku_os_read(int fd, void *buf, size_t count) { - ssize_t r; - if (t_read) - r = t_read(fd, buf, count); - else - r = read(fd, buf, count); return r; } -ssize_t -toku_os_pread (int fd, void *buf, size_t count, off_t offset) { - assert(0==((long long)buf)%512); - assert(0==count%512); - assert(0==offset%512); - ssize_t r; +ssize_t toku_os_read_with_source_location(int fd, + void *buf, + size_t count, + const char *src_file, + uint src_line) { + ssize_t bytes_read; + + toku_io_instrumentation io_annotation; + toku_instr_file_io_begin(io_annotation, + toku_instr_file_op::file_read, + fd, + count, + src_file, + src_line); + + bytes_read = (t_read) ? t_read(fd, buf, count) : read(fd, buf, count); + + toku_instr_file_io_end(io_annotation, bytes_read); + + return bytes_read; +} + +ssize_t inline_toku_os_pread_with_source_location(int fd, + void *buf, + size_t count, + off_t offset, + const char *src_file, + uint src_line) { + assert(0 == ((long long)buf) % 512); + assert(0 == count % 512); + assert(0 == offset % 512); + ssize_t bytes_read; + + toku_io_instrumentation io_annotation; + toku_instr_file_io_begin(io_annotation, + toku_instr_file_op::file_read, + fd, + count, + src_file, + src_line); if (t_pread) { - r = t_pread(fd, buf, count, offset); + bytes_read = t_pread(fd, buf, count, offset); } else { - r = pread(fd, buf, count, offset); + bytes_read = pread(fd, buf, count, offset); } - return r; + toku_instr_file_io_end(io_annotation, bytes_read); + + return bytes_read; } void toku_os_recursive_delete(const char *path) { @@ -411,13 +693,24 @@ void toku_set_func_fsync(int (*fsync_function)(int)) { } // keep trying if fsync fails because of EINTR -static void file_fsync_internal (int fd) { +void file_fsync_internal_with_source_location(int fd, + const char *src_file, + uint src_line) { uint64_t tstart = toku_current_time_microsec(); int r = -1; uint64_t eintr_count = 0; + + toku_io_instrumentation io_annotation; + toku_instr_file_io_begin(io_annotation, + toku_instr_file_op::file_sync, + fd, + 0, + src_file, + src_line); + while (r != 0) { - if (t_fsync) { - r = t_fsync(fd); + if (t_fsync) { + r = t_fsync(fd); } else { r = fsync(fd); } @@ -429,6 +722,9 @@ static void file_fsync_internal (int fd) { toku_sync_fetch_and_add(&toku_fsync_count, 1); uint64_t duration = toku_current_time_microsec() - tstart; toku_sync_fetch_and_add(&toku_fsync_time, duration); + + toku_instr_file_io_end(io_annotation, 0); + if (duration >= toku_long_fsync_threshold) { toku_sync_fetch_and_add(&toku_long_fsync_count, 1); toku_sync_fetch_and_add(&toku_long_fsync_time, duration); |