From 5e55d1ced52c52fb2f0508e1346059901a85960f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Lindstr=C3=B6m?= Date: Thu, 19 Dec 2013 14:36:38 +0200 Subject: Changes for Fusion-io multi-threaded flush, page compressed tables and tables using atomic write/table. This is work in progress and some parts are at most POC quality. --- storage/innobase/CMakeLists.txt | 3 + storage/innobase/buf/buf0buf.cc | 2 + storage/innobase/buf/buf0dblwr.cc | 31 +- storage/innobase/buf/buf0flu.cc | 324 +++++++- storage/innobase/buf/buf0mtflu.cc | 1103 +++++++++++++++++++++++++ storage/innobase/buf/buf0rea.cc | 5 +- storage/innobase/dict/dict0dict.cc | 7 +- storage/innobase/fil/fil0fil.cc | 175 +++- storage/innobase/fil/fil0pagecompress.cc | 369 +++++++++ storage/innobase/handler/ha_innodb.cc | 242 +++++- storage/innobase/handler/ha_innodb.h | 15 + storage/innobase/handler/handler0alter.cc | 28 + storage/innobase/include/buf0buf.h | 6 + storage/innobase/include/dict0dict.h | 14 +- storage/innobase/include/dict0dict.ic | 151 +++- storage/innobase/include/dict0mem.h | 56 +- storage/innobase/include/dict0pagecompress.h | 94 +++ storage/innobase/include/dict0pagecompress.ic | 191 +++++ storage/innobase/include/fil0fil.h | 43 +- storage/innobase/include/fil0pagecompress.h | 117 +++ storage/innobase/include/fsp0fsp.h | 66 +- storage/innobase/include/fsp0fsp.ic | 17 + storage/innobase/include/fsp0pagecompress.h | 64 ++ storage/innobase/include/fsp0pagecompress.ic | 61 ++ storage/innobase/include/fsp0types.h | 1 + storage/innobase/include/os0file.h | 57 +- storage/innobase/include/os0file.ic | 13 +- storage/innobase/include/srv0mon.h | 10 + storage/innobase/include/srv0srv.h | 64 +- storage/innobase/log/log0log.cc | 17 +- storage/innobase/log/log0recv.cc | 19 +- storage/innobase/os/os0file.cc | 561 +++++++++++-- storage/innobase/srv/srv0mon.cc | 68 ++ storage/innobase/srv/srv0srv.cc | 41 +- storage/innobase/srv/srv0start.cc | 720 +++++++++++++++- 35 files changed, 4559 insertions(+), 196 deletions(-) create mode 100644 storage/innobase/buf/buf0mtflu.cc create mode 100644 storage/innobase/fil/fil0pagecompress.cc create mode 100644 storage/innobase/include/dict0pagecompress.h create mode 100644 storage/innobase/include/dict0pagecompress.ic create mode 100644 storage/innobase/include/fil0pagecompress.h create mode 100644 storage/innobase/include/fsp0pagecompress.h create mode 100644 storage/innobase/include/fsp0pagecompress.ic diff --git a/storage/innobase/CMakeLists.txt b/storage/innobase/CMakeLists.txt index ee8758a08d2..e41d2406bd2 100644 --- a/storage/innobase/CMakeLists.txt +++ b/storage/innobase/CMakeLists.txt @@ -278,6 +278,8 @@ SET(INNOBASE_SOURCES buf/buf0flu.cc buf/buf0lru.cc buf/buf0rea.cc +# TODO: JAN uncomment +# buf/buf0mtflu.cc data/data0data.cc data/data0type.cc dict/dict0boot.cc @@ -291,6 +293,7 @@ SET(INNOBASE_SOURCES eval/eval0eval.cc eval/eval0proc.cc fil/fil0fil.cc + fil/fil0pagecompress.cc fsp/fsp0fsp.cc fut/fut0fut.cc fut/fut0lst.cc diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index 6efa14e6791..328d5a6f3bf 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -2,6 +2,7 @@ Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2008, Google Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described @@ -3254,6 +3255,7 @@ buf_page_init_low( bpage->access_time = 0; bpage->newest_modification = 0; bpage->oldest_modification = 0; + bpage->write_size = 0; HASH_INVALIDATE(bpage, hash); #if defined UNIV_DEBUG_FILE_ACCESSES || defined UNIV_DEBUG bpage->file_page_was_freed = FALSE; diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc index fb853fe1543..933b56eaf88 100644 --- a/storage/innobase/buf/buf0dblwr.cc +++ b/storage/innobase/buf/buf0dblwr.cc @@ -1,6 +1,7 @@ /***************************************************************************** Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -365,8 +366,8 @@ buf_dblwr_init_or_restore_pages( /* Read the trx sys header to check if we are using the doublewrite buffer */ - fil_io(OS_FILE_READ, TRUE, TRX_SYS_SPACE, 0, TRX_SYS_PAGE_NO, 0, - UNIV_PAGE_SIZE, read_buf, NULL); + fil_io(OS_FILE_READ, true, TRX_SYS_SPACE, 0, TRX_SYS_PAGE_NO, 0, + UNIV_PAGE_SIZE, read_buf, NULL, 0); doublewrite = read_buf + TRX_SYS_DOUBLEWRITE; if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC) @@ -402,11 +403,11 @@ buf_dblwr_init_or_restore_pages( fil_io(OS_FILE_READ, TRUE, TRX_SYS_SPACE, 0, block1, 0, TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE, - buf, NULL); - fil_io(OS_FILE_READ, TRUE, TRX_SYS_SPACE, 0, block2, 0, + buf, NULL, 0); + fil_io(OS_FILE_READ, true, TRX_SYS_SPACE, 0, block2, 0, TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE, buf + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE, - NULL); + NULL, 0); /* Check if any of these pages is half-written in data files, in the intended position */ @@ -433,8 +434,8 @@ buf_dblwr_init_or_restore_pages( + i - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE; } - fil_io(OS_FILE_WRITE, TRUE, 0, 0, source_page_no, 0, - UNIV_PAGE_SIZE, page, NULL); + fil_io(OS_FILE_WRITE, true, 0, 0, source_page_no, 0, + UNIV_PAGE_SIZE, page, NULL, 0); } else { space_id = mach_read_from_4( @@ -476,7 +477,7 @@ buf_dblwr_init_or_restore_pages( fil_io(OS_FILE_READ, TRUE, space_id, zip_size, page_no, 0, zip_size ? zip_size : UNIV_PAGE_SIZE, - read_buf, NULL); + read_buf, NULL, 0); /* Check if the page is corrupt */ @@ -528,7 +529,7 @@ buf_dblwr_init_or_restore_pages( fil_io(OS_FILE_WRITE, TRUE, space_id, zip_size, page_no, 0, zip_size ? zip_size : UNIV_PAGE_SIZE, - page, NULL); + page, NULL, 0); ib_logf(IB_LOG_LEVEL_INFO, "Recovered the page from" @@ -714,7 +715,7 @@ buf_dblwr_write_block_to_datafile( buf_page_get_page_no(bpage), 0, buf_page_get_zip_size(bpage), (void*) bpage->zip.data, - (void*) bpage); + (void*) bpage, 0); return; } @@ -727,7 +728,7 @@ buf_dblwr_write_block_to_datafile( fil_io(OS_FILE_WRITE | OS_AIO_SIMULATED_WAKE_LATER, FALSE, buf_block_get_space(block), 0, buf_block_get_page_no(block), 0, UNIV_PAGE_SIZE, - (void*) block->frame, (void*) block); + (void*) block->frame, (void*) block, 0); } /********************************************************************//** @@ -820,7 +821,7 @@ try_again: fil_io(OS_FILE_WRITE, TRUE, TRX_SYS_SPACE, 0, buf_dblwr->block1, 0, len, - (void*) write_buf, NULL); + (void*) write_buf, NULL, 0); if (buf_dblwr->first_free <= TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) { /* No unwritten pages in the second block. */ @@ -836,7 +837,7 @@ try_again: fil_io(OS_FILE_WRITE, TRUE, TRX_SYS_SPACE, 0, buf_dblwr->block2, 0, len, - (void*) write_buf, NULL); + (void*) write_buf, NULL, 0); flush: /* increment the doublewrite flushed pages counter */ @@ -1056,14 +1057,14 @@ retry: fil_io(OS_FILE_WRITE, TRUE, TRX_SYS_SPACE, 0, offset, 0, UNIV_PAGE_SIZE, (void*) (buf_dblwr->write_buf - + UNIV_PAGE_SIZE * i), NULL); + + UNIV_PAGE_SIZE * i), NULL, 0); } else { /* It is a regular page. Write it directly to the doublewrite buffer */ fil_io(OS_FILE_WRITE, TRUE, TRX_SYS_SPACE, 0, offset, 0, UNIV_PAGE_SIZE, (void*) ((buf_block_t*) bpage)->frame, - NULL); + NULL, 0); } /* Now flush the doublewrite buffer data to disk */ diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 542c1669667..06ae7b5375c 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -1,6 +1,8 @@ /***************************************************************************** Copyright (c) 1995, 2011, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. +Copyright (c) 2013, Fusion-io. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -673,8 +675,10 @@ buf_flush_write_complete( flush_type = buf_page_get_flush_type(bpage); buf_pool->n_flush[flush_type]--; +#ifdef UNIV_DEBUG /* fprintf(stderr, "n pending flush %lu\n", buf_pool->n_flush[flush_type]); */ +#endif if (buf_pool->n_flush[flush_type] == 0 && buf_pool->init_flush[flush_type] == FALSE) { @@ -938,7 +942,7 @@ buf_flush_write_block_low( FALSE, buf_page_get_space(bpage), zip_size, buf_page_get_page_no(bpage), 0, zip_size ? zip_size : UNIV_PAGE_SIZE, - frame, bpage); + frame, bpage, 0); } else if (flush_type == BUF_FLUSH_SINGLE_PAGE) { buf_dblwr_write_single_page(bpage); } else { @@ -1213,7 +1217,9 @@ buf_flush_try_neighbors( } } +#ifdef UNIV_DEBUG /* fprintf(stderr, "Flush area: low %lu high %lu\n", low, high); */ +#endif if (high > fil_space_get_size(space)) { high = fil_space_get_size(space); @@ -1655,7 +1661,7 @@ pages: to avoid deadlocks, this function must be written so that it cannot end up waiting for these latches! NOTE 2: in the case of a flush list flush, the calling thread is not allowed to own any latches on pages! @return number of blocks for which the write request was queued */ -static +//static ulint buf_flush_batch( /*============*/ @@ -1712,7 +1718,7 @@ buf_flush_batch( /******************************************************************//** Gather the aggregated stats for both flush list and LRU list flushing */ -static +//static void buf_flush_common( /*=============*/ @@ -1737,7 +1743,7 @@ buf_flush_common( /******************************************************************//** Start a buffer flush batch for LRU or flush list */ -static +//static ibool buf_flush_start( /*============*/ @@ -1766,7 +1772,7 @@ buf_flush_start( /******************************************************************//** End a buffer flush batch for LRU or flush list */ -static +//static void buf_flush_end( /*==========*/ @@ -1816,11 +1822,55 @@ buf_flush_wait_batch_end( } } else { thd_wait_begin(NULL, THD_WAIT_DISKIO); - os_event_wait(buf_pool->no_flush[type]); + os_event_wait(buf_pool->no_flush[type]); thd_wait_end(NULL); } } +/* JAN: TODO: */ +/*******************************************************************//** +This utility flushes dirty blocks from the end of the LRU list and also +puts replaceable clean pages from the end of the LRU list to the free +list. +NOTE: The calling thread is not allowed to own any latches on pages! +@return true if a batch was queued successfully. false if another batch +of same type was already running. */ +static +bool +pgcomp_buf_flush_LRU( +/*==========*/ + buf_pool_t* buf_pool, /*!< in/out: buffer pool instance */ + ulint min_n, /*!< in: wished minimum mumber of blocks + flushed (it is not guaranteed that the + actual number is that big, though) */ + ulint* n_processed) /*!< out: the number of pages + which were processed is passed + back to caller. Ignored if NULL */ +{ + ulint page_count; + + if (n_processed) { + *n_processed = 0; + } + + if (!buf_flush_start(buf_pool, BUF_FLUSH_LRU)) { + return(false); + } + + page_count = buf_flush_batch(buf_pool, BUF_FLUSH_LRU, min_n, 0); + + buf_flush_end(buf_pool, BUF_FLUSH_LRU); + + buf_flush_common(BUF_FLUSH_LRU, page_count); + + if (n_processed) { + *n_processed = page_count; + } + + return(true); +} +/* JAN: TODO: END: */ + /*******************************************************************//** This utility flushes dirty blocks from the end of the LRU list and also puts replaceable clean pages from the end of the LRU list to the free @@ -1863,6 +1913,168 @@ buf_flush_LRU( return(true); } +/* JAN: TODO: */ +/*******************************************************************//**/ +extern int is_pgcomp_wrk_init_done(void); +extern int pgcomp_flush_work_items(int buf_pool_inst, int *pages_flushed, + int flush_type, int min_n, unsigned long long lsn_limit); + +#define MT_COMP_WATER_MARK 50 + +#include +int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time) +{ + if (g_time->tv_usec < s_time->tv_usec) + { + int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000 + 1; + s_time->tv_usec -= 1000000 * nsec; + s_time->tv_sec += nsec; + } + if (g_time->tv_usec - s_time->tv_usec > 1000000) + { + int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000; + s_time->tv_usec += 1000000 * nsec; + s_time->tv_sec -= nsec; + } + d_time->tv_sec = g_time->tv_sec - s_time->tv_sec; + d_time->tv_usec = g_time->tv_usec - s_time->tv_usec; + + return 0; +} + +static pthread_mutex_t pgcomp_mtx = PTHREAD_MUTEX_INITIALIZER; +/*******************************************************************//** +Multi-threaded version of buf_flush_list +*/ +UNIV_INTERN +bool +pgcomp_buf_flush_list( +/*==================*/ + ulint min_n, /*!< in: wished minimum mumber of blocks + flushed (it is not guaranteed that the + actual number is that big, though) */ + lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all + blocks whose oldest_modification is + smaller than this should be flushed + (if their number does not exceed + min_n), otherwise ignored */ + ulint* n_processed) /*!< out: the number of pages + which were processed is passed + back to caller. Ignored if NULL */ + +{ + ulint i; + bool success = true; + struct timeval p_start_time, p_end_time, d_time; + + if (n_processed) { + *n_processed = 0; + } + + if (min_n != ULINT_MAX) { + /* Ensure that flushing is spread evenly amongst the + buffer pool instances. When min_n is ULINT_MAX + we need to flush everything up to the lsn limit + so no limit here. */ + min_n = (min_n + srv_buf_pool_instances - 1) + / srv_buf_pool_instances; + } + +#ifdef UNIV_DEBUG + gettimeofday(&p_start_time, 0x0); +#endif + if(is_pgcomp_wrk_init_done() && (min_n > MT_COMP_WATER_MARK)) { + int cnt_flush[32]; + + //stack_trace(); + pthread_mutex_lock(&pgcomp_mtx); + //gettimeofday(&p_start_time, 0x0); + //fprintf(stderr, "Calling into wrk-pgcomp [min:%lu]", min_n); + pgcomp_flush_work_items(srv_buf_pool_instances, + cnt_flush, BUF_FLUSH_LIST, + min_n, lsn_limit); + + for (i = 0; i < srv_buf_pool_instances; i++) { + if (n_processed) { + *n_processed += cnt_flush[i]; + } + if (cnt_flush[i]) { + MONITOR_INC_VALUE_CUMULATIVE( + MONITOR_FLUSH_BATCH_TOTAL_PAGE, + MONITOR_FLUSH_BATCH_COUNT, + MONITOR_FLUSH_BATCH_PAGES, + cnt_flush[i]); + + } + } + + pthread_mutex_unlock(&pgcomp_mtx); + +#ifdef UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", ( + min_n * srv_buf_pool_instances), *n_processed, + (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); +#endif + return(success); + } + /* Flush to lsn_limit in all buffer pool instances */ + for (i = 0; i < srv_buf_pool_instances; i++) { + buf_pool_t* buf_pool; + ulint page_count = 0; + + buf_pool = buf_pool_from_array(i); + + if (!buf_flush_start(buf_pool, BUF_FLUSH_LIST)) { + /* We have two choices here. If lsn_limit was + specified then skipping an instance of buffer + pool means we cannot guarantee that all pages + up to lsn_limit has been flushed. We can + return right now with failure or we can try + to flush remaining buffer pools up to the + lsn_limit. We attempt to flush other buffer + pools based on the assumption that it will + help in the retry which will follow the + failure. */ + success = false; + + continue; + } + + page_count = buf_flush_batch( + buf_pool, BUF_FLUSH_LIST, min_n, lsn_limit); + + buf_flush_end(buf_pool, BUF_FLUSH_LIST); + + buf_flush_common(BUF_FLUSH_LIST, page_count); + + if (n_processed) { + *n_processed += page_count; + } + + if (page_count) { + MONITOR_INC_VALUE_CUMULATIVE( + MONITOR_FLUSH_BATCH_TOTAL_PAGE, + MONITOR_FLUSH_BATCH_COUNT, + MONITOR_FLUSH_BATCH_PAGES, + page_count); + } + } + +#if UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + + fprintf(stderr, "[2] [*n_processed: (min:%lu)%lu %llu usec]\n", ( + min_n * srv_buf_pool_instances), *n_processed, + (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); +#endif + return(success); +} +#endif +/* JAN: TODO: END: */ + /*******************************************************************//** This utility flushes dirty blocks from the end of the flush list of all buffer pool instances. @@ -1890,6 +2102,12 @@ buf_flush_list( ulint i; bool success = true; + /* JAN: TODO: */ + if (is_pgcomp_wrk_init_done()) { + return(pgcomp_buf_flush_list(min_n, lsn_limit, n_processed)); + } + /* JAN: TODO: END: */ + if (n_processed) { *n_processed = 0; } @@ -2043,6 +2261,59 @@ buf_flush_single_page_from_LRU( return(freed); } +/* JAN: TODO: */ +/*********************************************************************//** +pgcomp_Clears up tail of the LRU lists: +* Put replaceable pages at the tail of LRU to the free list +* Flush dirty pages at the tail of LRU to the disk +The depth to which we scan each buffer pool is controlled by dynamic +config parameter innodb_LRU_scan_depth. +@return total pages flushed */ +UNIV_INTERN +ulint +pgcomp_buf_flush_LRU_tail(void) +/*====================*/ +{ + struct timeval p_start_time, p_end_time, d_time; + ulint total_flushed=0, i=0; + int cnt_flush[32]; + +#if UNIV_DEBUG + gettimeofday(&p_start_time, 0x0); +#endif + assert(is_pgcomp_wrk_init_done()); + + pthread_mutex_lock(&pgcomp_mtx); + pgcomp_flush_work_items(srv_buf_pool_instances, + cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); + + for (i = 0; i < srv_buf_pool_instances; i++) { + if (cnt_flush[i]) { + total_flushed += cnt_flush[i]; + + MONITOR_INC_VALUE_CUMULATIVE( + MONITOR_LRU_BATCH_TOTAL_PAGE, + MONITOR_LRU_BATCH_COUNT, + MONITOR_LRU_BATCH_PAGES, + cnt_flush[i]); + } + } + + pthread_mutex_unlock(&pgcomp_mtx); + +#if UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + + fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", ( + srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed, + (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); +#endif + + return(total_flushed); +} +/* JAN: TODO: END: */ + /*********************************************************************//** Clears up tail of the LRU lists: * Put replaceable pages at the tail of LRU to the free list @@ -2056,6 +2327,12 @@ buf_flush_LRU_tail(void) /*====================*/ { ulint total_flushed = 0; + /* JAN: TODO: */ + if(is_pgcomp_wrk_init_done()) + { + return(pgcomp_buf_flush_LRU_tail()); + } + /* JAN: TODO: END */ for (ulint i = 0; i < srv_buf_pool_instances; i++) { @@ -2342,6 +2619,8 @@ page_cleaner_sleep_if_needed( } } + + /******************************************************************//** page_cleaner thread tasked with flushing dirty pages from the buffer pools. As of now we'll have only one instance of this thread. @@ -2357,6 +2636,7 @@ DECLARE_THREAD(buf_flush_page_cleaner_thread)( ulint next_loop_time = ut_time_ms() + 1000; ulint n_flushed = 0; ulint last_activity = srv_get_activity_count(); + ulint n_lru=0, n_pgc_flush=0, n_pgc_batch=0; ut_ad(!srv_read_only_mode); @@ -2368,7 +2648,6 @@ DECLARE_THREAD(buf_flush_page_cleaner_thread)( fprintf(stderr, "InnoDB: page_cleaner thread running, id %lu\n", os_thread_pf(os_thread_get_curr_id())); #endif /* UNIV_DEBUG_THREAD_CREATION */ - buf_page_cleaner_is_active = TRUE; while (srv_shutdown_state == SRV_SHUTDOWN_NONE) { @@ -2388,12 +2667,23 @@ DECLARE_THREAD(buf_flush_page_cleaner_thread)( last_activity = srv_get_activity_count(); /* Flush pages from end of LRU if required */ - n_flushed = buf_flush_LRU_tail(); + n_lru = n_flushed = buf_flush_LRU_tail(); +#ifdef UNIV_DEBUG + if (n_lru) { + fprintf(stderr,"n_lru:%lu ",n_lru); + } +#endif /* Flush pages from flush_list if required */ - n_flushed += page_cleaner_flush_pages_if_needed(); + n_flushed += n_pgc_flush = page_cleaner_flush_pages_if_needed(); + +#ifdef UNIV_DEBUG + if (n_pgc_flush) { + fprintf(stderr,"n_pgc_flush:%lu ",n_pgc_flush); + } +#endif } else { - n_flushed = page_cleaner_do_flush_batch( + n_pgc_batch = n_flushed = page_cleaner_do_flush_batch( PCT_IO(100), LSN_MAX); @@ -2404,7 +2694,18 @@ DECLARE_THREAD(buf_flush_page_cleaner_thread)( MONITOR_FLUSH_BACKGROUND_PAGES, n_flushed); } +#ifdef UNIV_DEBUG + if (n_pgc_batch) { + fprintf(stderr,"n_pgc_batch:%lu ",n_pgc_batch); + } +#endif } +#ifdef UNIV_DEBUG + if (n_lru || n_pgc_flush || n_pgc_batch) { + fprintf(stderr,"\n"); + n_lru = n_pgc_flush = n_pgc_batch = 0; + } +#endif } ut_ad(srv_shutdown_state > 0); @@ -2573,8 +2874,9 @@ buf_flush_validate( return(ret); } + #endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */ -#endif /* !UNIV_HOTBACKUP */ + #ifdef UNIV_DEBUG /******************************************************************//** diff --git a/storage/innobase/buf/buf0mtflu.cc b/storage/innobase/buf/buf0mtflu.cc new file mode 100644 index 00000000000..7abe0547877 --- /dev/null +++ b/storage/innobase/buf/buf0mtflu.cc @@ -0,0 +1,1103 @@ +/***************************************************************************** + +Copyright (C) 2013 Fusion-io. All Rights Reserved. +Copyright (C) 2013 SkySQL Ab. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*****************************************************************************/ + +/******************************************************************//** +@file buf/buf0mtflu.cc +Multi-threaded flush method implementation + +Created 06/11/2013 Dhananjoy Das DDas@fusionio.com +Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com +***********************************************************************/ + +#include + +#ifdef UNIV_PFS_MUTEX +/* Key to register fil_system_mutex with performance schema */ +UNIV_INTERN mysql_pfs_key_t mtflush_mutex_key; +#endif /* UNIV_PFS_MUTEX */ + +/* Mutex to protect critical sections during multi-threaded flush */ +ib_mutex_t mt_flush_mutex; + +#define MT_COMP_WATER_MARK 50 + +/* Work item status */ +typedef enum { + WORK_ITEM_SET=0, /* Work item information set */ + WORK_ITEM_START=1, /* Work item assigned to thread and + execution started */ + WORK_ITEM_DONE=2, /* Work item execution done */ +} mtflu_witem_status_t; + +/* Work thread status */ +typedef enum { + WORK_THREAD_NOT_INIT=0, /* Work thread not initialized */ + WORK_THREAD_INITIALIZED=1, /* Work thread initialized */ + WORK_THREAD_SIG_WAITING=2, /* Work thred signaled */ + WORK_THREAD_RUNNING=3, /* Work thread running */ + WORK_THREAD_NO_WORK=4, /* Work thread has no work to do */ +} mtflu_wthr_status_t; + +/* Structure containing multi-treaded flush thread information */ +typedef struct { + os_thread_t wthread_id; /* Thread id */ + opq_t *wq; /* Write queue ? */ + opq_t *cq; /* Commit queue ?*/ + ib_mutex_t thread_mutex; /* Mutex proecting below + structures */ + mtflu_wthr_status_t thread_status; /* Thread status */ + ib_uint64_t total_num_processed; /* Total number of + pages processed */ + ib_uint64_t cycle_num_processed; /* Numper of pages + processed on last + cycle */ + ulint check_wrk_done_count; /* Number of pages + to process in this + work item ? */ + ulint done_cnt_flag; /* Number of pages + processed in this + work item ?*/ +} mtflu_thread_t; + +struct work_item_t { + /****************************/ + /* Need to group into struct*/ + buf_pool_t* buf_pool; //buffer-pool instance + int flush_type; //flush-type for buffer-pool flush operation + ulint min; //minimum number of pages requested to be flushed + lsn_t lsn_limit; //lsn limit for the buffer-pool flush operation + /****************************/ + + unsigned long result; //flush pages count + unsigned long t_usec; //time-taken in usec + os_thread_t id_usr; /* thread-id + currently working , why ? */ + mtflu_witem_status_t wi_status; /* work item status */ + + UT_LIST_NODE_T(work_node_t) next; +}; + +/* Multi-threaded flush system structure */ +typedef struct { + int pgc_n_threads = 8;// ??? why what this is + + mtflu_thread_t pc_sync[PGCOMP_MAX_WORKER]; + wrk_t work_items[PGCOMP_MAX_WORKER]; + int pgcomp_wrk_initialized = -1; /* ???? */ + opq_t wq; /* write queue ? */ + opq_t cq; /* commit queue ? */ +} mtflu_system_t; + +typedef enum op_q_status { + Q_NOT_INIT=0, + Q_EMPTY=1, + Q_INITIALIZED=2, + Q_PROCESS=3, + Q_DONE=4, + Q_ERROR=5, + Q_STATUS_UNDEFINED +} q_status_t; + +// NOTE: jan: could we use ut/ut0wqueue.(h|cc) +// NOTE: jan: here ????, it would handle waiting, signaling +// and contains simple interface + +typedef struct op_queue +{ + ib_mutex_t mtx; /* Mutex protecting below variables + */ + os_cond_t cv; /* ? is waiting here ? */ + q_status_t flag; /* Operation queue status */ + UT_LIST_BASE_NODE_T(work_item_t) work_list; +} opq_t; + + +/*******************************************************************//** +Initialize multi-threaded flush. +*/ +void +buf_mtflu_init(void) +/*================*/ +{ + mutex_create(mtflush_mutex_key, + &mt_flush_mutex, SYNC_ANY_LATCH); +} + +/*******************************************************************//** +This utility flushes dirty blocks from the end of the LRU list and also +puts replaceable clean pages from the end of the LRU list to the free +list. +NOTE: The calling thread is not allowed to own any latches on pages! +@return true if a batch was queued successfully. false if another batch +of same type was already running. */ +bool +buf_mtflu_flush_LRU( +/*================*/ + buf_pool_t* buf_pool, /*!< in/out: buffer pool instance */ + ulint min_n, /*!< in: wished minimum mumber of blocks + flushed (it is not guaranteed that the + actual number is that big, though) */ + ulint* n_processed) /*!< out: the number of pages + which were processed is passed + back to caller. Ignored if NULL */ +{ + ulint page_count; + + if (n_processed) { + *n_processed = 0; + } + + if (!buf_flush_start(buf_pool, BUF_FLUSH_LRU)) { + return(false); + } + + page_count = buf_flush_batch(buf_pool, BUF_FLUSH_LRU, min_n, 0); + + buf_flush_end(buf_pool, BUF_FLUSH_LRU); + + buf_flush_common(BUF_FLUSH_LRU, page_count); + + if (n_processed) { + *n_processed = page_count; + } + + return(true); +} + +#ifdef UNIV_DEBUG +/*******************************************************************//** +Utility function to calculate time difference between start time +and end time. +@return Time difference. +*/ +UNIV_INTERN +void +mtflu_timediff( +/*===========*/ + struct timeval *g_time, /*!< in/out: Start time*/ + struct timeval *s_time, /*!< in/out: End time */ + struct timeval *d_time) /*!< out: Time difference */ +{ + if (g_time->tv_usec < s_time->tv_usec) + { + int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000 + 1; + s_time->tv_usec -= 1000000 * nsec; + s_time->tv_sec += nsec; + } + if (g_time->tv_usec - s_time->tv_usec > 1000000) + { + int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000; + s_time->tv_usec += 1000000 * nsec; + s_time->tv_sec -= nsec; + } + d_time->tv_sec = g_time->tv_sec - s_time->tv_sec; + d_time->tv_usec = g_time->tv_usec - s_time->tv_usec; +} +#endif + +/*******************************************************************//** +This utility flushes dirty blocks from the end of the flush list of +all buffer pool instances. This is multi-threaded version of buf_flush_list. +NOTE: The calling thread is not allowed to own any latches on pages! +@return true if a batch was queued successfully for each buffer pool +instance. false if another batch of same type was already running in +at least one of the buffer pool instance */ +bool +buf_mtflu_flush_list( +/*=================*/ + ulint min_n, /*!< in: wished minimum mumber of blocks + flushed (it is not guaranteed that the + actual number is that big, though) */ + lsn_t lsn_limit, /*!< in the case BUF_FLUSH_LIST all + blocks whose oldest_modification is + smaller than this should be flushed + (if their number does not exceed + min_n), otherwise ignored */ + ulint* n_processed) /*!< out: the number of pages + which were processed is passed + back to caller. Ignored if NULL */ + +{ + ulint i; + bool success = true; + struct timeval p_start_time, p_end_time, d_time; + + if (n_processed) { + *n_processed = 0; + } + + if (min_n != ULINT_MAX) { + /* Ensure that flushing is spread evenly amongst the + buffer pool instances. When min_n is ULINT_MAX + we need to flush everything up to the lsn limit + so no limit here. */ + min_n = (min_n + srv_buf_pool_instances - 1) + / srv_buf_pool_instances; + } + +#ifdef UNIV_DEBUG + gettimeofday(&p_start_time, 0x0); +#endif + if(is_pgcomp_wrk_init_done() && (min_n > MT_COMP_WATER_MARK)) { + int cnt_flush[32]; + + mutex_enter(&mt_flush_mutex); + +#ifdef UNIV_DEBUG + fprintf(stderr, "Calling into wrk-pgcomp [min:%lu]", min_n); +#endif + pgcomp_flush_work_items(srv_buf_pool_instances, + cnt_flush, BUF_FLUSH_LIST, + min_n, lsn_limit); + + for (i = 0; i < srv_buf_pool_instances; i++) { + if (n_processed) { + *n_processed += cnt_flush[i]; + } + if (cnt_flush[i]) { + MONITOR_INC_VALUE_CUMULATIVE( + MONITOR_FLUSH_BATCH_TOTAL_PAGE, + MONITOR_FLUSH_BATCH_COUNT, + MONITOR_FLUSH_BATCH_PAGES, + cnt_flush[i]); + + } + } + + mutex_exit(&pgcomp_mtx); + +#ifdef UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", ( + min_n * srv_buf_pool_instances), *n_processed, + (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); +#endif + return(success); + } + + /* Flush to lsn_limit in all buffer pool instances */ + for (i = 0; i < srv_buf_pool_instances; i++) { + buf_pool_t* buf_pool; + ulint page_count = 0; + + buf_pool = buf_pool_from_array(i); + + if (!buf_flush_start(buf_pool, BUF_FLUSH_LIST)) { + /* We have two choices here. If lsn_limit was + specified then skipping an instance of buffer + pool means we cannot guarantee that all pages + up to lsn_limit has been flushed. We can + return right now with failure or we can try + to flush remaining buffer pools up to the + lsn_limit. We attempt to flush other buffer + pools based on the assumption that it will + help in the retry which will follow the + failure. */ + success = false; + + continue; + } + + page_count = buf_flush_batch( + buf_pool, BUF_FLUSH_LIST, min_n, lsn_limit); + + buf_flush_end(buf_pool, BUF_FLUSH_LIST); + + buf_flush_common(BUF_FLUSH_LIST, page_count); + + if (n_processed) { + *n_processed += page_count; + } + + if (page_count) { + MONITOR_INC_VALUE_CUMULATIVE( + MONITOR_FLUSH_BATCH_TOTAL_PAGE, + MONITOR_FLUSH_BATCH_COUNT, + MONITOR_FLUSH_BATCH_PAGES, + page_count); + } + } + +#ifdef UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + + fprintf(stderr, "[2] [*n_processed: (min:%lu)%lu %llu usec]\n", ( + min_n * srv_buf_pool_instances), *n_processed, + (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); +#endif + return(success); +} + +/*********************************************************************//** +Clear up tail of the LRU lists: +* Put replaceable pages at the tail of LRU to the free list +* Flush dirty pages at the tail of LRU to the disk +The depth to which we scan each buffer pool is controlled by dynamic +config parameter innodb_LRU_scan_depth. +@return total pages flushed */ +ulint +buf_mtflu_flush_LRU_tail(void) +/*==========================*/ +{ + ulint total_flushed=0, i=0; + int cnt_flush[32]; + +#ifdef UNIV_DEBUG + struct timeval p_start_time, p_end_time, d_time; + gettimeofday(&p_start_time, 0x0); +#endif + assert(is_pgcomp_wrk_init_done()); + + mutex_enter(&pgcomp_mtx); + pgcomp_flush_work_items(srv_buf_pool_instances, + cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0); + + for (i = 0; i < srv_buf_pool_instances; i++) { + if (cnt_flush[i]) { + total_flushed += cnt_flush[i]; + + MONITOR_INC_VALUE_CUMULATIVE( + MONITOR_LRU_BATCH_TOTAL_PAGE, + MONITOR_LRU_BATCH_COUNT, + MONITOR_LRU_BATCH_PAGES, + cnt_flush[i]); + } + } + + mutex_exit(&pgcomp_mtx); + +#if UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + + fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", ( + srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed, + (unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000))); +#endif + + return(total_flushed); +} + +/*******************************************************************//** +Set work done count to given count. +@return 1 if still work to do, 0 if no work left */ +int +set_check_done_flag_count(int cnt) +/*================*/ +{ + return(check_wrk_done_count = cnt); +} + +/*******************************************************************//** +? +@return why ? */ +int +set_pgcomp_wrk_init_done(void) +/*================*/ +{ + pgcomp_wrk_initialized = 1; + return 0; +} + +/*******************************************************************//** +? +@return true if work is initialized */ +bool +is_pgcomp_wrk_init_done(void) +/*================*/ +{ + return(pgcomp_wrk_initialized == 1); +} + +/*******************************************************************//** +Set current done pages count to the given value +@return number of pages flushed */ +int +set_done_cnt_flag(int val) +/*================*/ +{ + /* + * Assumption: The thread calling into set_done_cnt_flag + * needs to have "cq.mtx" acquired, else not safe. + */ + done_cnt_flag = val; + return done_cnt_flag; +} + +/*******************************************************************//** +? +@return number of pages flushed */ +int +cv_done_inc_flag_sig(thread_sync_t * ppc) +/*================*/ +{ + mutex_enter(&ppc->cq->mtx); + ppc->stat_universal_num_processed++; + ppc->stat_cycle_num_processed++; + done_cnt_flag++; + if(!(done_cnt_flag <= check_wrk_done_count)) { + fprintf(stderr, "ERROR: done_cnt:%d check_wrk_done_count:%d\n", + done_cnt_flag, check_wrk_done_count); + } + assert(done_cnt_flag <= check_wrk_done_count); + mutex_exit(&ppc->cq->mtx); + if(done_cnt_flag == check_wrk_done_count) { + // why below does not need mutex protection ? + ppc->wq->flag = Q_DONE; + mutex_enter(&ppc->cq->mtx); + ppc->cq->flag = Q_DONE; + os_cond_signal(&ppc->cq->cv); + mutex_exit(&ppc->cq->mtx); + } + return(done_cnt_flag); +} + +/*******************************************************************//** +Remove work item from queue, in my opinion not needed after we use +UT_LIST +@return number of pages flushed */ +int +q_remove_wrk(opq_t *q, wrk_t **wi) +/*================*/ +{ + int ret = 0; + + if(!wi || !q) { + return -1; + } + + mutex_enter(&q->mtx); + assert(!((q->tail == NULL) && (q->head != NULL))); + assert(!((q->tail != NULL) && (q->head == NULL))); + + /* get the first in the list*/ + *wi = q->head; + if(q->head) { + ret = 0; + q->head = q->head->next; + (*wi)->next = NULL; + if(!q->head) { + q->tail = NULL; + } + } else { + q->tail = NULL; + ret = 1; /* indicating remove from queue failed */ + } + mutex_exit(&q->mtx); + return (ret); +} + +/*******************************************************************//** +Return true if work item has being assigned to a thread or false +if work item is not assigned. +@return true if work is assigned, false if not */ +bool +is_busy_wrk_itm(wrk_t *wi) +/*================*/ +{ + if(!wi) { + return -1; + } + return(!(wi->id_usr == -1)); +} + +/*******************************************************************//** +Initialize work items. +@return why ? */ +int +setup_wrk_itm(int items) +/*================*/ +{ + int i; + for(i=0; imtx = os_mutex_create(); + os_cond_init(&q->cv); + q->flag = Q_INITIALIZED; + q->head = q->tail = NULL; + + return 0; +} + +/// NEEDED ? +#if 0 +int drain_cq(opq_t *cq, int items) +{ + int i=0; + + if(!cq) { + return -1; + } + mutex_enter(&cq->mtx); + for(i=0; ihead = cq->tail = NULL; + mutex_unlock(&cq->mtx); + return 0; +} +#endif + +/*******************************************************************//** +Insert work item list to queue, not needed with UT_LIST +@return why ? */ +int +q_insert_wrk_list(opq_t *q, wrk_t *w_list) +/*================*/ +{ + if((!q) || (!w_list)) { + fprintf(stderr, "insert failed q:%p w:%p\n", q, w_list); + return -1; + } + + mutex_enter(&q->mtx); + + assert(!((q->tail == NULL) && (q->head != NULL))); + assert(!((q->tail != NULL) && (q->head == NULL))); + + /* list is empty */ + if(!q->tail) { + q->head = q->tail = w_list; + } else { + /* added the first of the node to list */ + assert(q->head != NULL); + q->tail->next = w_list; + } + + /* move tail to the last node */ + while(q->tail->next) { + q->tail = q->tail->next; + } + mutex_exit(&q->mtx); + + return 0; +} + +/*******************************************************************//** +Flush ? +@return why ? */ +int +flush_pool_instance(wrk_t *wi) +/*================*/ +{ + struct timeval p_start_time, p_end_time, d_time; + + if(!wi) { + fprintf(stderr, "work item invalid wi:%p\n", wi); + return -1; + } + + wi->t_usec = 0; + if (!buf_flush_start(wi->buf_pool, (buf_flush_t)wi->flush_type)) { + /* We have two choices here. If lsn_limit was + specified then skipping an instance of buffer + pool means we cannot guarantee that all pages + up to lsn_limit has been flushed. We can + return right now with failure or we can try + to flush remaining buffer pools up to the + lsn_limit. We attempt to flush other buffer + pools based on the assumption that it will + help in the retry which will follow the + failure. */ + fprintf(stderr, "flush_start Failed, flush_type:%d\n", + (buf_flush_t)wi->flush_type); + return -1; + } + +#ifdef UNIV_DEBUG + /* Record time taken for the OP in usec */ + gettimeofday(&p_start_time, 0x0); +#endif + + if((buf_flush_t)wi->flush_type == BUF_FLUSH_LRU) { + /* srv_LRU_scan_depth can be arbitrarily large value. + * We cap it with current LRU size. + */ + buf_pool_mutex_enter(wi->buf_pool); + wi->min = UT_LIST_GET_LEN(wi->buf_pool->LRU); + buf_pool_mutex_exit(wi->buf_pool); + wi->min = ut_min(srv_LRU_scan_depth,wi->min); + } + + wi->result = buf_flush_batch(wi->buf_pool, + (buf_flush_t)wi->flush_type, + wi->min, wi->lsn_limit); + + buf_flush_end(wi->buf_pool, (buf_flush_t)wi->flush_type); + buf_flush_common((buf_flush_t)wi->flush_type, wi->result); + +#ifdef UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + + wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000)); +#endif + return 0; +} + +/*******************************************************************//** +? +@return why ? */ +int +service_page_comp_io(thread_sync_t * ppc) +/*================*/ +{ + wrk_t *wi = NULL; + int ret=0; + struct timespec ts; + + mutex_enter(&ppc->wq->mtx); + do{ + ppc->wt_status = WTHR_SIG_WAITING; + ret = os_cond_wait(&ppc->wq->cv, &ppc->wq->mtx); + ppc->wt_status = WTHR_RUNNING; + if(ret == ETIMEDOUT) { + fprintf(stderr, "ERROR ETIMEDOUT cnt_flag:[%d] ret:%d\n", + done_cnt_flag, ret); + } else if(ret == EINVAL || ret == EPERM) { + fprintf(stderr, "ERROR EINVAL/EPERM cnt_flag:[%d] ret:%d\n", + done_cnt_flag, ret); + } + if(ppc->wq->flag == Q_PROCESS) { + break; + } else { + mutex_exit(&ppc->wq->mtx); + return -1; + } + } while (ppc->wq->flag == Q_PROCESS && ret == 0); + + mutex_exit(&ppc->wq->mtx); + + while (ppc->cq->flag == Q_PROCESS) { + wi = NULL; + /* Get the work item */ + if (0 != (ret = q_remove_wrk(ppc->wq, &wi))) { + ppc->wt_status = WTHR_NO_WORK; + return -1; + } + + assert(ret==0); + assert(wi != NULL); + assert(0 == is_busy_wrk_itm(wi)); + assert(wi->id_usr == -1); + + wi->id_usr = ppc->wthread; + wi->wi_status = WRK_ITEM_START; + + /* Process work item */ + if(0 != (ret = flush_pool_instance(wi))) { + fprintf(stderr, "FLUSH op failed ret:%d\n", ret); + wi->wi_status = WRK_ITEM_FAILED; + } + ret = q_insert_wrk_list(ppc->cq, wi); + + assert(0==ret); + assert(check_wrk_done_count >= done_cnt_flag); + wi->wi_status = WRK_ITEM_SUCCESS; + if(check_wrk_done_count == cv_done_inc_flag_sig(ppc)) { + break; + } + } + return(0); +} + +/******************************************************************//** +Thread main function for multi-threaded flush +@return a dummy parameter*/ +extern "C" UNIV_INTERN +os_thread_ret_t +DECLARE_THREAD(page_comp_io_thread)( +/*==========================================*/ + void * arg) +{ + thread_sync_t *ppc_io = ((thread_sync_t *)arg); + + while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) { + service_page_comp_io(ppc_io); + ppc_io->stat_cycle_num_processed = 0; + } + os_thread_exit(NULL); + OS_THREAD_DUMMY_RETURN; +} + +/*******************************************************************//** +Print queue work item +@return why ? */ +int +print_queue_wrk_itm(opq_t *q) +/*================*/ +{ +#if UNIV_DEBUG + wrk_t *wi = NULL; + + if(!q) { + fprintf(stderr, "queue NULL\n"); + return -1; + } + + if(!q->head || !q->tail) { + assert(!(((q->tail==NULL) && (q->head!=NULL)) && ((q->tail != NULL) && (q->head == NULL)))); + fprintf(stderr, "queue empty (h:%p t:%p)\n", q->head, q->tail); + return 0; + } + + mutex_enter(&q->mtx); + for(wi = q->head; (wi != NULL) ; wi = wi->next) { + //fprintf(stderr, "- [%p] %p %lu %luus [%ld] >%p\n", + // wi, wi->buf_pool, wi->result, wi->t_usec, wi->id_usr, wi->next); + fprintf(stderr, "- [%p] [%s] >%p\n", + wi, (wi->id_usr == -1)?"free":"Busy", wi->next); + } + mutex_exit(&q->mtx); +#endif + return(0); +} + +/*******************************************************************//** +Print work list +@return why ? */ +int +print_wrk_list(wrk_t *wi_list) +/*================*/ +{ + wrk_t *wi = wi_list; + int i=0; + + if(!wi_list) { + fprintf(stderr, "list NULL\n"); + } + + while(wi) { + fprintf(stderr, "-\t[%p]\t[%s]\t[%lu]\t[%luus] > %p\n", + wi, (wi->id_usr == -1)?"free":"Busy", wi->result, wi->t_usec, wi->next); + wi = wi->next; + i++; + } + fprintf(stderr, "list len: %d\n", i); + return 0; +} + +/*******************************************************************//** +? +@return why ? */ +int +pgcomp_handler(wrk_t *w_list) +/*================*/ +{ + struct timespec ts; + int ret=0, t_flag=0; + opq_t *wrk_q=NULL, *comp_q=NULL; + wrk_t *tw_list=NULL; + + wrk_q=&wq; + comp_q=&cq; + + mutex_enter(&wrk_q->mtx); + /* setup work queue here.. */ + wrk_q->flag = Q_EMPTY; + mutex_exit(&wrk_q->mtx); + + ret = q_insert_wrk_list(wrk_q, w_list); + if(ret != 0) { + fprintf(stderr, "%s():work-queue setup FAILED wq:%p w_list:%p \n", + __FUNCTION__, &wq, w_list); + return -1; + } + +retry_submit: + mutex_enter(&wrk_q->mtx); + /* setup work queue here.. */ + wrk_q->flag = Q_INITIALIZED; + mutex_exit(&wrk_q->mtx); + + + mutex_enter(&comp_q->mtx); + if(0 != set_done_cnt_flag(0)) { + fprintf(stderr, "FAILED %s:%d\n", __FILE__, __LINE__); + mutex_exit(&comp_q->mtx); + return -1; + } + comp_q->flag = Q_PROCESS; + mutex_enter(&comp_q->mtx); + + /* if threads are waiting request them to start */ + mutex_enter(&wrk_q->mtx); + wrk_q->flag = Q_PROCESS; + os_cond_broadcast(&wrk_q->cv); + mutex_exit(&wrk_q->mtx); + + /* Wait on all worker-threads to complete */ + mutex_enter(&comp_q->mtx); + if (comp_q->flag != Q_DONE) { + do { + os_cond_wait(&comp_q->cv, &comp_q->mtx); + if(comp_q->flag != Q_DONE) { + fprintf(stderr, "[1] cv wait on CQ failed flag:%d cnt:%d\n", + comp_q->flag, done_cnt_flag); + if (done_cnt_flag != srv_buf_pool_instances) { + fprintf(stderr, "[2] cv wait on CQ failed flag:%d cnt:%d\n", + comp_q->flag, done_cnt_flag); + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + } + continue; + } else if (done_cnt_flag != srv_buf_pool_instances) { + fprintf(stderr, "[3]cv wait on CQ failed flag:%d cnt:%d\n", + comp_q->flag, done_cnt_flag); + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + comp_q->flag = Q_INITIALIZED; + mutex_exit(&comp_q->mtx); + goto retry_submit; + + ut_ad(!done_cnt_flag); + continue; + } + ut_ad(done_cnt_flag == srv_buf_pool_instances); + + if ((comp_q->flag == Q_DONE) && + (done_cnt_flag == srv_buf_pool_instances)) { + break; + } + } while((comp_q->flag == Q_INITIALIZED) && + (done_cnt_flag != srv_buf_pool_instances)); + } else { + fprintf(stderr, "[4] cv wait on CQ failed flag:%d cnt:%d\n", + comp_q->flag, done_cnt_flag); + if (!done_cnt_flag) { + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + comp_q->flag = Q_INITIALIZED; + mutex_enter(&comp_q->mtx); + goto retry_submit; + ut_ad(!done_cnt_flag); + } + ut_ad(done_cnt_flag == srv_buf_pool_instances); + } + + mutex_exit(&comp_q->mtx); + mutex_enter(&wrk_q->mtx); + wrk_q->flag = Q_DONE; + mutex_exit(&wrk_q->mtx); + + return 0; +} + +/******************************************************************//** +@return a dummy parameter*/ +int +pgcomp_handler_init( + int num_threads, + int wrk_cnt, + opq_t *wq, + opq_t *cq) +/*================*/ +{ + int i=0; + + if(is_pgcomp_wrk_init_done()) { + fprintf(stderr, "pgcomp_handler_init(): ERROR already initialized\n"); + return -1; + } + + if(!wq || !cq) { + fprintf(stderr, "%s() FAILED wq:%p cq:%p\n", __FUNCTION__, wq, cq); + return -1; + } + + /* work-item setup */ + setup_wrk_itm(wrk_cnt); + + /* wq & cq setup */ + init_queue(wq); + init_queue(cq); + + /* Mark each of the thread sync entires */ + for(i=0; i < PGCOMP_MAX_WORKER; i++) { + pc_sync[i].wthread_id = i; + } + + /* Create threads for page-compression-flush */ + for(i=0; i < num_threads; i++) { + pc_sync[i].wthread_id = i; + pc_sync[i].wq = wq; + pc_sync[i].cq = cq; + os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)), + thread_ids + START_PGCOMP_CNT + i); + //pc_sync[i].wthread = thread_ids[START_PGCOMP_CNT + i]; + pc_sync[i].wthread = (START_PGCOMP_CNT + i); + pc_sync[i].wt_status = WTHR_INITIALIZED; + } + + set_check_done_flag_count(wrk_cnt); + set_pgcomp_wrk_init_done(); + + return 0; +} + + +/*******************************************************************//** +Print work thread status information +@return why ? */ +int +wrk_thread_stat( + thread_sync_t *wthr, + unsigned int num_threads) +/*================*/ +{ + long stat_tot=0; + int i=0; + for(i=0; izip.data, bpage); + bpage->zip.data, bpage, 0); } else { ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE); *err = fil_io(OS_FILE_READ | wake_later | ignore_nonexistent_pages, sync, space, 0, offset, 0, UNIV_PAGE_SIZE, - ((buf_block_t*) bpage)->frame, bpage); + ((buf_block_t*) bpage)->frame, bpage, 0); } thd_wait_end(NULL); diff --git a/storage/innobase/dict/dict0dict.cc b/storage/innobase/dict/dict0dict.cc index a560dc54eac..a382b211275 100644 --- a/storage/innobase/dict/dict0dict.cc +++ b/storage/innobase/dict/dict0dict.cc @@ -2,6 +2,7 @@ Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2012, Facebook Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -1446,8 +1447,8 @@ dict_table_rename_in_cache( ibool exists; char* filepath; - ut_ad(table->space != TRX_SYS_SPACE); - + ut_ad(table->space != TRX_SYS_SPACE); + if (DICT_TF_HAS_DATA_DIR(table->flags)) { dict_get_and_save_data_dir_path(table, true); @@ -1459,7 +1460,7 @@ dict_table_rename_in_cache( filepath = fil_make_ibd_name(table->name, false); } - fil_delete_tablespace(table->space, BUF_REMOVE_FLUSH_NO_WRITE); + fil_delete_tablespace(table->space, BUF_REMOVE_FLUSH_NO_WRITE); /* Delete any temp file hanging around. */ if (os_file_status(filepath, &exists, &type) diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc index 1779ae86c46..2bf5922e07d 100644 --- a/storage/innobase/fil/fil0fil.cc +++ b/storage/innobase/fil/fil0fil.cc @@ -1,6 +1,8 @@ /***************************************************************************** Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013 SkySQL Ab. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -24,6 +26,8 @@ Created 10/25/1995 Heikki Tuuri *******************************************************/ #include "fil0fil.h" +#include "fil0pagecompress.h" +#include "fsp0pagecompress.h" #include #include @@ -54,6 +58,14 @@ Created 10/25/1995 Heikki Tuuri # include "srv0srv.h" static ulint srv_data_read, srv_data_written; #endif /* !UNIV_HOTBACKUP */ +#include "zlib.h" +#ifdef __linux__ +#include +#include +#include +#include +#endif +#include "row0mysql.h" /* IMPLEMENTATION OF THE TABLESPACE MEMORY CACHE @@ -428,11 +440,16 @@ fil_read( block size multiple */ void* buf, /*!< in/out: buffer where to store data read; in aio this must be appropriately aligned */ - void* message) /*!< in: message for aio handler if non-sync + void* message, /*!< in: message for aio handler if non-sync aio used, else ignored */ + ulint write_size) /*!< in/out: Actual write size initialized + after fist successfull trim + operation for this page and if + initialized we do not trim again if + actual page size does not decrease. */ { return(fil_io(OS_FILE_READ, sync, space_id, zip_size, block_offset, - byte_offset, len, buf, message)); + byte_offset, len, buf, message, write_size)); } /********************************************************************//** @@ -457,18 +474,22 @@ fil_write( be a block size multiple */ void* buf, /*!< in: buffer from which to write; in aio this must be appropriately aligned */ - void* message) /*!< in: message for aio handler if non-sync + void* message, /*!< in: message for aio handler if non-sync aio used, else ignored */ + ulint write_size) /*!< in/out: Actual write size initialized + after fist successfull trim + operation for this page and if + initialized we do not trim again if + actual page size does not decrease. */ { ut_ad(!srv_read_only_mode); return(fil_io(OS_FILE_WRITE, sync, space_id, zip_size, block_offset, - byte_offset, len, buf, message)); + byte_offset, len, buf, message, write_size)); } /*******************************************************************//** Returns the table space by a given id, NULL if not found. */ -UNIV_INLINE fil_space_t* fil_space_get_by_id( /*================*/ @@ -486,6 +507,19 @@ fil_space_get_by_id( return(space); } +/****************************************************************//** +Get space id from fil node */ +ulint +fil_node_get_space_id( +/*==================*/ + fil_node_t* node) /*!< in: Compressed node*/ +{ + ut_ad(node); + ut_ad(node->space); + + return (node->space->id); +} + /*******************************************************************//** Returns the table space by a given name, NULL if not found. */ UNIV_INLINE @@ -704,8 +738,9 @@ fil_node_open_file( byte* buf2; byte* page; ulint space_id; - ulint flags; + ulint flags=0; ulint page_size; + ibool atomic_writes=FALSE; ut_ad(mutex_own(&(system->mutex))); ut_a(node->n_pending == 0); @@ -722,7 +757,7 @@ fil_node_open_file( node->handle = os_file_create_simple_no_error_handling( innodb_file_data_key, node->name, OS_FILE_OPEN, - OS_FILE_READ_ONLY, &success); + OS_FILE_READ_ONLY, &success, FALSE); if (!success) { /* The following call prints an error message */ os_file_get_last_error(true); @@ -774,6 +809,8 @@ fil_node_open_file( space_id = fsp_header_get_space_id(page); flags = fsp_header_get_flags(page); page_size = fsp_flags_get_page_size(flags); + atomic_writes = fsp_flags_get_atomic_writes(flags); + ut_free(buf2); @@ -824,6 +861,17 @@ fil_node_open_file( ut_error; } + if (UNIV_UNLIKELY(space->flags != flags)) { + if (!dict_tf_verify_flags(space->flags, flags)) { + fprintf(stderr, + "InnoDB: Error: table flags are 0x%lx" + " in the data dictionary\n" + "InnoDB: but the flags in file %s are 0x%lx!\n", + space->flags, node->name, flags); + ut_error; + } + } + if (size_bytes >= 1024 * 1024) { /* Truncate the size to whole megabytes. */ size_bytes = ut_2pow_round(size_bytes, 1024 * 1024); @@ -843,6 +891,8 @@ add_size: space->size += node->size; } + atomic_writes = fsp_flags_get_atomic_writes(space->flags); + /* printf("Opening file %s\n", node->name); */ /* Open the file for reading and writing, in Windows normally in the @@ -853,18 +903,18 @@ add_size: node->handle = os_file_create(innodb_file_log_key, node->name, OS_FILE_OPEN, OS_FILE_AIO, OS_LOG_FILE, - &ret); + &ret, atomic_writes); } else if (node->is_raw_disk) { node->handle = os_file_create(innodb_file_data_key, node->name, OS_FILE_OPEN_RAW, OS_FILE_AIO, OS_DATA_FILE, - &ret); + &ret, atomic_writes); } else { node->handle = os_file_create(innodb_file_data_key, node->name, OS_FILE_OPEN, OS_FILE_AIO, OS_DATA_FILE, - &ret); + &ret, atomic_writes); } ut_a(ret); @@ -1481,6 +1531,21 @@ fil_space_get_space( if (space->size == 0 && space->purpose == FIL_TABLESPACE) { ut_a(id != 0); + mutex_exit(&fil_system->mutex); + + /* It is possible that the space gets evicted at this point + before the fil_mutex_enter_and_prepare_for_io() acquires + the fil_system->mutex. Check for this after completing the + call to fil_mutex_enter_and_prepare_for_io(). */ + fil_mutex_enter_and_prepare_for_io(id); + + /* We are still holding the fil_system->mutex. Check if + the space is still in memory cache. */ + space = fil_space_get_by_id(id); + if (space == NULL) { + return(NULL); + } + /* The following code must change when InnoDB supports multiple datafiles per tablespace. */ ut_a(1 == UT_LIST_GET_LEN(space->chain)); @@ -1858,12 +1923,12 @@ fil_write_lsn_and_arch_no_to_file( buf = static_cast(ut_align(buf1, UNIV_PAGE_SIZE)); err = fil_read(TRUE, space, 0, sum_of_sizes, 0, - UNIV_PAGE_SIZE, buf, NULL); + UNIV_PAGE_SIZE, buf, NULL, 0); if (err == DB_SUCCESS) { mach_write_to_8(buf + FIL_PAGE_FILE_FLUSH_LSN, lsn); err = fil_write(TRUE, space, 0, sum_of_sizes, 0, - UNIV_PAGE_SIZE, buf, NULL); + UNIV_PAGE_SIZE, buf, NULL, 0); } mem_free(buf1); @@ -3095,7 +3160,7 @@ fil_create_link_file( file = os_file_create_simple_no_error_handling( innodb_file_data_key, link_filepath, - OS_FILE_CREATE, OS_FILE_READ_WRITE, &success); + OS_FILE_CREATE, OS_FILE_READ_WRITE, &success, FALSE); if (!success) { /* The following call will print an error message */ @@ -3111,10 +3176,10 @@ fil_create_link_file( ut_print_filename(stderr, filepath); fputs(" already exists.\n", stderr); err = DB_TABLESPACE_EXISTS; - } else if (error == OS_FILE_DISK_FULL) { err = DB_OUT_OF_FILE_SPACE; - + } else if (error == OS_FILE_OPERATION_NOT_SUPPORTED) { + err = DB_UNSUPPORTED; } else { err = DB_ERROR; } @@ -3204,8 +3269,9 @@ fil_open_linked_file( /*===============*/ const char* tablename, /*!< in: database/tablename */ char** remote_filepath,/*!< out: remote filepath */ - os_file_t* remote_file) /*!< out: remote file handle */ - + os_file_t* remote_file, /*!< out: remote file handle */ + ibool atomic_writes) /*!< in: should atomic writes be + used */ { ibool success; @@ -3219,7 +3285,7 @@ fil_open_linked_file( *remote_file = os_file_create_simple_no_error_handling( innodb_file_data_key, *remote_filepath, OS_FILE_OPEN, OS_FILE_READ_ONLY, - &success); + &success, atomic_writes); if (!success) { char* link_filepath = fil_make_isl_name(tablename); @@ -3274,6 +3340,7 @@ fil_create_new_single_table_tablespace( /* TRUE if a table is created with CREATE TEMPORARY TABLE */ bool is_temp = !!(flags2 & DICT_TF2_TEMPORARY); bool has_data_dir = FSP_FLAGS_HAS_DATA_DIR(flags); + bool atomic_writes = FSP_FLAGS_GET_ATOMIC_WRITES(flags); ut_a(space_id > 0); ut_ad(!srv_read_only_mode); @@ -3306,7 +3373,8 @@ fil_create_new_single_table_tablespace( OS_FILE_CREATE | OS_FILE_ON_ERROR_NO_EXIT, OS_FILE_NORMAL, OS_DATA_FILE, - &ret); + &ret, + atomic_writes); if (ret == FALSE) { /* The following call will print an error message */ @@ -3333,6 +3401,11 @@ fil_create_new_single_table_tablespace( goto error_exit_3; } + if (error == OS_FILE_OPERATION_NOT_SUPPORTED) { + err = DB_UNSUPPORTED; + goto error_exit_3; + } + if (error == OS_FILE_DISK_FULL) { err = DB_OUT_OF_FILE_SPACE; goto error_exit_3; @@ -3371,6 +3444,7 @@ fil_create_new_single_table_tablespace( flags = fsp_flags_set_page_size(flags, UNIV_PAGE_SIZE); fsp_header_init_fields(page, space_id, flags); mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, space_id); + ut_ad(fsp_flags_is_valid(flags)); if (!(fsp_flags_is_compressed(flags))) { buf_flush_init_for_writing(page, NULL, 0); @@ -3547,6 +3621,7 @@ fil_open_single_table_tablespace( fsp_open_info remote; ulint tablespaces_found = 0; ulint valid_tablespaces_found = 0; + ibool atomic_writes = FALSE; #ifdef UNIV_SYNC_DEBUG ut_ad(!fix_dict || rw_lock_own(&dict_operation_lock, RW_LOCK_EX)); @@ -3557,6 +3632,8 @@ fil_open_single_table_tablespace( return(DB_CORRUPTION); } + atomic_writes = fsp_flags_get_atomic_writes(flags); + /* If the tablespace was relocated, we do not compare the DATA_DIR flag */ ulint mod_flags = flags & ~FSP_FLAGS_MASK_DATA_DIR; @@ -3581,7 +3658,7 @@ fil_open_single_table_tablespace( } link_file_found = fil_open_linked_file( - tablename, &remote.filepath, &remote.file); + tablename, &remote.filepath, &remote.file, atomic_writes); remote.success = link_file_found; if (remote.success) { /* possibility of multiple files. */ @@ -3609,7 +3686,7 @@ fil_open_single_table_tablespace( if (dict.filepath) { dict.file = os_file_create_simple_no_error_handling( innodb_file_data_key, dict.filepath, OS_FILE_OPEN, - OS_FILE_READ_ONLY, &dict.success); + OS_FILE_READ_ONLY, &dict.success, atomic_writes); if (dict.success) { /* possibility of multiple files. */ validate = true; @@ -3621,7 +3698,7 @@ fil_open_single_table_tablespace( ut_a(def.filepath); def.file = os_file_create_simple_no_error_handling( innodb_file_data_key, def.filepath, OS_FILE_OPEN, - OS_FILE_READ_ONLY, &def.success); + OS_FILE_READ_ONLY, &def.success, atomic_writes); if (def.success) { tablespaces_found++; } @@ -4020,7 +4097,7 @@ fil_load_single_table_tablespace( /* Check for a link file which locates a remote tablespace. */ remote.success = fil_open_linked_file( - tablename, &remote.filepath, &remote.file); + tablename, &remote.filepath, &remote.file, FALSE); /* Read the first page of the remote tablespace */ if (remote.success) { @@ -4035,7 +4112,7 @@ fil_load_single_table_tablespace( /* Try to open the tablespace in the datadir. */ def.file = os_file_create_simple_no_error_handling( innodb_file_data_key, def.filepath, OS_FILE_OPEN, - OS_FILE_READ_ONLY, &def.success); + OS_FILE_READ_ONLY, &def.success, FALSE); /* Read the first page of the remote tablespace */ if (def.success) { @@ -4167,7 +4244,7 @@ will_not_choose: new_path = fil_make_ibbackup_old_name(fsp->filepath); bool success = os_file_rename( - innodb_file_data_key, fsp->filepath, new_path)); + innodb_file_data_key, fsp->filepath, new_path); ut_a(success); @@ -4821,7 +4898,7 @@ retry: success = os_aio(OS_FILE_WRITE, OS_AIO_SYNC, node->name, node->handle, buf, offset, page_size * n_pages, - NULL, NULL); + NULL, NULL, 0); #endif /* UNIV_HOTBACKUP */ if (success) { os_has_said_disk_full = FALSE; @@ -4852,6 +4929,7 @@ retry: space->size += pages_added; node->size += pages_added; + node->being_extended = FALSE; #ifdef HAVE_POSIX_FALLOCATE complete_io: @@ -4917,7 +4995,7 @@ fil_extend_tablespaces_to_stored_len(void) single-threaded operation */ error = fil_read(TRUE, space->id, fsp_flags_get_zip_size(space->flags), - 0, 0, UNIV_PAGE_SIZE, buf, NULL); + 0, 0, UNIV_PAGE_SIZE, buf, NULL, 0); ut_a(error == DB_SUCCESS); size_in_header = fsp_get_size_low(buf); @@ -5191,8 +5269,13 @@ fil_io( void* buf, /*!< in/out: buffer where to store read data or from where to write; in aio this must be appropriately aligned */ - void* message) /*!< in: message for aio handler if non-sync + void* message, /*!< in: message for aio handler if non-sync aio used, else ignored */ + ulint write_size) /*!< in/out: Actual write size initialized + after fist successfull trim + operation for this page and if + initialized we do not trim again if + actual page size does not decrease. */ { ulint mode; fil_space_t* space; @@ -5255,6 +5338,9 @@ fil_io( } else if (type == OS_FILE_WRITE) { ut_ad(!srv_read_only_mode); srv_stats.data_written.add(len); + if (fil_page_is_index_page((byte *)buf)) { + srv_stats.index_pages_written.inc(); + } } /* Reserve the fil_system mutex and make sure that we can open at @@ -5371,7 +5457,7 @@ fil_io( #else /* Queue the aio request */ ret = os_aio(type, mode | wake_later, node->name, node->handle, buf, - offset, len, node, message); + offset, len, node, message, write_size); #endif /* UNIV_HOTBACKUP */ ut_a(ret); @@ -5994,7 +6080,7 @@ fil_tablespace_iterate( file = os_file_create_simple_no_error_handling( innodb_file_data_key, filepath, - OS_FILE_OPEN, OS_FILE_READ_WRITE, &success); + OS_FILE_OPEN, OS_FILE_READ_WRITE, &success, FALSE); DBUG_EXECUTE_IF("fil_tablespace_iterate_failure", { @@ -6210,3 +6296,32 @@ fil_mtr_rename_log( mtr_commit(&mtr); } +/****************************************************************//** +Acquire fil_system mutex */ +void +fil_system_enter(void) +/*==================*/ +{ + ut_ad(!mutex_own(&fil_system->mutex)); + mutex_enter(&fil_system->mutex); +} + +/****************************************************************//** +Release fil_system mutex */ +void +fil_system_exit(void) +/*=================*/ +{ + ut_ad(mutex_own(&fil_system->mutex)); + mutex_exit(&fil_system->mutex); +} + +/*******************************************************************//** +Return space name */ +char* +fil_space_name( +/*===========*/ + fil_space_t* space) /*!< in: space */ +{ + return (space->name); +} diff --git a/storage/innobase/fil/fil0pagecompress.cc b/storage/innobase/fil/fil0pagecompress.cc new file mode 100644 index 00000000000..3926b23c677 --- /dev/null +++ b/storage/innobase/fil/fil0pagecompress.cc @@ -0,0 +1,369 @@ +/***************************************************************************** + +Copyright (C) 2013 SkySQL Ab. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*****************************************************************************/ + +/******************************************************************//** +@file fil/fil0pagecompress.cc +Implementation for page compressed file spaces. + +Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com +***********************************************************************/ + +#include "fil0fil.h" +#include "fil0pagecompress.h" + +#include +#include + +#include "mem0mem.h" +#include "hash0hash.h" +#include "os0file.h" +#include "mach0data.h" +#include "buf0buf.h" +#include "buf0flu.h" +#include "log0recv.h" +#include "fsp0fsp.h" +#include "srv0srv.h" +#include "srv0start.h" +#include "mtr0mtr.h" +#include "mtr0log.h" +#include "dict0dict.h" +#include "page0page.h" +#include "page0zip.h" +#include "trx0sys.h" +#include "row0mysql.h" +#ifndef UNIV_HOTBACKUP +# include "buf0lru.h" +# include "ibuf0ibuf.h" +# include "sync0sync.h" +# include "os0sync.h" +#else /* !UNIV_HOTBACKUP */ +# include "srv0srv.h" +static ulint srv_data_read, srv_data_written; +#endif /* !UNIV_HOTBACKUP */ +#include "zlib.h" +#ifdef __linux__ +#include +#include +#include +#include +#endif +#include "row0mysql.h" + +/****************************************************************//** +For page compressed pages compress the page before actual write +operation. +@return compressed page to be written*/ +byte* +fil_compress_page( +/*==============*/ + ulint space_id, /*!< in: tablespace id of the + table. */ + byte* buf, /*!< in: buffer from which to write; in aio + this must be appropriately aligned */ + byte* out_buf, /*!< out: compressed buffer */ + ulint len, /*!< in: length of input buffer.*/ + ulint* out_len) /*!< out: actual length of compressed page */ +{ + int err = Z_OK; + int level = 0; + ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE; + ulint write_size=0; + + ut_a(buf); + ut_a(out_buf); + ut_a(len); + ut_a(out_len); + + level = fil_space_get_page_compression_level(space_id); + ut_a(fil_space_is_page_compressed(space_id)); + + fil_system_enter(); + fil_space_t* space = fil_space_get_by_id(space_id); + fil_system_exit(); + + /* If no compression level was provided to this table, use system + default level */ + if (level == 0) { + level = srv_compress_zlib_level; + } + +#ifdef UNIV_DEBUG + fprintf(stderr, + "InnoDB: Note: Preparing for compress for space %lu name %s len %lu\n", + space_id, fil_space_name(space), len); +#endif + + write_size = UNIV_PAGE_SIZE - header_len; + err = compress2(out_buf+header_len, &write_size, buf, len, level); + + if (err != Z_OK) { + /* If error we leave the actual page as it was */ + + fprintf(stderr, + "InnoDB: Warning: Compression failed for space %lu name %s len %lu rt %d write %lu\n", + space_id, fil_space_name(space), len, err, write_size); + + *out_len = len; + return (buf); + } else { + /* Set up the page header */ + memcpy(out_buf, buf, FIL_PAGE_DATA); + /* Set up the checksum */ + mach_write_to_4(out_buf+FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC); + /* Set up the correct page type */ + mach_write_to_2(out_buf+FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED); + /* Set up the flush lsn to be compression algorithm */ + mach_write_to_8(out_buf+FIL_PAGE_FILE_FLUSH_LSN, FIL_PAGE_COMPRESSION_ZLIB); + /* Set up the actual payload lenght */ + mach_write_to_2(out_buf+FIL_PAGE_DATA, write_size); + +#ifdef UNIV_DEBUG + /* Verify */ + ut_ad(fil_page_is_compressed(out_buf)); + ut_ad(mach_read_from_4(out_buf+FIL_PAGE_SPACE_OR_CHKSUM) == BUF_NO_CHECKSUM_MAGIC); + ut_ad(mach_read_from_2(out_buf+FIL_PAGE_DATA) == write_size); + ut_ad(mach_read_from_8(out_buf+FIL_PAGE_FILE_FLUSH_LSN) == FIL_PAGE_COMPRESSION_ZLIB); +#endif + + write_size+=header_len; + /* Actual write needs to be alligned on block size */ + if (write_size % OS_FILE_LOG_BLOCK_SIZE) { + write_size = (write_size + (OS_FILE_LOG_BLOCK_SIZE - (write_size % OS_FILE_LOG_BLOCK_SIZE))); + } + +#ifdef UNIV_DEBUG + fprintf(stderr, + "InnoDB: Note: Compression succeeded for space %lu name %s len %lu out_len %lu\n", + space_id, fil_space_name(space), len, write_size); +#endif +#define SECT_SIZE 512 + srv_stats.page_compression_saved.add((len - write_size)); + if ((len - write_size) > 0) { + srv_stats.page_compression_trim_sect512.add(((len - write_size) / SECT_SIZE)); + srv_stats.page_compression_trim_sect4096.add(((len - write_size) / (SECT_SIZE*8))); + } + //srv_stats.page_compressed_trim_op.inc(); + srv_stats.pages_page_compressed.inc(); + *out_len = write_size; + + return(out_buf); + } +} + +/****************************************************************//** +For page compressed pages decompress the page after actual read +operation. */ +void +fil_decompress_page( +/*================*/ + byte* page_buf, /*!< in: preallocated buffer or NULL */ + byte* buf, /*!< out: buffer from which to read; in aio + this must be appropriately aligned */ + ulint len) /*!< in: length of output buffer.*/ +{ + int err = 0; + ulint actual_size = 0; + ulint compression_alg = 0; + byte *in_buf; + + ut_a(buf); + ut_a(len); + + /* Before actual decompress, make sure that page type is correct */ + + if (mach_read_from_4(buf+FIL_PAGE_SPACE_OR_CHKSUM) != BUF_NO_CHECKSUM_MAGIC || + mach_read_from_2(buf+FIL_PAGE_TYPE) != FIL_PAGE_PAGE_COMPRESSED) { + fprintf(stderr, + "InnoDB: Corruption: We try to uncompress corrupted page\n" + "InnoDB: CRC %lu type %lu.\n" + "InnoDB: len %lu\n", + mach_read_from_4(buf+FIL_PAGE_SPACE_OR_CHKSUM), + mach_read_from_2(buf+FIL_PAGE_TYPE), len); + + fflush(stderr); + ut_error; + } + + /* Get compression algorithm */ + compression_alg = mach_read_from_8(buf+FIL_PAGE_FILE_FLUSH_LSN); + + if (compression_alg == FIL_PAGE_COMPRESSION_ZLIB) { + // If no buffer was given, we need to allocate temporal buffer + if (page_buf == NULL) { + in_buf = static_cast(ut_malloc(UNIV_PAGE_SIZE)); + } else { + in_buf = page_buf; + } + + /* Get the actual size of compressed page */ + actual_size = mach_read_from_2(buf+FIL_PAGE_DATA); + +#ifdef UNIV_DEBUG + fprintf(stderr, + "InnoDB: Note: Preparing for decompress for len %lu\n", + actual_size); +#endif + + err= uncompress(in_buf, &len, buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE, (unsigned long)actual_size); + + + /* If uncompress fails it means that page is corrupted */ + if (err != Z_OK) { + + fprintf(stderr, + "InnoDB: Corruption: Page is marked as compressed\n" + "InnoDB: but uncompress failed with error %d.\n" + "InnoDB: size %lu len %lu\n", + err, actual_size, len); + + fflush(stderr); + + ut_error; + } + +#ifdef UNIV_DEBUG + fprintf(stderr, + "InnoDB: Note: Decompression succeeded for len %lu \n", + len); +#endif + + /* Copy the uncompressed page to the buffer pool, not + really any other options. */ + memcpy(buf, in_buf, len); + + // Need to free temporal buffer if no buffer was given + if (page_buf == NULL) { + ut_free(in_buf); + } + + srv_stats.pages_page_decompressed.inc(); + } else { + fprintf(stderr, + "InnoDB: Corruption: Page is marked as compressed\n" + "InnoDB: but compression algorithm %s\n" + "InnoDB: is not known.\n" + ,fil_get_compression_alg_name(compression_alg)); + + fflush(stderr); + ut_error; + } +} + +/*******************************************************************//** +Find out wheather the page is index page or not +@return true if page type index page, false if not */ +ibool +fil_page_is_index_page( +/*===================*/ + byte *buf) /*!< in: page */ +{ + return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_INDEX); +} + +/*******************************************************************//** +Find out wheather the page is page compressed +@return true if page is page compressed, false if not */ +ibool +fil_page_is_compressed( +/*===================*/ + byte *buf) /*!< in: page */ +{ + return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED); +} + +/*******************************************************************//** +Returns the page compression level of the space, or 0 if the space +is not compressed. The tablespace must be cached in the memory cache. +@return page compression level, ULINT_UNDEFINED if space not found */ +ulint +fil_space_get_page_compression_level( +/*=================================*/ + ulint id) /*!< in: space id */ +{ + ulint flags; + + flags = fil_space_get_flags(id); + + if (flags && flags != ULINT_UNDEFINED) { + + return(fsp_flags_get_page_compression_level(flags)); + } + + return(flags); +} + +/*******************************************************************//** +Extract the page compression from space. +@return true if space is page compressed, false if space is not found +or space is not page compressed. */ +ibool +fil_space_is_page_compressed( +/*=========================*/ + ulint id) /*!< in: space id */ +{ + ulint flags; + + flags = fil_space_get_flags(id); + + if (flags && flags != ULINT_UNDEFINED) { + + return(fsp_flags_is_page_compressed(flags)); + } + + return(flags); +} + +/****************************************************************//** +Get the name of the compression algorithm used for page +compression. +@return compression algorithm name or "UNKNOWN" if not known*/ +const char* +fil_get_compression_alg_name( +/*=========================*/ + ulint comp_alg) /*!tablefile_extensions = ha_innobase_exts; + innobase_hton->table_options = innodb_table_option_list; + ut_a(DATA_MYSQL_TRUE_VARCHAR == (ulint)MYSQL_TYPE_VARCHAR); #ifndef DBUG_OFF @@ -3118,8 +3161,6 @@ innobase_change_buffering_inited_ok: srv_use_doublewrite_buf = (ibool) innobase_use_doublewrite; - page_compression_level = (ulint) innobase_compression_level; - if (!innobase_use_checksums) { ut_print_timestamp(stderr); fprintf(stderr, @@ -9465,11 +9506,16 @@ innobase_table_flags( enum row_type row_format; rec_format_t innodb_row_format = REC_FORMAT_COMPACT; bool use_data_dir; + ha_table_option_struct *options= form->s->option_struct; /* Cache the value of innodb_file_format, in case it is modified by another thread while the table is being created. */ const ulint file_format_allowed = srv_file_format; + /* Cache the value of innobase_compression_level, in case it is + modified by another thread while the table is being created. */ + const ulint default_compression_level = innobase_compression_level; + *flags = 0; *flags2 = 0; @@ -9513,6 +9559,8 @@ index_bad: } } + row_format = form->s->row_type; + if (create_info->key_block_size) { /* The requested compressed page size (key_block_size) is given in kilobytes. If it is a valid number, store @@ -9522,7 +9570,7 @@ index_bad: ulint kbsize; /* Key Block Size */ for (zssize = kbsize = 1; zssize <= ut_min(UNIV_PAGE_SSIZE_MAX, - PAGE_ZIP_SSIZE_MAX); + PAGE_ZIP_SSIZE_MAX); zssize++, kbsize <<= 1) { if (kbsize == create_info->key_block_size) { zip_ssize = zssize; @@ -9550,8 +9598,8 @@ index_bad: } if (!zip_allowed - || zssize > ut_min(UNIV_PAGE_SSIZE_MAX, - PAGE_ZIP_SSIZE_MAX)) { + || zssize > ut_min(UNIV_PAGE_SSIZE_MAX, + PAGE_ZIP_SSIZE_MAX)) { push_warning_printf( thd, Sql_condition::WARN_LEVEL_WARN, ER_ILLEGAL_HA_CREATE_OPTION, @@ -9560,8 +9608,6 @@ index_bad: } } - row_format = form->s->row_type; - if (zip_ssize && zip_allowed) { /* if ROW_FORMAT is set to default, automatically change it to COMPRESSED.*/ @@ -9598,7 +9644,6 @@ index_bad: case ROW_TYPE_REDUNDANT: innodb_row_format = REC_FORMAT_REDUNDANT; break; - case ROW_TYPE_COMPRESSED: case ROW_TYPE_DYNAMIC: if (!use_tablespace) { @@ -9616,10 +9661,18 @@ index_bad: " innodb_file_format > Antelope.", get_row_format_name(row_format)); } else { - innodb_row_format = (row_format == ROW_TYPE_DYNAMIC - ? REC_FORMAT_DYNAMIC - : REC_FORMAT_COMPRESSED); - break; + switch(row_format) { + case ROW_TYPE_COMPRESSED: + innodb_row_format = REC_FORMAT_COMPRESSED; + break; + case ROW_TYPE_DYNAMIC: + innodb_row_format = REC_FORMAT_DYNAMIC; + break; + default: + /* Not possible, avoid compiler warning */ + break; + } + break; /* Correct row_format */ } zip_allowed = FALSE; /* fall through to set row_format = COMPACT */ @@ -9646,7 +9699,15 @@ index_bad: && ((create_info->data_file_name != NULL) && !(create_info->options & HA_LEX_CREATE_TMP_TABLE)); - dict_tf_set(flags, innodb_row_format, zip_ssize, use_data_dir); + /* Set up table dictionary flags */ + dict_tf_set(flags, + innodb_row_format, + zip_ssize, + use_data_dir, + options->page_compressed, + (ulint)options->page_compression_level == ULINT_UNDEFINED ? + default_compression_level : options->page_compression_level, + options->atomic_writes); if (create_info->options & HA_LEX_CREATE_TMP_TABLE) { *flags2 |= DICT_TF2_TEMPORARY; @@ -9659,6 +9720,111 @@ index_bad: DBUG_RETURN(true); } + +/*****************************************************************//** +Check engine specific table options not handled by SQL-parser. +@return NULL if valid, string if not */ +UNIV_INTERN +const char* +ha_innobase::check_table_options( + THD *thd, /*!< in: thread handle */ + TABLE* table, /*!< in: information on table + columns and indexes */ + HA_CREATE_INFO* create_info, /*!< in: more information of the + created table, contains also the + create statement string */ + const bool use_tablespace, /*!< in: use file par table */ + const ulint file_format) +{ + enum row_type row_format = table->s->row_type;; + ha_table_option_struct *options= table->s->option_struct; + + /* Check page compression requirements */ + if (options->page_compressed) { + if (!srv_compress_pages) { + push_warning( + thd, Sql_condition::WARN_LEVEL_WARN, + HA_WRONG_CREATE_OPTION, + "InnoDB: PAGE_COMPRESSED requires" + "innodb_compress_pages not enabled"); + return "PAGE_COMPRESSED"; + } + + if (row_format == ROW_TYPE_COMPRESSED) { + push_warning( + thd, Sql_condition::WARN_LEVEL_WARN, + HA_WRONG_CREATE_OPTION, + "InnoDB: PAGE_COMPRESSED table can't have" + " ROW_TYPE=COMPRESSED"); + return "PAGE_COMPRESSED"; + } + + if (!use_tablespace) { + push_warning( + thd, Sql_condition::WARN_LEVEL_WARN, + HA_WRONG_CREATE_OPTION, + "InnoDB: PAGE_COMPRESSED requires" + " innodb_file_per_table."); + return "PAGE_COMPRESSED"; + } + + if (file_format < UNIV_FORMAT_B) { + push_warning( + thd, Sql_condition::WARN_LEVEL_WARN, + HA_WRONG_CREATE_OPTION, + "InnoDB: PAGE_COMPRESSED requires" + " innodb_file_format > Antelope."); + return "PAGE_COMPRESSED"; + } + + if (create_info->key_block_size) { + push_warning( + thd, Sql_condition::WARN_LEVEL_WARN, + HA_WRONG_CREATE_OPTION, + "InnoDB: PAGE_COMPRESSED table can't have" + " key_block_size"); + return "PAGE_COMPRESSED"; + } + } + + /* Check page compression level requirements, some of them are + already checked above */ + if ((ulint)options->page_compression_level != ULINT_UNDEFINED) { + if (options->page_compressed == false) { + push_warning( + thd, Sql_condition::WARN_LEVEL_WARN, + HA_WRONG_CREATE_OPTION, + "InnoDB: PAGE_COMPRESSION_LEVEL requires" + " PAGE_COMPRESSED"); + return "PAGE_COMPRESSION_LEVEL"; + } + + if (options->page_compression_level < 0 || options->page_compression_level > 9) { + push_warning_printf( + thd, Sql_condition::WARN_LEVEL_WARN, + HA_WRONG_CREATE_OPTION, + "InnoDB: invalid PAGE_COMPRESSION_LEVEL = %lu." + " Valid values are [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]", + create_info->key_block_size); + return "PAGE_COMPRESSION_LEVEL"; + } + } + + /* Check atomic writes requirements */ + if (options->atomic_writes) { + if (!srv_use_atomic_writes && !use_tablespace) { + push_warning( + thd, Sql_condition::WARN_LEVEL_WARN, + HA_WRONG_CREATE_OPTION, + "InnoDB: ATOMIC_WRITES requires" + " innodb_file_per_table."); + return "ATOMIC_WRITES"; + } + } + + return 0; +} + /*****************************************************************//** Creates a new table to an InnoDB database. @return error number */ @@ -9690,6 +9856,7 @@ ha_innobase::create( while creating the table. So we read the current value here and make all further decisions based on this. */ bool use_tablespace = srv_file_per_table; + const ulint file_format = srv_file_format; /* Zip Shift Size - log2 - 9 of compressed page size, zero for uncompressed */ @@ -9713,6 +9880,12 @@ ha_innobase::create( /* Create the table definition in InnoDB */ + /* Validate table options not handled by the SQL-parser */ + if(check_table_options(thd, form, create_info, use_tablespace, + file_format)) { + DBUG_RETURN(HA_WRONG_CREATE_OPTION); + } + /* Validate create options if innodb_strict_mode is set. */ if (create_options_are_invalid( thd, form, create_info, use_tablespace)) { @@ -13952,6 +14125,12 @@ ha_innobase::check_if_incompatible_data( HA_CREATE_INFO* info, uint table_changes) { + ha_table_option_struct *param_old, *param_new; + + /* Cache engine specific options */ + param_new = info->option_struct; + param_old = table->s->option_struct; + innobase_copy_frm_flags_from_create_info(prebuilt->table, info); if (table_changes != IS_EQUAL_YES) { @@ -13978,6 +14157,13 @@ ha_innobase::check_if_incompatible_data( return(COMPATIBLE_DATA_NO); } + /* Changes on engine specific table options requests a rebuild of the table. */ + if (param_new->page_compressed != param_old->page_compressed || + param_new->page_compression_level != param_old->page_compression_level || + param_new->atomic_writes != param_old->atomic_writes) { + return(COMPATIBLE_DATA_NO); + } + return(COMPATIBLE_DATA_YES); } @@ -16447,6 +16633,31 @@ static MYSQL_SYSVAR_BOOL(trx_purge_view_update_only_debug, NULL, NULL, FALSE); #endif /* UNIV_DEBUG */ +static MYSQL_SYSVAR_BOOL(compress_pages, srv_compress_pages, + PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY, + "Use page compression.", + NULL, NULL, FALSE); + +static MYSQL_SYSVAR_LONG(trim_pct, srv_trim_pct, + PLUGIN_VAR_OPCMDARG , + "How many percent of compressed pages should be trimmed", + NULL, NULL, 100, 0, 100, 0); + +static MYSQL_SYSVAR_LONG(compress_zlib_level, srv_compress_zlib_level, + PLUGIN_VAR_OPCMDARG , + "Default zlib compression level", + NULL, NULL, 6, 0, 9, 0); + +static MYSQL_SYSVAR_BOOL(compress_index_pages, srv_page_compress_index_pages, + PLUGIN_VAR_OPCMDARG, + "Use page compression for only index pages.", + NULL, NULL, FALSE); + +static MYSQL_SYSVAR_BOOL(use_trim, srv_use_trim, + PLUGIN_VAR_OPCMDARG, + "Use trim.", + NULL, NULL, TRUE); + static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(additional_mem_pool_size), MYSQL_SYSVAR(api_trx_level), @@ -16592,6 +16803,11 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(limit_optimistic_insert_debug), MYSQL_SYSVAR(trx_purge_view_update_only_debug), #endif /* UNIV_DEBUG */ + MYSQL_SYSVAR(compress_pages), + MYSQL_SYSVAR(trim_pct), + MYSQL_SYSVAR(compress_zlib_level), + MYSQL_SYSVAR(compress_index_pages), + MYSQL_SYSVAR(use_trim), NULL }; diff --git a/storage/innobase/handler/ha_innodb.h b/storage/innobase/handler/ha_innodb.h index ece9f7cf58a..5eb460072bb 100644 --- a/storage/innobase/handler/ha_innodb.h +++ b/storage/innobase/handler/ha_innodb.h @@ -1,6 +1,7 @@ /***************************************************************************** Copyright (c) 2000, 2012, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -56,6 +57,18 @@ typedef struct st_innobase_share { /** Prebuilt structures in an InnoDB table handle used within MySQL */ struct row_prebuilt_t; +/** Engine specific table options are definined using this struct */ +struct ha_table_option_struct +{ + bool page_compressed; /*!< Table is using page compression + if this option is true. */ + int page_compression_level; /*!< Table page compression level + or UNIV_UNSPECIFIED. */ + bool atomic_writes; /*!< Use atomic writes for this + table if this options is true. */ +}; + + /** The class defining a handle to an Innodb table */ class ha_innobase: public handler { @@ -182,6 +195,8 @@ class ha_innobase: public handler char* norm_name, char* temp_path, char* remote_path); + const char* check_table_options(THD *thd, TABLE* table, + HA_CREATE_INFO* create_info, const bool use_tablespace, const ulint file_format); int create(const char *name, register TABLE *form, HA_CREATE_INFO *create_info); int truncate(); diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc index a120534b36d..49f8a05d11a 100644 --- a/storage/innobase/handler/handler0alter.cc +++ b/storage/innobase/handler/handler0alter.cc @@ -1,6 +1,7 @@ /***************************************************************************** Copyright (c) 2005, 2012, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -248,6 +249,22 @@ ha_innobase::check_if_supported_inplace_alter( update_thd(); trx_search_latch_release_if_reserved(prebuilt->trx); + /* Change on engine specific table options require rebuild of the + table */ + if (ha_alter_info->handler_flags + == Alter_inplace_info::CHANGE_CREATE_OPTION) { + ha_table_option_struct *new_options= ha_alter_info->create_info->option_struct; + ha_table_option_struct *old_options= table->s->option_struct; + + if (new_options->page_compressed != old_options->page_compressed || + new_options->page_compression_level != old_options->page_compression_level || + new_options->atomic_writes != old_options->page_compression_level) { + ha_alter_info->unsupported_reason = innobase_get_err_msg( + ER_ALTER_OPERATION_NOT_SUPPORTED_REASON); + DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED); + } + } + if (ha_alter_info->handler_flags & ~(INNOBASE_ONLINE_OPERATIONS | INNOBASE_INPLACE_REBUILD)) { if (ha_alter_info->handler_flags @@ -3331,6 +3348,17 @@ ha_innobase::prepare_inplace_alter_table( if (ha_alter_info->handler_flags & Alter_inplace_info::CHANGE_CREATE_OPTION) { + /* Check engine specific table options */ + if (const char* invalid_tbopt = check_table_options( + user_thd, altered_table, + ha_alter_info->create_info, + prebuilt->table->space != 0, + srv_file_format)) { + my_error(ER_ILLEGAL_HA_CREATE_OPTION, MYF(0), + table_type(), invalid_tbopt); + goto err_exit_no_heap; + } + if (const char* invalid_opt = create_options_are_invalid( user_thd, altered_table, ha_alter_info->create_info, diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index 74a6e203808..5e301a27e32 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -1,6 +1,7 @@ /***************************************************************************** Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -1470,6 +1471,11 @@ struct buf_page_t{ state == BUF_BLOCK_ZIP_PAGE and zip.data == NULL means an active buf_pool->watch */ + + ulint write_size; /* Write size is set when this + page is first time written and then + if written again we check is TRIM + operation needed. */ #ifndef UNIV_HOTBACKUP buf_page_t* hash; /*!< node used in chaining to buf_pool->page_hash or diff --git a/storage/innobase/include/dict0dict.h b/storage/innobase/include/dict0dict.h index af0a5b31cc4..0ca64956a2e 100644 --- a/storage/innobase/include/dict0dict.h +++ b/storage/innobase/include/dict0dict.h @@ -2,6 +2,7 @@ Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2012, Facebook Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -42,6 +43,8 @@ Created 1/8/1996 Heikki Tuuri #include "ut0byte.h" #include "trx0types.h" #include "row0types.h" +#include "fsp0fsp.h" +#include "dict0pagecompress.h" #ifndef UNIV_HOTBACKUP # include "sync0sync.h" @@ -878,7 +881,14 @@ dict_tf_set( ulint* flags, /*!< in/out: table */ rec_format_t format, /*!< in: file format */ ulint zip_ssize, /*!< in: zip shift size */ - bool remote_path) /*!< in: table uses DATA DIRECTORY */ + bool remote_path, /*!< in: table uses DATA DIRECTORY + */ + bool page_compressed,/*!< in: table uses page compressed + pages */ + ulint page_compression_level, /*!< in: table page compression + level */ + bool atomic_writes) /*!< in: table uses atomic + writes */ __attribute__((nonnull)); /********************************************************************//** Convert a 32 bit integer table flags to the 32 bit integer that is @@ -906,6 +916,7 @@ dict_tf_get_zip_size( /*=================*/ ulint flags) /*!< in: flags */ __attribute__((const)); + /********************************************************************//** Check whether the table uses the compressed compact page format. @return compressed page size, or 0 if not compressed */ @@ -1779,6 +1790,7 @@ dict_tf_to_row_format_string( #endif /* !UNIV_HOTBACKUP */ + #ifndef UNIV_NONINL #include "dict0dict.ic" #endif diff --git a/storage/innobase/include/dict0dict.ic b/storage/innobase/include/dict0dict.ic index 83953c9325a..65967552b87 100644 --- a/storage/innobase/include/dict0dict.ic +++ b/storage/innobase/include/dict0dict.ic @@ -1,6 +1,7 @@ /***************************************************************************** Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -537,9 +538,25 @@ dict_tf_is_valid( ulint zip_ssize = DICT_TF_GET_ZIP_SSIZE(flags); ulint atomic_blobs = DICT_TF_HAS_ATOMIC_BLOBS(flags); ulint unused = DICT_TF_GET_UNUSED(flags); + ulint page_compression = DICT_TF_GET_PAGE_COMPRESSION(flags); + ulint page_compression_level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(flags); + ulint data_dir = DICT_TF_HAS_DATA_DIR(flags); + ulint atomic_writes = DICT_TF_GET_ATOMIC_WRITES(flags); /* Make sure there are no bits that we do not know about. */ if (unused != 0) { + fprintf(stderr, + "InnoDB: Error: table unused flags are %ld" + " in the data dictionary and are corrupted\n" + "InnoDB: Error: data dictionary flags are\n" + "InnoDB: compact %ld atomic_blobs %ld\n" + "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n" + "InnoDB: page_compression %ld page_compression_level %ld\n" + "InnoDB: atomic_writes %ld\n", + unused, + compact, atomic_blobs, unused, data_dir, zip_ssize, + page_compression, page_compression_level, atomic_writes + ); return(false); @@ -550,12 +567,34 @@ dict_tf_is_valid( data stored off-page in the clustered index. */ if (!compact) { + fprintf(stderr, + "InnoDB: Error: table compact flags are %ld" + " in the data dictionary and are corrupted\n" + "InnoDB: Error: data dictionary flags are\n" + "InnoDB: compact %ld atomic_blobs %ld\n" + "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n" + "InnoDB: page_compression %ld page_compression_level %ld\n" + "InnoDB: atomic_writes %ld\n", + compact, compact, atomic_blobs, unused, data_dir, zip_ssize, + page_compression, page_compression_level, atomic_writes + ); return(false); } } else if (zip_ssize) { /* Antelope does not support COMPRESSED row format. */ + fprintf(stderr, + "InnoDB: Error: table flags are %ld" + " in the data dictionary and are corrupted\n" + "InnoDB: Error: data dictionary flags are\n" + "InnoDB: compact %ld atomic_blobs %ld\n" + "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n" + "InnoDB: page_compression %ld page_compression_level %ld\n" + "InnoDB: atomic_writes %ld\n", + flags, compact, atomic_blobs, unused, data_dir, zip_ssize, + page_compression, page_compression_level, atomic_writes + ); return(false); } @@ -568,6 +607,40 @@ dict_tf_is_valid( || !atomic_blobs || zip_ssize > PAGE_ZIP_SSIZE_MAX) { + fprintf(stderr, + "InnoDB: Error: table compact flags are %ld in the data dictionary and are corrupted\n" + "InnoDB: Error: data dictionary flags are\n" + "InnoDB: compact %ld atomic_blobs %ld\n" + "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n" + "InnoDB: page_compression %ld page_compression_level %ld\n" + "InnoDB: atomic_writes %ld\n", + flags, + compact, atomic_blobs, unused, data_dir, zip_ssize, + page_compression, page_compression_level, atomic_writes + + ); + return(false); + } + } + + if (page_compression || page_compression_level) { + /* Page compression format must have compact and + atomic_blobs and page_compression_level requires + page_compression */ + if (!compact + || !page_compression + || !atomic_blobs) { + + fprintf(stderr, + "InnoDB: Error: table flags are %ld in the data dictionary and are corrupted\n" + "InnoDB: Error: data dictionary flags are\n" + "InnoDB: compact %ld atomic_blobs %ld\n" + "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n" + "InnoDB: page_compression %ld page_compression_level %ld\n" + "InnoDB: atomic_writes %ld\n", + flags, compact, atomic_blobs, unused, data_dir, zip_ssize, + page_compression, page_compression_level, atomic_writes + ); return(false); } } @@ -594,6 +667,9 @@ dict_sys_tables_type_validate( ulint zip_ssize = DICT_TF_GET_ZIP_SSIZE(type); ulint atomic_blobs = DICT_TF_HAS_ATOMIC_BLOBS(type); ulint unused = DICT_TF_GET_UNUSED(type); + ulint page_compression = DICT_TF_GET_PAGE_COMPRESSION(type); + ulint page_compression_level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type); + ulint atomic_writes = DICT_TF_GET_ATOMIC_WRITES(type); /* The low order bit of SYS_TABLES.TYPE is always set to 1. If the format is UNIV_FORMAT_B or higher, this field is the same @@ -647,6 +723,23 @@ dict_sys_tables_type_validate( format, so the DATA_DIR flag is compatible with any other table flags. However, it is not used with TEMPORARY tables.*/ + if (page_compression || page_compression_level) { + /* page compressed row format must have low_order_bit and + atomic_blobs bits set and the DICT_N_COLS_COMPACT flag + should be in N_COLS, but we already know about the + low_order_bit and DICT_N_COLS_COMPACT flags. */ + + if (!atomic_blobs || !page_compression) { + return(ULINT_UNDEFINED); + } + } + + if (atomic_writes) { + if (!atomic_blobs) { + return(ULINT_UNDEFINED); + } + } + /* Return the validated SYS_TABLES.TYPE. */ return(type); } @@ -719,7 +812,14 @@ dict_tf_set( ulint* flags, /*!< in/out: table flags */ rec_format_t format, /*!< in: file format */ ulint zip_ssize, /*!< in: zip shift size */ - bool use_data_dir) /*!< in: table uses DATA DIRECTORY */ + bool use_data_dir, /*!< in: table uses DATA DIRECTORY + */ + bool page_compressed,/*!< in: table uses page compressed + pages */ + ulint page_compression_level, /*!< in: table page compression + level */ + bool atomic_writes) /*!< in: table uses atomic + writes */ { switch (format) { case REC_FORMAT_REDUNDANT: @@ -742,6 +842,22 @@ dict_tf_set( break; } + if (page_compressed) { + *flags = DICT_TF_COMPACT + | (1 << DICT_TF_POS_ATOMIC_BLOBS) + | (1 << DICT_TF_POS_PAGE_COMPRESSION) + | (page_compression_level << DICT_TF_POS_PAGE_COMPRESSION_LEVEL); + + ut_ad(zip_ssize == 0); + ut_ad(dict_tf_get_page_compression(*flags) == TRUE); + ut_ad(dict_tf_get_page_compression_level(*flags) == page_compression_level); + } + + if (atomic_writes) { + *flags |= (1 << DICT_TF_POS_ATOMIC_WRITES); + ut_ad(dict_tf_get_atomic_writes(*flags) == TRUE); + } + if (use_data_dir) { *flags |= (1 << DICT_TF_POS_DATA_DIR); } @@ -765,6 +881,9 @@ dict_tf_to_fsp_flags( ulint table_flags) /*!< in: dict_table_t::flags */ { ulint fsp_flags; + ulint page_compression = DICT_TF_GET_PAGE_COMPRESSION(table_flags); + ulint page_compression_level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(table_flags); + ulint atomic_writes = DICT_TF_GET_ATOMIC_WRITES(table_flags); DBUG_EXECUTE_IF("dict_tf_to_fsp_flags_failure", return(ULINT_UNDEFINED);); @@ -783,7 +902,20 @@ dict_tf_to_fsp_flags( fsp_flags |= DICT_TF_HAS_DATA_DIR(table_flags) ? FSP_FLAGS_MASK_DATA_DIR : 0; + /* In addition, tablespace flags also contain if the page + compression is used for this table. */ + fsp_flags |= FSP_FLAGS_SET_PAGE_COMPRESSION(fsp_flags, page_compression); + + /* In addition, tablespace flags also contain page compression level + if page compression is used for this table. */ + fsp_flags |= FSP_FLAGS_SET_PAGE_COMPRESSION_LEVEL(fsp_flags, page_compression_level); + + /* In addition, tablespace flags also contain flag if atomic writes + is used for this table */ + fsp_flags |= FSP_FLAGS_SET_ATOMIC_WRITES(fsp_flags, atomic_writes); + ut_a(fsp_flags_is_valid(fsp_flags)); + ut_a(dict_tf_verify_flags(table_flags, fsp_flags)); return(fsp_flags); } @@ -811,10 +943,15 @@ dict_sys_tables_type_to_tf( /* Adjust bit zero. */ flags = redundant ? 0 : 1; - /* ZIP_SSIZE, ATOMIC_BLOBS & DATA_DIR are the same. */ + /* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION, + PAGE_COMPRESSION_LEVEL, ATOMIC_WRITES are the same. */ flags |= type & (DICT_TF_MASK_ZIP_SSIZE | DICT_TF_MASK_ATOMIC_BLOBS - | DICT_TF_MASK_DATA_DIR); + | DICT_TF_MASK_DATA_DIR + | DICT_TF_MASK_PAGE_COMPRESSION + | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL + | DICT_TF_MASK_ATOMIC_WRITES + ); return(flags); } @@ -842,10 +979,14 @@ dict_tf_to_sys_tables_type( /* Adjust bit zero. It is always 1 in SYS_TABLES.TYPE */ type = 1; - /* ZIP_SSIZE, ATOMIC_BLOBS & DATA_DIR are the same. */ + /* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION, + PAGE_COMPRESSION_LEVEL, ATOMIC_WRITES are the same. */ type |= flags & (DICT_TF_MASK_ZIP_SSIZE | DICT_TF_MASK_ATOMIC_BLOBS - | DICT_TF_MASK_DATA_DIR); + | DICT_TF_MASK_DATA_DIR + | DICT_TF_MASK_PAGE_COMPRESSION + | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL + | DICT_TF_MASK_ATOMIC_WRITES); return(type); } diff --git a/storage/innobase/include/dict0mem.h b/storage/innobase/include/dict0mem.h index 671f67eb1f8..6cfcb81bcd5 100644 --- a/storage/innobase/include/dict0mem.h +++ b/storage/innobase/include/dict0mem.h @@ -2,6 +2,7 @@ Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2012, Facebook Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -120,11 +121,25 @@ This flag prevents older engines from attempting to open the table and allows InnoDB to update_create_info() accordingly. */ #define DICT_TF_WIDTH_DATA_DIR 1 +/** +Width of the page compression flag +*/ +#define DICT_TF_WIDTH_PAGE_COMPRESSION 1 +#define DICT_TF_WIDTH_PAGE_COMPRESSION_LEVEL 4 + +/** +Width of atomic writes flag +*/ +#define DICT_TF_WIDTH_ATOMIC_WRITES 1 + /** Width of all the currently known table flags */ #define DICT_TF_BITS (DICT_TF_WIDTH_COMPACT \ + DICT_TF_WIDTH_ZIP_SSIZE \ + DICT_TF_WIDTH_ATOMIC_BLOBS \ - + DICT_TF_WIDTH_DATA_DIR) + + DICT_TF_WIDTH_DATA_DIR \ + + DICT_TF_WIDTH_PAGE_COMPRESSION \ + + DICT_TF_WIDTH_PAGE_COMPRESSION_LEVEL \ + + DICT_TF_WIDTH_ATOMIC_WRITES) /** A mask of all the known/used bits in table flags */ #define DICT_TF_BIT_MASK (~(~0 << DICT_TF_BITS)) @@ -140,9 +155,19 @@ allows InnoDB to update_create_info() accordingly. */ /** Zero relative shift position of the DATA_DIR field */ #define DICT_TF_POS_DATA_DIR (DICT_TF_POS_ATOMIC_BLOBS \ + DICT_TF_WIDTH_ATOMIC_BLOBS) +/** Zero relative shift position of the PAGE_COMPRESSION field */ +#define DICT_TF_POS_PAGE_COMPRESSION (DICT_TF_POS_DATA_DIR \ + + DICT_TF_WIDTH_DATA_DIR) +/** Zero relative shift position of the PAGE_COMPRESSION_LEVEL field */ +#define DICT_TF_POS_PAGE_COMPRESSION_LEVEL (DICT_TF_POS_PAGE_COMPRESSION \ + + DICT_TF_WIDTH_PAGE_COMPRESSION) +/** Zero relative shift position of the ATOMIC_WRITES field */ +#define DICT_TF_POS_ATOMIC_WRITES (DICT_TF_POS_PAGE_COMPRESSION_LEVEL \ + + DICT_TF_WIDTH_PAGE_COMPRESSION_LEVEL) + /** Zero relative shift position of the start of the UNUSED bits */ -#define DICT_TF_POS_UNUSED (DICT_TF_POS_DATA_DIR \ - + DICT_TF_WIDTH_DATA_DIR) +#define DICT_TF_POS_UNUSED (DICT_TF_POS_ATOMIC_WRITES \ + + DICT_TF_WIDTH_ATOMIC_WRITES) /** Bit mask of the COMPACT field */ #define DICT_TF_MASK_COMPACT \ @@ -160,6 +185,18 @@ allows InnoDB to update_create_info() accordingly. */ #define DICT_TF_MASK_DATA_DIR \ ((~(~0 << DICT_TF_WIDTH_DATA_DIR)) \ << DICT_TF_POS_DATA_DIR) +/** Bit mask of the PAGE_COMPRESSION field */ +#define DICT_TF_MASK_PAGE_COMPRESSION \ + ((~(~0 << DICT_TF_WIDTH_PAGE_COMPRESSION)) \ + << DICT_TF_POS_PAGE_COMPRESSION) +/** Bit mask of the PAGE_COMPRESSION_LEVEL field */ +#define DICT_TF_MASK_PAGE_COMPRESSION_LEVEL \ + ((~(~0 << DICT_TF_WIDTH_PAGE_COMPRESSION_LEVEL)) \ + << DICT_TF_POS_PAGE_COMPRESSION_LEVEL) +/** Bit mask of the ATOMIC_WRITES field */ +#define DICT_TF_MASK_ATOMIC_WRITES \ + ((~(~0 << DICT_TF_WIDTH_ATOMIC_WRITES)) \ + << DICT_TF_POS_ATOMIC_WRITES) /** Return the value of the COMPACT field */ #define DICT_TF_GET_COMPACT(flags) \ @@ -177,6 +214,19 @@ allows InnoDB to update_create_info() accordingly. */ #define DICT_TF_HAS_DATA_DIR(flags) \ ((flags & DICT_TF_MASK_DATA_DIR) \ >> DICT_TF_POS_DATA_DIR) +/** Return the value of the PAGE_COMPRESSION field */ +#define DICT_TF_GET_PAGE_COMPRESSION(flags) \ + ((flags & DICT_TF_MASK_PAGE_COMPRESSION) \ + >> DICT_TF_POS_PAGE_COMPRESSION) +/** Return the value of the PAGE_COMPRESSION_LEVEL field */ +#define DICT_TF_GET_PAGE_COMPRESSION_LEVEL(flags) \ + ((flags & DICT_TF_MASK_PAGE_COMPRESSION_LEVEL) \ + >> DICT_TF_POS_PAGE_COMPRESSION_LEVEL) +/** Return the value of the ATOMIC_WRITES field */ +#define DICT_TF_GET_ATOMIC_WRITES(flags) \ + ((flags & DICT_TF_MASK_ATOMIC_WRITES) \ + >> DICT_TF_POS_ATOMIC_WRITES) + /** Return the contents of the UNUSED bits */ #define DICT_TF_GET_UNUSED(flags) \ (flags >> DICT_TF_POS_UNUSED) diff --git a/storage/innobase/include/dict0pagecompress.h b/storage/innobase/include/dict0pagecompress.h new file mode 100644 index 00000000000..236924758f1 --- /dev/null +++ b/storage/innobase/include/dict0pagecompress.h @@ -0,0 +1,94 @@ +/***************************************************************************** + +Copyright (C) 2013 SkySQL Ab. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*****************************************************************************/ + +/******************************************************************//** +@file include/dict0pagecompress.h +Helper functions for extracting/storing page compression information +to dictionary. + +Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com +***********************************************************************/ + +#ifndef dict0pagecompress_h +#define dict0pagecompress_h + +/********************************************************************//** +Extract the page compression level from table flags. +@return page compression level, or 0 if not compressed */ +UNIV_INLINE +ulint +dict_tf_get_page_compression_level( +/*===============================*/ + ulint flags) /*!< in: flags */ + __attribute__((const)); +/********************************************************************//** +Extract the page compression flag from table flags +@return page compression flag, or false if not compressed */ +UNIV_INLINE +ibool +dict_tf_get_page_compression( +/*==========================*/ + ulint flags) /*!< in: flags */ + __attribute__((const)); + +/********************************************************************//** +Check whether the table uses the page compressed page format. +@return page compression level, or 0 if not compressed */ +UNIV_INLINE +ulint +dict_table_page_compression_level( +/*==============================*/ + const dict_table_t* table) /*!< in: table */ + __attribute__((const)); + +/********************************************************************//** +Verify that dictionary flags match tablespace flags +@return true if flags match, false if not */ +UNIV_INLINE +ibool +dict_tf_verify_flags( +/*=================*/ + ulint table_flags, /*!< in: dict_table_t::flags */ + ulint fsp_flags) /*!< in: fil_space_t::flags */ + __attribute__((const)); + +/********************************************************************//** +Extract the atomic writes flag from table flags. +@return true if atomic writes are used, false if not used */ +UNIV_INLINE +ibool +dict_tf_get_atomic_writes( +/*======================*/ + ulint flags) /*!< in: flags */ + __attribute__((const)); + +/********************************************************************//** +Check whether the table uses the atomic writes. +@return true if atomic writes is used, false if not */ +UNIV_INLINE +ibool +dict_table_get_atomic_writes( +/*=========================*/ + const dict_table_t* table); /*!< in: table */ + + +#ifndef UNIV_NONINL +#include "dict0pagecompress.ic" +#endif + +#endif diff --git a/storage/innobase/include/dict0pagecompress.ic b/storage/innobase/include/dict0pagecompress.ic new file mode 100644 index 00000000000..98b64723542 --- /dev/null +++ b/storage/innobase/include/dict0pagecompress.ic @@ -0,0 +1,191 @@ +/***************************************************************************** + +Copyright (C) 2013 SkySQL Ab. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*****************************************************************************/ + +/******************************************************************//** +@file include/dict0pagecompress.ic +Inline implementation for helper functions for extracting/storing +page compression and atomic writes information to dictionary. + +Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com +***********************************************************************/ + +/********************************************************************//** +Verify that dictionary flags match tablespace flags +@return true if flags match, false if not */ +UNIV_INLINE +ibool +dict_tf_verify_flags( +/*=================*/ + ulint table_flags, /*!< in: dict_table_t::flags */ + ulint fsp_flags) /*!< in: fil_space_t::flags */ +{ + ulint table_unused = DICT_TF_GET_UNUSED(table_flags); + ulint compact = DICT_TF_GET_COMPACT(table_flags); + ulint ssize = DICT_TF_GET_ZIP_SSIZE(table_flags); + ulint atomic_blobs = DICT_TF_HAS_ATOMIC_BLOBS(table_flags); + ulint data_dir = DICT_TF_HAS_DATA_DIR(table_flags); + ulint page_compression = DICT_TF_GET_PAGE_COMPRESSION(table_flags); + ulint page_compression_level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(table_flags); + ulint atomic_writes = DICT_TF_GET_ATOMIC_WRITES(table_flags); + ulint post_antelope = FSP_FLAGS_GET_POST_ANTELOPE(fsp_flags); + ulint zip_ssize = FSP_FLAGS_GET_ZIP_SSIZE(fsp_flags); + ulint fsp_atomic_blobs = FSP_FLAGS_HAS_ATOMIC_BLOBS(fsp_flags); + ulint page_ssize = FSP_FLAGS_GET_PAGE_SSIZE(fsp_flags); + ulint fsp_unused = FSP_FLAGS_GET_UNUSED(fsp_flags); + ulint fsp_page_compression = FSP_FLAGS_GET_PAGE_COMPRESSION(fsp_flags); + ulint fsp_page_compression_level = FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(fsp_flags); + ulint fsp_atomic_writes = FSP_FLAGS_GET_ATOMIC_WRITES(fsp_flags); + + DBUG_EXECUTE_IF("dict_tf_verify_flags_failure", + return(ULINT_UNDEFINED);); + + ut_ad(!table_unused); + ut_ad(!fsp_unused); + ut_ad(page_ssize == 0 || page_ssize != 0); /* silence compiler */ + ut_ad(compact == 0 || compact == 1); /* silence compiler */ + ut_ad(data_dir == 0 || data_dir == 1); /* silence compiler */ + ut_ad(post_antelope == 0 || post_antelope == 1); /* silence compiler */ + + if (ssize != zip_ssize) { + fprintf(stderr, + "InnoDB: Error: table flags has zip_ssize %ld" + " in the data dictionary\n" + "InnoDB: but the flags in file has zip_ssize %ld\n", + ssize, zip_ssize); + return (FALSE); + } + if (atomic_blobs != fsp_atomic_blobs) { + fprintf(stderr, + "InnoDB: Error: table flags has atomic_blobs %ld" + " in the data dictionary\n" + "InnoDB: but the flags in file has atomic_blobs %ld\n", + atomic_blobs, fsp_atomic_blobs); + + return (FALSE); + } + if (page_compression != fsp_page_compression) { + fprintf(stderr, + "InnoDB: Error: table flags has page_compression %ld" + " in the data dictionary\n" + "InnoDB: but the flags in file ahas page_compression %ld\n", + page_compression, fsp_page_compression); + + return (FALSE); + } + if (page_compression_level != fsp_page_compression_level) { + fprintf(stderr, + "InnoDB: Error: table flags has page_compression_level %ld" + " in the data dictionary\n" + "InnoDB: but the flags in file has page_compression_level %ld\n", + page_compression_level, fsp_page_compression_level); + + return (FALSE); + } + + if (atomic_writes != fsp_atomic_writes) { + fprintf(stderr, + "InnoDB: Error: table flags has atomic writes %ld" + " in the data dictionary\n" + "InnoDB: but the flags in file has atomic_writes %ld\n", + atomic_writes, fsp_atomic_writes); + + return (FALSE); + } + + return(TRUE); +} + +/********************************************************************//** +Extract the page compression level from dict_table_t::flags. +These flags are in memory, so assert that they are valid. +@return page compression level, or 0 if not compressed */ +UNIV_INLINE +ulint +dict_tf_get_page_compression_level( +/*===============================*/ + ulint flags) /*!< in: flags */ +{ + ulint page_compression_level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(flags); + + ut_ad(page_compression_level >= 0 && page_compression_level <= 9); + + return(page_compression_level); +} + +/********************************************************************//** +Check whether the table uses the page compression page format. +@return page compression level, or 0 if not compressed */ +UNIV_INLINE +ulint +dict_table_page_compression_level( +/*==============================*/ + const dict_table_t* table) /*!< in: table */ +{ + ut_ad(table); + ut_ad(dict_tf_get_page_compression(table->flags)); + + return(dict_tf_get_page_compression_level(table->flags)); +} + +/********************************************************************//** +Check whether the table uses the page compression page format. +@return true if page compressed, false if not */ +UNIV_INLINE +ibool +dict_tf_get_page_compression( +/*=========================*/ + ulint flags) /*!< in: flags */ +{ + return(DICT_TF_GET_PAGE_COMPRESSION(flags)); +} + +/********************************************************************//** +Check whether the table uses the page compression page format. +@return true if page compressed, false if not */ +UNIV_INLINE +ibool +dict_table_is_page_compressed( +/*==========================*/ + const dict_table_t* table) /*!< in: table */ +{ + return (dict_tf_get_page_compression(table->flags)); +} + +/********************************************************************//** +Extract the atomic writes flag from table flags. +@return true if atomic writes are used, false if not used */ +UNIV_INLINE +ibool +dict_tf_get_atomic_writes( +/*======================*/ + ulint flags) /*!< in: flags */ +{ + return(DICT_TF_GET_ATOMIC_WRITES(flags)); +} + +/********************************************************************//** +Check whether the table uses the atomic writes. +@return true if atomic writes is used, false if not */ +UNIV_INLINE +ibool +dict_table_get_atomic_writes( +/*=========================*/ + const dict_table_t* table) /*!< in: table */ +{ + return (dict_tf_get_atomic_writes(table->flags)); +} diff --git a/storage/innobase/include/fil0fil.h b/storage/innobase/include/fil0fil.h index 56fda8b39b1..c5edd33f46b 100644 --- a/storage/innobase/include/fil0fil.h +++ b/storage/innobase/include/fil0fil.h @@ -1,6 +1,7 @@ /***************************************************************************** Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -128,6 +129,12 @@ extern fil_addr_t fil_addr_null; #define FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID 34 /*!< starting from 4.1.x this contains the space id of the page */ #define FIL_PAGE_DATA 38 /*!< start of the data on the page */ +/* Following are used when page compression is used */ +#define FIL_PAGE_COMPRESSED_SIZE 2 /*!< Number of bytes used to store + actual payload data size on + compressed pages. */ +#define FIL_PAGE_COMPRESSION_ZLIB 1 /*!< Compressin algorithm ZLIB. */ + /* @} */ /** File page trailer @{ */ #define FIL_PAGE_END_LSN_OLD_CHKSUM 8 /*!< the low 4 bytes of this are used @@ -140,6 +147,7 @@ extern fil_addr_t fil_addr_null; #ifndef UNIV_INNOCHECKSUM /** File page types (values of FIL_PAGE_TYPE) @{ */ +#define FIL_PAGE_PAGE_COMPRESSED 34354 /*!< page compressed page */ #define FIL_PAGE_INDEX 17855 /*!< B-tree node */ #define FIL_PAGE_UNDO_LOG 2 /*!< Undo log page */ #define FIL_PAGE_INODE 3 /*!< Index node */ @@ -202,6 +210,7 @@ ulint fil_space_get_type( /*===============*/ ulint id); /*!< in: space id */ + #endif /* !UNIV_HOTBACKUP */ /*******************************************************************//** Appends a new file to the chain of files of a space. File must be closed. @@ -742,8 +751,13 @@ fil_io( void* buf, /*!< in/out: buffer where to store read data or from where to write; in aio this must be appropriately aligned */ - void* message) /*!< in: message for aio handler if non-sync + void* message, /*!< in: message for aio handler if non-sync aio used, else ignored */ + ulint write_size) /*!< in/out: Actual write size initialized + after fist successfull trim + operation for this page and if + initialized we do not trim again if + actual page size does not decrease. */ __attribute__((nonnull(8))); /**********************************************************************//** Waits for an aio operation to complete. This function is used to write the @@ -977,8 +991,33 @@ fil_mtr_rename_log( ulint new_space_id, /*!< in: tablespace id of the new table */ const char* new_name, /*!< in: new table name */ - const char* tmp_name); /*!< in: temp table name used while + const char* tmp_name) /*!< in: temp table name used while swapping */ + __attribute__((nonnull)); #endif /* !UNIV_INNOCHECKSUM */ + +/****************************************************************//** +Acquire fil_system mutex */ +void +fil_system_enter(void); +/*==================*/ +/****************************************************************//** +Release fil_system mutex */ +void +fil_system_exit(void); +/*==================*/ +/*******************************************************************//** +Returns the table space by a given id, NULL if not found. */ +fil_space_t* +fil_space_get_by_id( +/*================*/ + ulint id); /*!< in: space id */ +/*******************************************************************//** +Return space name */ +char* +fil_space_name( +/*===========*/ + fil_space_t* space); /*!< in: space */ + #endif /* fil0fil_h */ diff --git a/storage/innobase/include/fil0pagecompress.h b/storage/innobase/include/fil0pagecompress.h new file mode 100644 index 00000000000..e21eae7a5ee --- /dev/null +++ b/storage/innobase/include/fil0pagecompress.h @@ -0,0 +1,117 @@ +/***************************************************************************** + +Copyright (C) 2013 SkySQL Ab. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*****************************************************************************/ + +#ifndef fil0pagecompress_h +#define fil0pagecompress_h + +#include "fsp0fsp.h" +#include "fsp0pagecompress.h" + +/******************************************************************//** +@file include/fil0pagecompress.h +Helper functions for extracting/storing page compression and +atomic writes information to table space. + +Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com +***********************************************************************/ + +/*******************************************************************//** +Returns the page compression level flag of the space, or 0 if the space +is not compressed. The tablespace must be cached in the memory cache. +@return page compression level if page compressed, ULINT_UNDEFINED if space not found */ +ulint +fil_space_get_page_compression_level( +/*=================================*/ + ulint id); /*!< in: space id */ +/*******************************************************************//** +Returns the page compression flag of the space, or false if the space +is not compressed. The tablespace must be cached in the memory cache. +@return true if page compressed, false if not or space not found */ +ibool +fil_space_is_page_compressed( +/*=========================*/ + ulint id); /*!< in: space id */ +/*******************************************************************//** +Returns the atomic writes flag of the space, or false if the space +is not using atomic writes. The tablespace must be cached in the memory cache. +@return true if space using atomic writes, false if not */ +ibool +fil_space_get_atomic_writes( +/*=========================*/ + ulint id); /*!< in: space id */ +/*******************************************************************//** +Find out wheather the page is index page or not +@return true if page type index page, false if not */ +ibool +fil_page_is_index_page( +/*===================*/ + byte *buf); /*!< in: page */ + +/****************************************************************//** +Get the name of the compression algorithm used for page +compression. +@return compression algorithm name or "UNKNOWN" if not known*/ +const char* +fil_get_compression_alg_name( +/*=========================*/ + ulint comp_alg); /*!> FSP_FLAGS_POS_UNUSED) +/** Return the value of the PAGE_COMPRESSION field */ +#define FSP_FLAGS_GET_PAGE_COMPRESSION(flags) \ + ((flags & FSP_FLAGS_MASK_PAGE_COMPRESSION) \ + >> FSP_FLAGS_POS_PAGE_COMPRESSION) +/** Return the value of the PAGE_COMPRESSION_LEVEL field */ +#define FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags) \ + ((flags & FSP_FLAGS_MASK_PAGE_COMPRESSION_LEVEL) \ + >> FSP_FLAGS_POS_PAGE_COMPRESSION_LEVEL) +/** Return the value of the ATOMIC_WRITES field */ +#define FSP_FLAGS_GET_ATOMIC_WRITES(flags) \ + ((flags & FSP_FLAGS_MASK_ATOMIC_WRITES) \ + >> FSP_FLAGS_POS_ATOMIC_WRITES) + /** Set a PAGE_SSIZE into the correct bits in a given tablespace flags. */ #define FSP_FLAGS_SET_PAGE_SSIZE(flags, ssize) \ (flags | (ssize << FSP_FLAGS_POS_PAGE_SSIZE)) +/** Set a PAGE_COMPRESSION into the correct bits in a given +tablespace flags. */ +#define FSP_FLAGS_SET_PAGE_COMPRESSION(flags, compression) \ + (flags | (compression << FSP_FLAGS_POS_PAGE_COMPRESSION)) + +/** Set a PAGE_COMPRESSION_LEVEL into the correct bits in a given +tablespace flags. */ +#define FSP_FLAGS_SET_PAGE_COMPRESSION_LEVEL(flags, level) \ + (flags | (level << FSP_FLAGS_POS_PAGE_COMPRESSION_LEVEL)) +/** Set a ATOMIC_WRITES into the correct bits in a given +tablespace flags. */ +#define FSP_FLAGS_SET_ATOMIC_WRITES(flags, atomics) \ + (flags | (atomics << FSP_FLAGS_POS_ATOMIC_WRITES)) + /* @} */ /* @defgroup Tablespace Header Constants (moved from fsp0fsp.c) @{ */ diff --git a/storage/innobase/include/fsp0fsp.ic b/storage/innobase/include/fsp0fsp.ic index 0d81e817cc9..0ca02a5652d 100644 --- a/storage/innobase/include/fsp0fsp.ic +++ b/storage/innobase/include/fsp0fsp.ic @@ -1,6 +1,7 @@ /***************************************************************************** Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -63,6 +64,9 @@ fsp_flags_is_valid( ulint atomic_blobs = FSP_FLAGS_HAS_ATOMIC_BLOBS(flags); ulint page_ssize = FSP_FLAGS_GET_PAGE_SSIZE(flags); ulint unused = FSP_FLAGS_GET_UNUSED(flags); + ulint page_compression = FSP_FLAGS_GET_PAGE_COMPRESSION(flags); + ulint page_compression_level = FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags); + ulint atomic_writes = FSP_FLAGS_GET_ATOMIC_WRITES(flags); DBUG_EXECUTE_IF("fsp_flags_is_valid_failure", return(false);); @@ -104,6 +108,18 @@ fsp_flags_is_valid( return(false); } + /* Page compression level requires page compression and atomic blobs + to be set */ + if (page_compression_level || page_compression) { + if (!page_compression || !atomic_blobs) { + return(false); + } + } + + if (atomic_writes && !atomic_blobs) { + return (false); + } + #if UNIV_FORMAT_MAX != UNIV_FORMAT_B # error "UNIV_FORMAT_MAX != UNIV_FORMAT_B, Add more validations." #endif @@ -312,3 +328,4 @@ xdes_calc_descriptor_page( } #endif /* !UNIV_INNOCHECKSUM */ + diff --git a/storage/innobase/include/fsp0pagecompress.h b/storage/innobase/include/fsp0pagecompress.h new file mode 100644 index 00000000000..417d4a6879e --- /dev/null +++ b/storage/innobase/include/fsp0pagecompress.h @@ -0,0 +1,64 @@ +/***************************************************************************** + +Copyright (C) 2013 SkySQL Ab. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*****************************************************************************/ + +/******************************************************************//** +@file include/fsp0pagecompress.h +Helper functions for extracting/storing page compression and +atomic writes information to file space. + +Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com +***********************************************************************/ + +#ifndef fsp0pagecompress_h +#define fsp0pagecompress_h + +/**********************************************************************//** +Reads the page compression level from the first page of a tablespace. +@return page compression level, or 0 if uncompressed */ +UNIV_INTERN +ulint +fsp_header_get_compression_level( +/*=============================*/ + const page_t* page); /*!< in: first page of a tablespace */ + +/********************************************************************//** +Determine if the tablespace is page compressed from dict_table_t::flags. +@return TRUE if page compressed, FALSE if not compressed */ +UNIV_INLINE +ibool +fsp_flags_is_page_compressed( +/*=========================*/ + ulint flags); /*!< in: tablespace flags */ + +/********************************************************************//** +Extract the page compression level from tablespace flags. +A tablespace has only one physical page compression level +whether that page is compressed or not. +@return page compression level of the file-per-table tablespace, +or zero if the table is not compressed. */ +UNIV_INLINE +ulint +fsp_flags_get_page_compression_level( +/*=================================*/ + ulint flags); /*!< in: tablespace flags */ + +#ifndef UNIV_NONINL +#include "fsp0pagecompress.ic" +#endif + +#endif diff --git a/storage/innobase/include/fsp0pagecompress.ic b/storage/innobase/include/fsp0pagecompress.ic new file mode 100644 index 00000000000..1dffd1bedf1 --- /dev/null +++ b/storage/innobase/include/fsp0pagecompress.ic @@ -0,0 +1,61 @@ +/***************************************************************************** + +Copyright (C) 2013 SkySQL Ab. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*****************************************************************************/ + +/******************************************************************//** +@file include/fsp0pagecompress.ic +Implementation for helper functions for extracting/storing page +compression and atomic writes information to file space. + +Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com +***********************************************************************/ + +/********************************************************************//** +Determine if the tablespace is page compressed from dict_table_t::flags. +@return TRUE if page compressed, FALSE if not page compressed */ +UNIV_INLINE +ibool +fsp_flags_is_page_compressed( +/*=========================*/ + ulint flags) /*!< in: tablespace flags */ +{ + return(FSP_FLAGS_GET_PAGE_COMPRESSION(flags)); +} + +/********************************************************************//** +Determine the tablespace is page compression level from dict_table_t::flags. +@return page compression level or 0 if not compressed*/ +UNIV_INLINE +ulint +fsp_flags_get_page_compression_level( +/*=================================*/ + ulint flags) /*!< in: tablespace flags */ +{ + return(FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags)); +} + +/********************************************************************//** +Determine the tablespace is using atomic writes from dict_table_t::flags. +@return true if atomic writes is used, false if not */ +UNIV_INLINE +ibool +fsp_flags_get_atomic_writes( +/*========================*/ + ulint flags) /*!< in: tablespace flags */ +{ + return(FSP_FLAGS_GET_ATOMIC_WRITES(flags)); +} diff --git a/storage/innobase/include/fsp0types.h b/storage/innobase/include/fsp0types.h index 94fd908ab0c..e5c1734b842 100644 --- a/storage/innobase/include/fsp0types.h +++ b/storage/innobase/include/fsp0types.h @@ -29,6 +29,7 @@ Created May 26, 2009 Vasil Dimov #include "univ.i" #include "fil0fil.h" /* for FIL_PAGE_DATA */ +#include "ut0byte.h" /** @name Flags for inserting records in order If records are inserted in order, there are the following diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h index 4a744c1b268..3c70f9925fe 100644 --- a/storage/innobase/include/os0file.h +++ b/storage/innobase/include/os0file.h @@ -2,6 +2,7 @@ Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2009, Percona Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Percona Inc.. Those modifications are @@ -150,6 +151,7 @@ enum os_file_create_t { #define OS_FILE_INSUFFICIENT_RESOURCE 78 #define OS_FILE_AIO_INTERRUPTED 79 #define OS_FILE_OPERATION_ABORTED 80 +#define OS_FILE_OPERATION_NOT_SUPPORTED 125 /* @} */ /** Types for aio operations @{ */ @@ -269,26 +271,26 @@ os_file_write The wrapper functions have the prefix of "innodb_". */ #ifdef UNIV_PFS_IO -# define os_file_create(key, name, create, purpose, type, success) \ +# define os_file_create(key, name, create, purpose, type, success, atomic_writes) \ pfs_os_file_create_func(key, name, create, purpose, type, \ - success, __FILE__, __LINE__) + success, atomic_writes, __FILE__, __LINE__) -# define os_file_create_simple(key, name, create, access, success) \ +# define os_file_create_simple(key, name, create, access, success, atomic_writes) \ pfs_os_file_create_simple_func(key, name, create, access, \ - success, __FILE__, __LINE__) + success, atomic_writes, __FILE__, __LINE__) # define os_file_create_simple_no_error_handling( \ - key, name, create_mode, access, success) \ + key, name, create_mode, access, success, atomic_writes) \ pfs_os_file_create_simple_no_error_handling_func( \ - key, name, create_mode, access, success, __FILE__, __LINE__) + key, name, create_mode, access, success, atomic_writes, __FILE__, __LINE__) # define os_file_close(file) \ pfs_os_file_close_func(file, __FILE__, __LINE__) # define os_aio(type, mode, name, file, buf, offset, \ - n, message1, message2) \ + n, message1, message2, write_size) \ pfs_os_aio_func(type, mode, name, file, buf, offset, \ - n, message1, message2, __FILE__, __LINE__) + n, message1, message2, write_size, __FILE__, __LINE__) # define os_file_read(file, buf, offset, n) \ pfs_os_file_read_func(file, buf, offset, n, __FILE__, __LINE__) @@ -310,22 +312,22 @@ The wrapper functions have the prefix of "innodb_". */ /* If UNIV_PFS_IO is not defined, these I/O APIs point to original un-instrumented file I/O APIs */ -# define os_file_create(key, name, create, purpose, type, success) \ - os_file_create_func(name, create, purpose, type, success) +# define os_file_create(key, name, create, purpose, type, success, atomic_writes) \ + os_file_create_func(name, create, purpose, type, success, atomic_writes) -# define os_file_create_simple(key, name, create_mode, access, success) \ - os_file_create_simple_func(name, create_mode, access, success) +# define os_file_create_simple(key, name, create_mode, access, success, atomic_writes) \ + os_file_create_simple_func(name, create_mode, access, success, atomic_writes) # define os_file_create_simple_no_error_handling( \ - key, name, create_mode, access, success) \ - os_file_create_simple_no_error_handling_func( \ - name, create_mode, access, success) + key, name, create_mode, access, success, atomic_writes) \ + os_file_create_simple_no_error_handling_func( \ + name, create_mode, access, success, atomic_writes) # define os_file_close(file) os_file_close_func(file) -# define os_aio(type, mode, name, file, buf, offset, n, message1, message2) \ +# define os_aio(type, mode, name, file, buf, offset, n, message1, message2, write_size) \ os_aio_func(type, mode, name, file, buf, offset, n, \ - message1, message2) + message1, message2, write_size) # define os_file_read(file, buf, offset, n) \ os_file_read_func(file, buf, offset, n) @@ -468,7 +470,8 @@ os_file_create_simple_func( ulint create_mode,/*!< in: create mode */ ulint access_type,/*!< in: OS_FILE_READ_ONLY or OS_FILE_READ_WRITE */ - ibool* success);/*!< out: TRUE if succeed, FALSE if error */ + ibool* success,/*!< out: TRUE if succeed, FALSE if error */ + ibool atomic_writes); /*!space_id, 0, (ulint) (next_offset / UNIV_PAGE_SIZE), (ulint) (next_offset % UNIV_PAGE_SIZE), write_len, buf, - group); + group, 0); srv_stats.os_log_pending_writes.dec(); @@ -1859,7 +1860,7 @@ log_group_checkpoint( write_offset / UNIV_PAGE_SIZE, write_offset % UNIV_PAGE_SIZE, OS_FILE_LOG_BLOCK_SIZE, - buf, ((byte*) group + 1)); + buf, ((byte*) group + 1), 0); ut_ad(((ulint) group & 0x1UL) == 0); } @@ -1939,7 +1940,7 @@ log_group_read_checkpoint_info( fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->space_id, 0, field / UNIV_PAGE_SIZE, field % UNIV_PAGE_SIZE, - OS_FILE_LOG_BLOCK_SIZE, log_sys->checkpoint_buf, NULL); + OS_FILE_LOG_BLOCK_SIZE, log_sys->checkpoint_buf, NULL, 0); } /******************************************************//** @@ -2233,7 +2234,7 @@ loop: fil_io(OS_FILE_READ | OS_FILE_LOG, sync, group->space_id, 0, (ulint) (source_offset / UNIV_PAGE_SIZE), (ulint) (source_offset % UNIV_PAGE_SIZE), - len, buf, NULL); + len, buf, NULL, 0); start_lsn += len; buf += len; @@ -2298,7 +2299,7 @@ log_group_archive_file_header_write( dest_offset / UNIV_PAGE_SIZE, dest_offset % UNIV_PAGE_SIZE, 2 * OS_FILE_LOG_BLOCK_SIZE, - buf, &log_archive_io); + buf, &log_archive_io, 0); } /******************************************************//** @@ -2334,7 +2335,7 @@ log_group_archive_completed_header_write( dest_offset % UNIV_PAGE_SIZE, OS_FILE_LOG_BLOCK_SIZE, buf + LOG_FILE_ARCH_COMPLETED, - &log_archive_io); + &log_archive_io, 0); } /******************************************************//** @@ -2462,7 +2463,7 @@ loop: (ulint) (next_offset / UNIV_PAGE_SIZE), (ulint) (next_offset % UNIV_PAGE_SIZE), ut_calc_align(len, OS_FILE_LOG_BLOCK_SIZE), buf, - &log_archive_io); + &log_archive_io, 0); start_lsn += len; next_offset += len; diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index 8cefa9e4b70..a3df6a8d5bd 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -2,6 +2,7 @@ Copyright (c) 1997, 2012, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2012, Facebook Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -2063,7 +2064,7 @@ recv_apply_log_recs_for_backup(void) error = fil_io(OS_FILE_READ, TRUE, recv_addr->space, zip_size, recv_addr->page_no, 0, zip_size, - block->page.zip.data, NULL); + block->page.zip.data, NULL, 0); if (error == DB_SUCCESS && !buf_zip_decompress(block, TRUE)) { exit(1); @@ -2073,7 +2074,7 @@ recv_apply_log_recs_for_backup(void) recv_addr->space, 0, recv_addr->page_no, 0, UNIV_PAGE_SIZE, - block->frame, NULL); + block->frame, NULL, 0); } if (error != DB_SUCCESS) { @@ -2102,13 +2103,13 @@ recv_apply_log_recs_for_backup(void) recv_addr->space, zip_size, recv_addr->page_no, 0, zip_size, - block->page.zip.data, NULL); + block->page.zip.data, NULL, 0); } else { error = fil_io(OS_FILE_WRITE, TRUE, recv_addr->space, 0, recv_addr->page_no, 0, UNIV_PAGE_SIZE, - block->frame, NULL); + block->frame, NULL, 0); } skip_this_recv_addr: recv_addr = HASH_GET_NEXT(addr_hash, recv_addr); @@ -3074,7 +3075,7 @@ recv_recovery_from_checkpoint_start_func( fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, max_cp_group->space_id, 0, 0, 0, LOG_FILE_HDR_SIZE, - log_hdr_buf, max_cp_group); + log_hdr_buf, max_cp_group, 0); if (0 == ut_memcmp(log_hdr_buf + LOG_FILE_WAS_CREATED_BY_HOT_BACKUP, (byte*)"ibbackup", (sizeof "ibbackup") - 1)) { @@ -3105,7 +3106,7 @@ recv_recovery_from_checkpoint_start_func( fil_io(OS_FILE_WRITE | OS_FILE_LOG, TRUE, max_cp_group->space_id, 0, 0, 0, OS_FILE_LOG_BLOCK_SIZE, - log_hdr_buf, max_cp_group); + log_hdr_buf, max_cp_group, 0); } #ifdef UNIV_LOG_ARCHIVE @@ -3753,8 +3754,8 @@ ask_again: #endif /* Read the archive file header */ - fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->archive_space_id, 0, 0, - LOG_FILE_HDR_SIZE, buf, NULL); + fil_io(OS_FILE_READ | OS_FILE_LOG, true, group->archive_space_id, 0, 0, + LOG_FILE_HDR_SIZE, buf, NULL, 0); /* Check if the archive file header is consistent */ @@ -3827,7 +3828,7 @@ ask_again: fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->archive_space_id, read_offset / UNIV_PAGE_SIZE, - read_offset % UNIV_PAGE_SIZE, len, buf, NULL); + read_offset % UNIV_PAGE_SIZE, len, buf, NULL, 0); ret = recv_scan_log_recs( (buf_pool_get_n_pages() diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc index d1b2b12bf59..60331f9c483 100644 --- a/storage/innobase/os/os0file.cc +++ b/storage/innobase/os/os0file.cc @@ -2,6 +2,7 @@ Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2009, Percona Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Percona Inc.. Those modifications are @@ -42,8 +43,14 @@ Created 10/21/1995 Heikki Tuuri #include "srv0srv.h" #include "srv0start.h" #include "fil0fil.h" +#include "fil0pagecompress.h" #include "buf0buf.h" #include "srv0mon.h" +#include "srv0srv.h" +#ifdef HAVE_POSIX_FALLOCATE +#include "fcntl.h" +#include "linux/falloc.h" +#endif #ifndef UNIV_HOTBACKUP # include "os0sync.h" # include "os0thread.h" @@ -60,6 +67,13 @@ Created 10/21/1995 Heikki Tuuri #include #endif +#if defined(UNIV_LINUX) && defined(HAVE_SYS_IOCTL_H) +# include +# ifndef DFS_IOCTL_ATOMIC_WRITE_SET +# define DFS_IOCTL_ATOMIC_WRITE_SET _IOW(0x95, 2, uint) +# endif +#endif + /** Insert buffer segment id */ static const ulint IO_IBUF_SEGMENT = 0; @@ -175,6 +189,25 @@ struct os_aio_slot_t{ and which can be used to identify which pending aio operation was completed */ + ulint bitmap; + + byte* page_compression_page; /*!< Memory allocated for + page compressed page and + freed after the write + has been completed */ + + ulint write_size; /*!< Actual write size initialized + after fist successfull trim + operation for this page and if + initialized we do not trim again if + actual page size does not decrease. */ + + byte* page_buf; /*!< Actual page buffer for + page compressed pages, do not + free this */ + + ibool page_compress_success; + #ifdef WIN_ASYNC_IO HANDLE handle; /*!< handle object we need in the OVERLAPPED struct */ @@ -294,6 +327,79 @@ UNIV_INTERN ulint os_n_pending_writes = 0; /** Number of pending read operations */ UNIV_INTERN ulint os_n_pending_reads = 0; +/** After first fallocate failure we will disable os_file_trim */ +UNIV_INTERN ibool os_fallocate_failed = FALSE; + +/**********************************************************************//** +Directly manipulate the allocated disk space by deallocating for the file referred to +by fd for the byte range starting at offset and continuing for len bytes. +Within the specified range, partial file system blocks are zeroed, and whole +file system blocks are removed from the file. After a successful call, +subsequent reads from this range will return zeroes. +@return true if success, false if error */ +UNIV_INTERN +ibool +os_file_trim( +/*=========*/ + os_file_t file, /*!< in: file to be trimmed */ + os_aio_slot_t* slot, /*!< in: slot structure */ + ulint len); /*!< in: length of area */ + +/**********************************************************************//** +Allocate memory for temporal buffer used for page compression. This +buffer is freed later. */ +UNIV_INTERN +void +os_slot_alloc_page_buf( +/*===================*/ + os_aio_slot_t* slot); /*!< in: slot structure */ + +/****************************************************************//** +Does error handling when a file operation fails. +@return TRUE if we should retry the operation */ +static +ibool +os_file_handle_error_no_exit( +/*=========================*/ + const char* name, /*!< in: name of a file or NULL */ + const char* operation, /*!< in: operation */ + ibool on_error_silent,/*!< in: if TRUE then don't print + any message to the log. */ + const char* file, /*!< in: file name */ + const ulint line); /*!< in: line */ + +/****************************************************************//** +Tries to enable the atomic write feature, if available, for the specified file +handle. +@return TRUE if success */ +static __attribute__((warn_unused_result)) +ibool +os_file_set_atomic_writes( +/*======================*/ + const char* name /*!< in: name of the file */ + __attribute__((unused)), + os_file_t file /*!< in: handle to the file */ + __attribute__((unused))) +{ +#ifdef DFS_IOCTL_ATOMIC_WRITE_SET + int atomic_option = 1; + + if (ioctl(file, DFS_IOCTL_ATOMIC_WRITE_SET, &atomic_option)) { + + os_file_handle_error_no_exit(name, "ioctl", FALSE, __FILE__, __LINE__); + return(FALSE); + } + + return(TRUE); +#else + fprintf(stderr, "InnoDB: Error: trying to enable atomic writes on " + "non-supported platform! Please restart with " + "innodb_use_atomic_writes disabled.\n"); + return(FALSE); +#endif +} + + #ifdef UNIV_DEBUG # ifndef UNIV_HOTBACKUP /**********************************************************************//** @@ -498,7 +604,17 @@ os_file_get_last_error_low( fprintf(stderr, "InnoDB: The error means mysqld does not have" " the access rights to\n" - "InnoDB: the directory.\n"); + "InnoDECANCELEDB: the directory.\n"); + } else if (err == ECANCELED) { + fprintf(stderr, + "InnoDB: Operation canceled (%d):%s\n", + err, strerror(err)); + + if(srv_use_atomic_writes) { + fprintf(stderr, + "InnoDB: Error trying to enable atomic writes on " + "non-supported destination!\n"); + } } else { if (strerror(err) != NULL) { fprintf(stderr, @@ -530,6 +646,8 @@ os_file_get_last_error_low( case ENOTDIR: case EISDIR: return(OS_FILE_PATH_ERROR); + case ECANCELED: + return(OS_FILE_OPERATION_NOT_SUPPORTED); case EAGAIN: if (srv_use_native_aio) { return(OS_FILE_AIO_RESOURCES_RESERVED); @@ -574,9 +692,11 @@ os_file_handle_error_cond_exit( const char* operation, /*!< in: operation */ ibool should_exit, /*!< in: call exit(3) if unknown error and this parameter is TRUE */ - ibool on_error_silent)/*!< in: if TRUE then don't print + ibool on_error_silent,/*!< in: if TRUE then don't print any message to the log iff it is an unknown non-fatal error */ + const char* file, /*!< in: file name */ + const ulint line) /*!< in: line */ { ulint err; @@ -606,6 +726,9 @@ os_file_handle_error_cond_exit( " InnoDB: Disk is full. Try to clean the disk" " to free space.\n"); + fprintf(stderr, + " InnoDB: at file %s and at line %ld\n", file, line); + os_has_said_disk_full = TRUE; fflush(stderr); @@ -652,6 +775,9 @@ os_file_handle_error_cond_exit( operation, err); } + fprintf(stderr, + " InnoDB: at file %s and at line %ld\n", file, line); + if (should_exit) { ut_print_timestamp(stderr); fprintf(stderr, " InnoDB: Cannot continue " @@ -675,10 +801,12 @@ ibool os_file_handle_error( /*=================*/ const char* name, /*!< in: name of a file or NULL */ - const char* operation) /*!< in: operation */ + const char* operation, /*!< in: operation */ + const char* file, /*!< in: file name */ + const ulint line) /*!< in: line */ { /* exit in case of unknown error */ - return(os_file_handle_error_cond_exit(name, operation, TRUE, FALSE)); + return(os_file_handle_error_cond_exit(name, operation, TRUE, FALSE, file, line)); } /****************************************************************//** @@ -690,12 +818,14 @@ os_file_handle_error_no_exit( /*=========================*/ const char* name, /*!< in: name of a file or NULL */ const char* operation, /*!< in: operation */ - ibool on_error_silent)/*!< in: if TRUE then don't print + ibool on_error_silent,/*!< in: if TRUE then don't print any message to the log. */ + const char* file, /*!< in: file name */ + const ulint line) /*!< in: line */ { /* don't exit in case of unknown error */ return(os_file_handle_error_cond_exit( - name, operation, FALSE, on_error_silent)); + name, operation, FALSE, on_error_silent, file, line)); } #undef USE_FILE_LOCK @@ -835,7 +965,7 @@ os_file_opendir( if (dir == INVALID_HANDLE_VALUE) { if (error_is_fatal) { - os_file_handle_error(dirname, "opendir"); + os_file_handle_error(dirname, "opendir", __FILE__, __LINE__); } return(NULL); @@ -846,7 +976,7 @@ os_file_opendir( dir = opendir(dirname); if (dir == NULL && error_is_fatal) { - os_file_handle_error(dirname, "opendir"); + os_file_handle_error(dirname, "opendir", __FILE__, __LINE__); } return(dir); @@ -868,7 +998,7 @@ os_file_closedir( ret = FindClose(dir); if (!ret) { - os_file_handle_error_no_exit(NULL, "closedir", FALSE); + os_file_handle_error_no_exit(NULL, "closedir", FALSE, __FILE__, __LINE__); return(-1); } @@ -880,7 +1010,7 @@ os_file_closedir( ret = closedir(dir); if (ret) { - os_file_handle_error_no_exit(NULL, "closedir", FALSE); + os_file_handle_error_no_exit(NULL, "closedir", FALSE, __FILE__, __LINE__); } return(ret); @@ -952,7 +1082,7 @@ next_file: return(1); } else { - os_file_handle_error_no_exit(NULL, "readdir_next_file", FALSE); + os_file_handle_error_no_exit(NULL, "readdir_next_file", FALSE, __FILE__, __LINE__); return(-1); } #else @@ -1038,7 +1168,7 @@ next_file: goto next_file; } - os_file_handle_error_no_exit(full_path, "stat", FALSE); + os_file_handle_error_no_exit(full_path, "stat", FALSE, __FILE__, __LINE__); ut_free(full_path); @@ -1089,7 +1219,7 @@ os_file_create_directory( && !fail_if_exists))) { os_file_handle_error_no_exit( - pathname, "CreateDirectory", FALSE); + pathname, "CreateDirectory", FALSE, __FILE__, __LINE__); return(FALSE); } @@ -1102,7 +1232,7 @@ os_file_create_directory( if (!(rcode == 0 || (errno == EEXIST && !fail_if_exists))) { /* failure */ - os_file_handle_error_no_exit(pathname, "mkdir", FALSE); + os_file_handle_error_no_exit(pathname, "mkdir", FALSE, __FILE__, __LINE__); return(FALSE); } @@ -1126,7 +1256,8 @@ os_file_create_simple_func( ulint create_mode,/*!< in: create mode */ ulint access_type,/*!< in: OS_FILE_READ_ONLY or OS_FILE_READ_WRITE */ - ibool* success)/*!< out: TRUE if succeed, FALSE if error */ + ibool* success,/*!< out: TRUE if succeed, FALSE if error */ + ibool atomic_writes) /*!slots = static_cast( ut_malloc(n * sizeof(*array->slots))); - memset(array->slots, 0x0, sizeof(n * sizeof(*array->slots))); + memset(array->slots, 0x0, n * sizeof(*array->slots)); + #ifdef __WIN__ array->handles = static_cast(ut_malloc(n * sizeof(HANDLE))); #endif /* __WIN__ */ @@ -3803,8 +3996,8 @@ os_aio_array_free( /*==============*/ os_aio_array_t*& array) /*!< in, own: array to free */ { -#ifdef WIN_ASYNC_IO ulint i; +#ifdef WIN_ASYNC_IO for (i = 0; i < array->n_slots; i++) { os_aio_slot_t* slot = os_aio_array_get_nth_slot(array, i); @@ -3826,6 +4019,14 @@ os_aio_array_free( } #endif /* LINUX_NATIVE_AIO */ + for (i = 0; i < array->n_slots; i++) { + os_aio_slot_t* slot = os_aio_array_get_nth_slot(array, i); + if (slot->page_compression_page) { + ut_free(slot->page_compression_page); + slot->page_compression_page = NULL; + } + } + ut_free(array->slots); ut_free(array); @@ -4159,7 +4360,12 @@ os_aio_array_reserve_slot( void* buf, /*!< in: buffer where to read or from which to write */ os_offset_t offset, /*!< in: file offset */ - ulint len) /*!< in: length of the block to read or write */ + ulint len, /*!< in: length of the block to read or write */ + ulint write_size) /*!< in: Actual write size initialized + after fist successfull trim + operation for this page and if + initialized we do not trim again if + actual page size does not decrease. */ { os_aio_slot_t* slot = NULL; #ifdef WIN_ASYNC_IO @@ -4249,6 +4455,54 @@ found: slot->buf = static_cast(buf); slot->offset = offset; slot->io_already_done = FALSE; + slot->page_compress_success = FALSE; + slot->write_size = write_size; + + /* If the space is page compressed and this is write operation + and if either only index pages compression is disabled or + page is index page and only index pages compression is enabled then + we compress the page */ + if (message1 && + type == OS_FILE_WRITE && + fil_space_is_page_compressed(fil_node_get_space_id(slot->message1)) && + (srv_page_compress_index_pages == false || + (srv_page_compress_index_pages == true && fil_page_is_index_page(slot->buf)))) { + ulint real_len = len; + byte* tmp = NULL; + + /* Release the array mutex while compressing */ + os_mutex_exit(array->mutex); + + // We allocate memory for page compressed buffer if and only + // if it is not yet allocated. + if (slot->page_buf == NULL) { + os_slot_alloc_page_buf(slot); + } + + ut_ad(slot->page_buf); + + /* Write buffer full of zeros, this is needed for trim, + can't really avoid this now. */ + memset(slot->page_buf, 0, len); + + tmp = fil_compress_page(fil_node_get_space_id(slot->message1), (byte *)buf, slot->page_buf, len, &real_len); + + /* If compression succeeded, set up the length and buffer */ + if (tmp != buf) { + len = real_len; + buf = slot->page_buf; + slot->len = real_len; + slot->page_compress_success = TRUE; + } else { + slot->page_compress_success = FALSE; + } + + /* Take array mutex back, not sure if this is really needed + below */ + os_mutex_enter(array->mutex); + + } + #ifdef WIN_ASYNC_IO control = &slot->control; @@ -4523,10 +4777,15 @@ os_aio_func( (can be used to identify a completed aio operation); ignored if mode is OS_AIO_SYNC */ - void* message2)/*!< in: message for the aio handler + void* message2,/*!< in: message for the aio handler (can be used to identify a completed aio operation); ignored if mode is OS_AIO_SYNC */ + ulint write_size)/*!< in/out: Actual write size initialized + after fist successfull trim + operation for this page and if + initialized we do not trim again if + actual page size does not decrease. */ { os_aio_array_t* array; os_aio_slot_t* slot; @@ -4624,7 +4883,8 @@ try_again: } slot = os_aio_array_reserve_slot(type, array, message1, message2, file, - name, buf, offset, n); + name, buf, offset, n, write_size); + if (type == OS_FILE_READ) { if (srv_use_native_aio) { os_n_file_reads++; @@ -4704,7 +4964,7 @@ err_exit: os_aio_array_free_slot(array, slot); if (os_file_handle_error( - name,type == OS_FILE_READ ? "aio read" : "aio write")) { + name,type == OS_FILE_READ ? "aio read" : "aio write", __FILE__, __LINE__)) { goto try_again; } @@ -4817,7 +5077,7 @@ os_aio_windows_handle( if (ret && len == slot->len) { ret_val = TRUE; - } else if (os_file_handle_error(slot->name, "Windows aio")) { + } else if (os_file_handle_error(slot->name, "Windows aio", __FILE__, __LINE__)) { retry = TRUE; } else { @@ -4847,9 +5107,17 @@ os_aio_windows_handle( switch (slot->type) { case OS_FILE_WRITE: - ret = WriteFile(slot->file, slot->buf, + if (slot->message1 && + fil_space_is_page_compressed(fil_node_get_space_id(slot->message1)) && + slot->page_buf) { + ret = WriteFile(slot->file, slot->page_buf, + (DWORD) slot->len, &len, + &(slot->control)); + } else { + ret = WriteFile(slot->file, slot->buf, (DWORD) slot->len, &len, &(slot->control)); + } break; case OS_FILE_READ: @@ -4881,6 +5149,29 @@ os_aio_windows_handle( ret_val = ret && len == slot->len; } + if (slot->message1 && + fil_space_is_page_compressed(fil_node_get_space_id(slot->message1))) { + // We allocate memory for page compressed buffer if and only + // if it is not yet allocated. + if (slot->page_buf == NULL) { + os_slot_alloc_page_buf(slot); + } + ut_ad(slot->page_buf); + + if (slot->type == OS_FILE_READ) { + if (fil_page_is_compressed(slot->buf)) { + fil_decompress_page(slot->page_buf, slot->buf, slot->len); + } + } else { + if (slot->page_compress_success && fil_page_is_compressed(slot->page_buf)) { + if (srv_use_trim && os_fallocate_failed == FALSE) { + // Deallocate unused blocks from file system + os_file_trim(slot->file, slot, slot->len); + } + } + } + } + os_aio_array_free_slot(array, slot); return(ret_val); @@ -4970,6 +5261,34 @@ retry: /* We have not overstepped to next segment. */ ut_a(slot->pos < end_pos); + /* If the table is page compressed and this is read, + we decompress before we annouce the read is + complete. For writes, we free the compressed page. */ + if (slot->message1 && + fil_space_is_page_compressed(fil_node_get_space_id(slot->message1))) { + // We allocate memory for page compressed buffer if and only + // if it is not yet allocated. + if (slot->page_buf == NULL) { + os_slot_alloc_page_buf(slot); + } + ut_ad(slot->page_buf); + + if (slot->type == OS_FILE_READ) { + if (fil_page_is_compressed(slot->buf)) { + fil_decompress_page(slot->page_buf, slot->buf, slot->len); + } + } else { + if (slot->page_compress_success && + fil_page_is_compressed(slot->page_buf)) { + ut_ad(slot->page_compression_page); + if (srv_use_trim && os_fallocate_failed == FALSE) { + // Deallocate unused blocks from file system + os_file_trim(slot->file, slot, slot->len); + } + } + } + } + /* Mark this request as completed. The error handling will be done in the calling function. */ os_mutex_enter(array->mutex); @@ -5113,6 +5432,13 @@ found: } else { errno = -slot->ret; + if (slot->ret == 0) { + fprintf(stderr, + "InnoDB: Number of bytes after aio %d requested %lu\n" + "InnoDB: from file %s\n", + slot->n_bytes, slot->len, slot->name); + } + /* os_file_handle_error does tell us if we should retry this IO. As it stands now, we don't do this retry when reaping requests from a different context than @@ -5120,7 +5446,7 @@ found: windows and linux native AIO. We should probably look into this to transparently re-submit the IO. */ - os_file_handle_error(slot->name, "Linux aio"); + os_file_handle_error(slot->name, "Linux aio", __FILE__, __LINE__); ret = FALSE; } @@ -5323,7 +5649,7 @@ consecutive_loop: if (slot->reserved && slot != aio_slot - && slot->offset == slot->offset + aio_slot->len + && slot->offset == aio_slot->offset + aio_slot->len && slot->type == aio_slot->type && slot->file == aio_slot->file) { @@ -5791,4 +6117,147 @@ os_aio_all_slots_free(void) } #endif /* UNIV_DEBUG */ +#ifdef _WIN32 +#include +#ifndef FSCTL_FILE_LEVEL_TRIM +#define FSCTL_FILE_LEVEL_TRIM CTL_CODE(FILE_DEVICE_FILE_SYSTEM, 130, METHOD_BUFFERED, FILE_WRITE_DATA) +typedef struct _FILE_LEVEL_TRIM_RANGE { + DWORDLONG Offset; + DWORDLONG Length; +} FILE_LEVEL_TRIM_RANGE, *PFILE_LEVEL_TRIM_RANGE; + +typedef struct _FILE_LEVEL_TRIM { + DWORD Key; + DWORD NumRanges; + FILE_LEVEL_TRIM_RANGE Ranges[1]; +} FILE_LEVEL_TRIM, *PFILE_LEVEL_TRIM; +#endif +#endif + +/**********************************************************************//** +Directly manipulate the allocated disk space by deallocating for the file referred to +by fd for the byte range starting at offset and continuing for len bytes. +Within the specified range, partial file system blocks are zeroed, and whole +file system blocks are removed from the file. After a successful call, +subsequent reads from this range will return zeroes. +@return true if success, false if error */ +UNIV_INTERN +ibool +os_file_trim( +/*=========*/ + os_file_t file, /*!< in: file to be trimmed */ + os_aio_slot_t* slot, /*!< in: slot structure */ + ulint len) /*!< in: length of area */ +{ + + size_t trim_len = UNIV_PAGE_SIZE - len; + os_offset_t off = slot->offset + len; + + // Nothing to do if trim length is zero or if actual write + // size is initialized and it is smaller than current write size. + // In first write if we trim we set write_size to actual bytes + // written and rest of the page is trimmed. In following writes + // there is no need to trim again if write_size only increases + // because rest of the page is already trimmed. If actual write + // size decreases we need to trim again. + if (trim_len == 0 || + (slot->write_size > 0 && len >= slot->write_size)) { + + if (slot->write_size > 0 && len >= slot->write_size) { + srv_stats.page_compressed_trim_op_saved.inc(); + } + + slot->write_size = len; + + return (TRUE); + } + +#ifdef __linux__ +#if defined(FALLOC_FL_PUNCH_HOLE) && defined (FALLOC_FL_KEEP_SIZE) + int ret = fallocate(file, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off, trim_len); + + if (ret) { + /* After first failure do not try to trim again */ + os_fallocate_failed = TRUE; + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: [Warning] fallocate call failed with error code %d.\n" + " InnoDB: start: %lx len: %lu payload: %lu\n" + " InnoDB: Disabling fallocate for now.\n", ret, (slot->offset+len), trim_len, len); + + os_file_handle_error_no_exit(slot->name, + " fallocate(FALLOC_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE) ", + FALSE, __FILE__, __LINE__); + + slot->write_size = 0; + + return (FALSE); + } else { + slot->write_size = len; + } +#else + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: [Warning] fallocate not supported on this installation." + " InnoDB: Disabling fallocate for now."); + os_fallocate_failed = TRUE; + slot->write_size = 0; + +#endif /* HAVE_FALLOCATE ... */ + +#elif defined(_WIN32) + FILE_LEVEL_TRIM flt; + flt.Key = 0; + flt.NumRanges = 1; + flt.Ranges[0].Offset = off; + flt.Ranges[0].Length = trim_len; + + BOOL ret = DeviceIoControl(file,FSCTL_FILE_LEVEL_TRIM,&flt, sizeof(flt), NULL, NULL, NULL, NULL); + + if (!ret) { + /* After first failure do not try to trim again */ + os_fallocate_failed = TRUE; + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: [Warning] fallocate call failed with error.\n" + " InnoDB: start: %lx len: %du payload: %lu\n" + " InnoDB: Disabling fallocate for now.\n", (slot->offset+len), trim_len, len); + + os_file_handle_error_no_exit(slot->name, + " DeviceIOControl(FSCTL_FILE_LEVEL_TRIM) ", + FALSE, __FILE__, __LINE__); + + slot->write_size = 0; + return (FALSE); + } else { + slot->write_size = len; + } +#endif + +#define SECT_SIZE 512 + srv_stats.page_compression_trim_sect512.add((trim_len / SECT_SIZE)); + srv_stats.page_compression_trim_sect4096.add((trim_len / (SECT_SIZE*8))); + srv_stats.page_compressed_trim_op.inc(); + + return (TRUE); + +} #endif /* !UNIV_HOTBACKUP */ + +/**********************************************************************//** +Allocate memory for temporal buffer used for page compression. This +buffer is freed later. */ +UNIV_INTERN +void +os_slot_alloc_page_buf( +/*===================*/ + os_aio_slot_t* slot) /*!< in: slot structure */ +{ + byte* cbuf2; + byte* cbuf; + + cbuf2 = static_cast(ut_malloc(UNIV_PAGE_SIZE*2)); + cbuf = static_cast(ut_align(cbuf2, UNIV_PAGE_SIZE)); + slot->page_compression_page = static_cast(cbuf2); + slot->page_buf = static_cast(cbuf); +} diff --git a/storage/innobase/srv/srv0mon.cc b/storage/innobase/srv/srv0mon.cc index 3b3da2f070f..44a60961110 100644 --- a/storage/innobase/srv/srv0mon.cc +++ b/storage/innobase/srv/srv0mon.cc @@ -290,6 +290,12 @@ static monitor_info_t innodb_counter_info[] = MONITOR_EXISTING | MONITOR_DEFAULT_ON), MONITOR_DEFAULT_START, MONITOR_OVLD_PAGES_WRITTEN}, + {"buffer_index_pages_written", "buffer", + "Number of index pages written (innodb_index_pages_written)", + static_cast( + MONITOR_EXISTING | MONITOR_DEFAULT_ON), + MONITOR_DEFAULT_START, MONITOR_OVLD_INDEX_PAGES_WRITTEN}, + {"buffer_pages_read", "buffer", "Number of pages read (innodb_pages_read)", static_cast( @@ -875,6 +881,41 @@ static monitor_info_t innodb_counter_info[] = MONITOR_NONE, MONITOR_DEFAULT_START, MONITOR_PAD_DECREMENTS}, + {"compress_saved", "compression", + "Number of bytes saved by page compression", + MONITOR_NONE, + MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESS_SAVED}, + + {"compress_trim_sect512", "compression", + "Number of sect-512 TRIMed by page compression", + MONITOR_NONE, + MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT512}, + + {"compress_trim_sect4096", "compression", + "Number of sect-4K TRIMed by page compression", + MONITOR_NONE, + MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT4096}, + + {"compress_pages_page_compressed", "compression", + "Number of pages compressed by page compression", + MONITOR_NONE, + MONITOR_DEFAULT_START, MONITOR_OVLD_PAGES_PAGE_COMPRESSED}, + + {"compress_page_compressed_trim_op", "compression", + "Number of TRIM operation performed by page compression", + MONITOR_NONE, + MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP}, + + {"compress_page_compressed_trim_op_saved", "compression", + "Number of TRIM operation saved by page compression", + MONITOR_NONE, + MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP_SAVED}, + + {"compress_pages_page_decompressed", "compression", + "Number of pages decompressed by page compression", + MONITOR_NONE, + MONITOR_DEFAULT_START, MONITOR_OVLD_PAGES_PAGE_DECOMPRESSED}, + /* ========== Counters for Index ========== */ {"module_index", "index", "Index Manager", MONITOR_MODULE, @@ -1528,6 +1569,11 @@ srv_mon_process_existing_counter( value = stat.n_pages_written; break; + /* innodb_index_pages_written, the number of page written */ + case MONITOR_OVLD_INDEX_PAGES_WRITTEN: + value = srv_stats.index_pages_written; + break; + /* innodb_pages_read */ case MONITOR_OVLD_PAGES_READ: buf_get_total_stat(&stat); @@ -1769,6 +1815,28 @@ srv_mon_process_existing_counter( value = btr_cur_n_non_sea; break; + case MONITOR_OVLD_PAGE_COMPRESS_SAVED: + value = srv_stats.page_compression_saved; + break; + case MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT512: + value = srv_stats.page_compression_trim_sect512; + break; + case MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT4096: + value = srv_stats.page_compression_trim_sect4096; + break; + case MONITOR_OVLD_PAGES_PAGE_COMPRESSED: + value = srv_stats.pages_page_compressed; + break; + case MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP: + value = srv_stats.page_compressed_trim_op; + break; + case MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP_SAVED: + value = srv_stats.page_compressed_trim_op_saved; + break; + case MONITOR_OVLD_PAGES_PAGE_DECOMPRESSED: + value = srv_stats.pages_page_decompressed; + break; + default: ut_error; } diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index 4c5753ac40e..90864cee9ef 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -145,6 +145,24 @@ use simulated aio we build below with threads. Currently we support native aio on windows and linux */ UNIV_INTERN my_bool srv_use_native_aio = TRUE; +/* If this flag is TRUE, then we will use page compression +to the pages */ +UNIV_INTERN my_bool srv_compress_pages = FALSE; +/* If this flag is TRUE, then we will use page compression +only for index pages */ +UNIV_INTERN my_bool srv_page_compress_index_pages = FALSE; +UNIV_INTERN long srv_trim_pct = 100; +/* Default compression level if page compression is used and no compression +level is set for the table*/ +UNIV_INTERN long srv_compress_zlib_level = 6; +/* If this flag is TRUE, then we will use fallocate(PUCH_HOLE) +to the pages */ +UNIV_INTERN my_bool srv_use_trim = TRUE; +/* If this flag is TRUE, then we will use posix fallocate for file extentsion */ +UNIV_INTERN my_bool srv_use_posix_fallocate = FALSE; +/* If this flag is TRUE, then we disable doublewrite buffer */ +UNIV_INTERN my_bool srv_use_atomic_writes = FALSE; + #ifdef __WIN__ /* Windows native condition variables. We use runtime loading / function pointers, because they are not available on Windows Server 2003 and @@ -347,11 +365,6 @@ batch flushing i.e.: LRU flushing and flush_list flushing. The rest of the pages are used for single page flushing. */ UNIV_INTERN ulong srv_doublewrite_batch_size = 120; -UNIV_INTERN ibool srv_use_atomic_writes = FALSE; -#ifdef HAVE_POSIX_FALLOCATE -UNIV_INTERN ibool srv_use_posix_fallocate = TRUE; -#endif - UNIV_INTERN ulong srv_replication_delay = 0; /*-------------------------------------------*/ @@ -375,6 +388,16 @@ static ulint srv_n_rows_read_old = 0; UNIV_INTERN ulint srv_truncated_status_writes = 0; UNIV_INTERN ulint srv_available_undo_logs = 0; +UNIV_INTERN ib_uint64_t srv_page_compression_saved = 0; +UNIV_INTERN ib_uint64_t srv_page_compression_trim_sect512 = 0; +UNIV_INTERN ib_uint64_t srv_page_compression_trim_sect4096 = 0; +UNIV_INTERN ib_uint64_t srv_index_pages_written = 0; +UNIV_INTERN ib_uint64_t srv_pages_page_compressed = 0; +UNIV_INTERN ib_uint64_t srv_page_compressed_trim_op = 0; +UNIV_INTERN ib_uint64_t srv_page_compressed_trim_op_saved = 0; +UNIV_INTERN ib_uint64_t srv_index_page_decompressed = 0; + + /* Set the following to 0 if you want InnoDB to write messages on stderr on startup/shutdown. */ UNIV_INTERN ibool srv_print_verbose_log = TRUE; @@ -1457,6 +1480,14 @@ srv_export_innodb_status(void) srv_truncated_status_writes; export_vars.innodb_available_undo_logs = srv_available_undo_logs; + export_vars.innodb_page_compression_saved = srv_stats.page_compression_saved; + export_vars.innodb_page_compression_trim_sect512 = srv_stats.page_compression_trim_sect512; + export_vars.innodb_page_compression_trim_sect4096 = srv_stats.page_compression_trim_sect4096; + export_vars.innodb_index_pages_written = srv_stats.index_pages_written; + export_vars.innodb_pages_page_compressed = srv_stats.pages_page_compressed; + export_vars.innodb_page_compressed_trim_op = srv_stats.page_compressed_trim_op; + export_vars.innodb_page_compressed_trim_op_saved = srv_stats.page_compressed_trim_op_saved; + export_vars.innodb_pages_page_decompressed = srv_stats.pages_page_decompressed; #ifdef UNIV_DEBUG if (purge_sys->done.trx_no == 0 diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index efe9f094c0d..0517f4b1468 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -3,6 +3,7 @@ Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved. Copyright (c) 2008, Google Inc. Copyright (c) 2009, Percona Inc. +Copyright (c) 2013, SkySQL Ab. All Rights Reserved. Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described @@ -126,7 +127,10 @@ static os_file_t files[1000]; /** io_handler_thread parameters for thread identification */ static ulint n[SRV_MAX_N_IO_THREADS + 6]; /** io_handler_thread identifiers, 32 is the maximum number of purge threads */ -static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32]; +/** pgcomp_thread are 16 total */ +#define START_PGCOMP_CNT (SRV_MAX_N_IO_THREADS + 6 + 32) +#define PGCOMP_MAX_WORKER 16 +static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32 + PGCOMP_MAX_WORKER]; /** We use this mutex to test the return value of pthread_mutex_trylock on successful locking. HP-UX does NOT return 0, though Linux et al do. */ @@ -522,7 +526,7 @@ create_log_file( *file = os_file_create( innodb_file_log_key, name, - OS_FILE_CREATE, OS_FILE_NORMAL, OS_LOG_FILE, &ret); + OS_FILE_CREATE, OS_FILE_NORMAL, OS_LOG_FILE, &ret, FALSE); ib_logf(IB_LOG_LEVEL_INFO, "Setting log file %s size to %lu MB", @@ -715,7 +719,7 @@ open_log_file( *file = os_file_create(innodb_file_log_key, name, OS_FILE_OPEN, OS_FILE_AIO, - OS_LOG_FILE, &ret); + OS_LOG_FILE, &ret, FALSE); if (!ret) { ib_logf(IB_LOG_LEVEL_ERROR, "Unable to open '%s'", name); return(DB_ERROR); @@ -806,7 +810,7 @@ open_or_create_data_files( files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_CREATE, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); if (srv_read_only_mode) { @@ -849,7 +853,7 @@ open_or_create_data_files( files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_OPEN_RAW, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); if (!ret) { ib_logf(IB_LOG_LEVEL_ERROR, @@ -881,17 +885,17 @@ open_or_create_data_files( files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_OPEN_RAW, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); } else if (i == 0) { files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_OPEN_RETRY, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); } else { files[i] = os_file_create( innodb_file_data_key, name, OS_FILE_OPEN, OS_FILE_NORMAL, - OS_DATA_FILE, &ret); + OS_DATA_FILE, &ret, FALSE); } if (!ret) { @@ -1078,7 +1082,7 @@ srv_undo_tablespace_create( innodb_file_data_key, name, srv_read_only_mode ? OS_FILE_OPEN : OS_FILE_CREATE, - OS_FILE_NORMAL, OS_DATA_FILE, &ret); + OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE); if (srv_read_only_mode && ret) { ib_logf(IB_LOG_LEVEL_INFO, @@ -1159,7 +1163,8 @@ srv_undo_tablespace_open( | OS_FILE_ON_ERROR_SILENT, OS_FILE_NORMAL, OS_DATA_FILE, - &ret); + &ret, + FALSE); /* If the file open was successful then load the tablespace. */ @@ -1430,6 +1435,691 @@ srv_start_wait_for_purge_to_start() } } +/* JAN: TODO: */ +/**********************************************************************************/ +extern int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time); +extern ibool buf_flush_start(buf_pool_t* buf_pool, enum buf_flush flush_type); +extern void buf_flush_end(buf_pool_t* buf_pool, enum buf_flush flush_type); +extern void buf_flush_common(enum buf_flush flush_type, ulint page_count); +extern ulint buf_flush_batch(buf_pool_t* buf_pool, enum buf_flush flush_type, ulint min_n, lsn_t lsn_limit); + +typedef enum wrk_status { + WRK_ITEM_SET=0, + WRK_ITEM_START=1, + WRK_ITEM_DONE=2, + WRK_ITEM_SUCCESS=2, + WRK_ITEM_FAILED=3, + WRK_ITEM_STATUS_UNDEFINED +} wrk_status_t; + +typedef enum wthr_status { + WTHR_NOT_INIT=0, + WTHR_INITIALIZED=1, + WTHR_SIG_WAITING=2, + WTHR_RUNNING=3, + WTHR_NO_WORK=4, + WTHR_KILL_IT=5, + WTHR_STATUS_UNDEFINED +} wthr_status_t; + +typedef struct wrk_itm +{ + /****************************/ + /* Need to group into struct*/ + buf_pool_t* buf_pool; //buffer-pool instance + int flush_type; //flush-type for buffer-pool flush operation + int min; //minimum number of pages requested to be flushed + unsigned long long lsn_limit; //lsn limit for the buffer-pool flush operation + /****************************/ + + unsigned long result; //flush pages count + unsigned long t_usec; //time-taken in usec + long id_usr; //thread-id currently working + wrk_status_t wi_status; //flag + struct wrk_itm *next; +} wrk_t; + +typedef enum op_q_status { + Q_NOT_INIT=0, + Q_EMPTY=1, + Q_INITIALIZED=2, + Q_PROCESS=3, + Q_DONE=4, + Q_ERROR=5, + Q_STATUS_UNDEFINED +} q_status_t; + +typedef struct op_queue +{ + pthread_mutex_t mtx; + pthread_cond_t cv; + q_status_t flag; + wrk_t *head; + wrk_t *tail; +} opq_t; + +opq_t wq, cq; + +typedef struct thread_sync +{ + int wthread_id; + pthread_t wthread; + opq_t *wq; + opq_t *cq; + wthr_status_t wt_status; + unsigned long stat_universal_num_processed; + unsigned long stat_cycle_num_processed; +} thread_sync_t; + +/* Global XXX:DD needs to be cleaned */ +int exit_flag; +ulint check_wrk_done_count; +static ulint done_cnt_flag; +static int pgc_n_threads = 8; + +thread_sync_t pc_sync[PGCOMP_MAX_WORKER]; +static wrk_t work_items[PGCOMP_MAX_WORKER]; +static int pgcomp_wrk_initialized = -1; + +int set_check_done_flag_count(int cnt) +{ + return(check_wrk_done_count = cnt); +} + +int set_pgcomp_wrk_init_done(void) +{ + pgcomp_wrk_initialized = 1; + return 0; +} + +int is_pgcomp_wrk_init_done(void) +{ + return(pgcomp_wrk_initialized == 1); +} + +ulint set_done_cnt_flag(ulint val) +{ + /* + * Assumption: The thread calling into set_done_cnt_flag + * needs to have "cq.mtx" acquired, else not safe. + */ + done_cnt_flag = val; + return done_cnt_flag; +} + + +ulint cv_done_inc_flag_sig(thread_sync_t * ppc) +{ + pthread_mutex_lock(&ppc->cq->mtx); + ppc->stat_universal_num_processed++; + ppc->stat_cycle_num_processed++; + done_cnt_flag++; + if(!(done_cnt_flag <= check_wrk_done_count)) { + fprintf(stderr, "ERROR: done_cnt:%lu check_wrk_done_count:%lu\n", + done_cnt_flag, check_wrk_done_count); + } + assert(done_cnt_flag <= check_wrk_done_count); + pthread_mutex_unlock(&ppc->cq->mtx); + if(done_cnt_flag == check_wrk_done_count) { + ppc->wq->flag = Q_DONE; + pthread_mutex_lock(&ppc->cq->mtx); + ppc->cq->flag = Q_DONE; + pthread_cond_signal(&ppc->cq->cv); + pthread_mutex_unlock(&ppc->cq->mtx); + } + return(done_cnt_flag); +} + +int q_remove_wrk(opq_t *q, wrk_t **wi) +{ + int ret = 0; + + if(!wi || !q) { + return -1; + } + + pthread_mutex_lock(&q->mtx); + assert(!((q->tail == NULL) && (q->head != NULL))); + assert(!((q->tail != NULL) && (q->head == NULL))); + + /* get the first in the list*/ + *wi = q->head; + if(q->head) { + ret = 0; + q->head = q->head->next; + (*wi)->next = NULL; + if(!q->head) { + q->tail = NULL; + } + } else { + q->tail = NULL; + ret = 1; /* indicating remove from queue failed */ + } + pthread_mutex_unlock(&q->mtx); + return (ret); +} + +int is_busy_wrk_itm(wrk_t *wi) +{ + if(!wi) { + return -1; + } + return(!(wi->id_usr == -1)); +} + +int setup_wrk_itm(int items) +{ + int i; + for(i=0; imtx, NULL); + pthread_cond_init(&q->cv, NULL); + q->flag = Q_INITIALIZED; + q->head = q->tail = NULL; + + return 0; +} + +#if 0 +int drain_cq(opq_t *cq, int items) +{ + int i=0; + + if(!cq) { + return -1; + } + pthread_mutex_lock(&cq->mtx); + for(i=0; ihead = cq->tail = NULL; + pthread_mutex_unlock(&cq->mtx); + return 0; +} +#endif + +int q_insert_wrk_list(opq_t *q, wrk_t *w_list) +{ + if((!q) || (!w_list)) { + fprintf(stderr, "insert failed q:%p w:%p\n", q, w_list); + return -1; + } + + pthread_mutex_lock(&q->mtx); + + assert(!((q->tail == NULL) && (q->head != NULL))); + assert(!((q->tail != NULL) && (q->head == NULL))); + + /* list is empty */ + if(!q->tail) { + q->head = q->tail = w_list; + } else { + /* added the first of the node to list */ + assert(q->head != NULL); + q->tail->next = w_list; + } + + /* move tail to the last node */ + while(q->tail->next) { + q->tail = q->tail->next; + } + pthread_mutex_unlock(&q->mtx); + + return 0; +} + +int flush_pool_instance(wrk_t *wi) +{ + struct timeval p_start_time, p_end_time, d_time; + + if(!wi) { + fprintf(stderr, "work item invalid wi:%p\n", wi); + return -1; + } + + wi->t_usec = 0; + if (!buf_flush_start(wi->buf_pool, (buf_flush)wi->flush_type)) { + /* We have two choices here. If lsn_limit was + specified then skipping an instance of buffer + pool means we cannot guarantee that all pages + up to lsn_limit has been flushed. We can + return right now with failure or we can try + to flush remaining buffer pools up to the + lsn_limit. We attempt to flush other buffer + pools based on the assumption that it will + help in the retry which will follow the + failure. */ + fprintf(stderr, "flush_start Failed, flush_type:%d\n", + (buf_flush)wi->flush_type); + return -1; + } + +#ifdef UNIV_DEBUG + /* Record time taken for the OP in usec */ + gettimeofday(&p_start_time, 0x0); +#endif + + if((buf_flush)wi->flush_type == BUF_FLUSH_LRU) { + /* srv_LRU_scan_depth can be arbitrarily large value. + * We cap it with current LRU size. + */ + buf_pool_mutex_enter(wi->buf_pool); + wi->min = UT_LIST_GET_LEN(wi->buf_pool->LRU); + buf_pool_mutex_exit(wi->buf_pool); + wi->min = ut_min(srv_LRU_scan_depth,wi->min); + } + + wi->result = buf_flush_batch(wi->buf_pool, + (buf_flush)wi->flush_type, + wi->min, wi->lsn_limit); + + buf_flush_end(wi->buf_pool, (buf_flush)wi->flush_type); + buf_flush_common((buf_flush)wi->flush_type, wi->result); + +#ifdef UNIV_DEBUG + gettimeofday(&p_end_time, 0x0); + timediff(&p_end_time, &p_start_time, &d_time); + + wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000)); +#endif + + return 0; +} + +int service_page_comp_io(thread_sync_t * ppc) +{ + wrk_t *wi = NULL; + int ret=0; + + pthread_mutex_lock(&ppc->wq->mtx); + do{ + ppc->wt_status = WTHR_SIG_WAITING; + ret = pthread_cond_wait(&ppc->wq->cv, &ppc->wq->mtx); + ppc->wt_status = WTHR_RUNNING; + if(ret == ETIMEDOUT) { + fprintf(stderr, "ERROR ETIMEDOUT cnt_flag:[%lu] ret:%d\n", + done_cnt_flag, ret); + } else if(ret == EINVAL || ret == EPERM) { + fprintf(stderr, "ERROR EINVAL/EPERM cnt_flag:[%lu] ret:%d\n", + done_cnt_flag, ret); + } + if(ppc->wq->flag == Q_PROCESS) { + break; + } else { + pthread_mutex_unlock(&ppc->wq->mtx); + return -1; + } + } while (ppc->wq->flag == Q_PROCESS && ret == 0); + + pthread_mutex_unlock(&ppc->wq->mtx); + + while (ppc->cq->flag == Q_PROCESS) { + wi = NULL; + /* Get the work item */ + if (0 != (ret = q_remove_wrk(ppc->wq, &wi))) { + ppc->wt_status = WTHR_NO_WORK; + return -1; + } + + assert(ret==0); + assert(wi != NULL); + assert(0 == is_busy_wrk_itm(wi)); + assert(wi->id_usr == -1); + + wi->id_usr = ppc->wthread; + wi->wi_status = WRK_ITEM_START; + + /* Process work item */ + if(0 != (ret = flush_pool_instance(wi))) { + fprintf(stderr, "FLUSH op failed ret:%d\n", ret); + wi->wi_status = WRK_ITEM_FAILED; + } + + ret = q_insert_wrk_list(ppc->cq, wi); + + assert(0==ret); + assert(check_wrk_done_count >= done_cnt_flag); + wi->wi_status = WRK_ITEM_SUCCESS; + if(check_wrk_done_count == cv_done_inc_flag_sig(ppc)) { + break; + } + } + return(0); +} + +/******************************************************************//** +@return a dummy parameter*/ +extern "C" UNIV_INTERN +os_thread_ret_t +DECLARE_THREAD(page_comp_io_thread)( +/*==========================================*/ + void * arg) +{ + thread_sync_t *ppc_io = ((thread_sync_t *)arg); + + while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) { + service_page_comp_io(ppc_io); + ppc_io->stat_cycle_num_processed = 0; + } + os_thread_exit(NULL); + OS_THREAD_DUMMY_RETURN; +} + +int print_queue_wrk_itm(opq_t *q) +{ +#if UNIV_DEBUG + wrk_t *wi = NULL; + + if(!q) { + fprintf(stderr, "queue NULL\n"); + return -1; + } + + if(!q->head || !q->tail) { + assert(!(((q->tail==NULL) && (q->head!=NULL)) && ((q->tail != NULL) && (q->head == NULL)))); + fprintf(stderr, "queue empty (h:%p t:%p)\n", q->head, q->tail); + return 0; + } + + pthread_mutex_lock(&q->mtx); + for(wi = q->head; (wi != NULL) ; wi = wi->next) { + //fprintf(stderr, "- [%p] %p %lu %luus [%ld] >%p\n", + // wi, wi->buf_pool, wi->result, wi->t_usec, wi->id_usr, wi->next); + fprintf(stderr, "- [%p] [%s] >%p\n", + wi, (wi->id_usr == -1)?"free":"Busy", wi->next); + } + pthread_mutex_unlock(&q->mtx); +#endif + return(0); +} + +int print_wrk_list(wrk_t *wi_list) +{ + wrk_t *wi = wi_list; + int i=0; + + if(!wi_list) { + fprintf(stderr, "list NULL\n"); + } + + while(wi) { + fprintf(stderr, "-\t[%p]\t[%s]\t[%lu]\t[%luus] > %p\n", + wi, (wi->id_usr == -1)?"free":"Busy", wi->result, wi->t_usec, wi->next); + wi = wi->next; + i++; + } + fprintf(stderr, "list len: %d\n", i); + return 0; +} + +int pgcomp_handler(wrk_t *w_list) +{ + int ret=0; + opq_t *wrk_q=NULL, *comp_q=NULL; + + wrk_q=&wq; + comp_q=&cq; + + pthread_mutex_lock(&wrk_q->mtx); + /* setup work queue here.. */ + wrk_q->flag = Q_EMPTY; + pthread_mutex_unlock(&wrk_q->mtx); + + ret = q_insert_wrk_list(wrk_q, w_list); + if(ret != 0) { + fprintf(stderr, "%s():work-queue setup FAILED wq:%p w_list:%p \n", + __FUNCTION__, &wq, w_list); + return -1; + } + +retry_submit: + pthread_mutex_lock(&wrk_q->mtx); + /* setup work queue here.. */ + wrk_q->flag = Q_INITIALIZED; + pthread_mutex_unlock(&wrk_q->mtx); + + + pthread_mutex_lock(&comp_q->mtx); + if(0 != set_done_cnt_flag(0)) { + fprintf(stderr, "FAILED %s:%d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&comp_q->mtx); + return -1; + } + comp_q->flag = Q_PROCESS; + pthread_mutex_unlock(&comp_q->mtx); + + /* if threads are waiting request them to start */ + pthread_mutex_lock(&wrk_q->mtx); + wrk_q->flag = Q_PROCESS; + pthread_cond_broadcast(&wrk_q->cv); + pthread_mutex_unlock(&wrk_q->mtx); + + /* Wait on all worker-threads to complete */ + pthread_mutex_lock(&comp_q->mtx); + if (comp_q->flag != Q_DONE) { + do { + pthread_cond_wait(&comp_q->cv, &comp_q->mtx); + if(comp_q->flag != Q_DONE) { + fprintf(stderr, "[1] cv wait on CQ failed flag:%d cnt:%lu\n", + comp_q->flag, done_cnt_flag); + if (done_cnt_flag != srv_buf_pool_instances) { + fprintf(stderr, "[2] cv wait on CQ failed flag:%d cnt:%lu\n", + comp_q->flag, done_cnt_flag); + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + } + continue; + } else if (done_cnt_flag != srv_buf_pool_instances) { + fprintf(stderr, "[3]cv wait on CQ failed flag:%d cnt:%lu\n", + comp_q->flag, done_cnt_flag); + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + comp_q->flag = Q_INITIALIZED; + pthread_mutex_unlock(&comp_q->mtx); + goto retry_submit; + + assert(!done_cnt_flag); + continue; + } + assert(done_cnt_flag == srv_buf_pool_instances); + + if ((comp_q->flag == Q_DONE) && + (done_cnt_flag == srv_buf_pool_instances)) { + break; + } + } while((comp_q->flag == Q_INITIALIZED) && + (done_cnt_flag != srv_buf_pool_instances)); + } else { + fprintf(stderr, "[4] cv wait on CQ failed flag:%d cnt:%lu\n", + comp_q->flag, done_cnt_flag); + if (!done_cnt_flag) { + fprintf(stderr, "============\n"); + print_wrk_list(w_list); + fprintf(stderr, "============\n"); + comp_q->flag = Q_INITIALIZED; + pthread_mutex_unlock(&comp_q->mtx); + goto retry_submit; + assert(!done_cnt_flag); + } + assert(done_cnt_flag == srv_buf_pool_instances); + } + + pthread_mutex_unlock(&comp_q->mtx); + pthread_mutex_lock(&wrk_q->mtx); + wrk_q->flag = Q_DONE; + pthread_mutex_unlock(&wrk_q->mtx); + + return 0; +} + +/******************************************************************//** +@return a dummy parameter*/ +int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq) +{ + int i=0; + + if(is_pgcomp_wrk_init_done()) { + fprintf(stderr, "pgcomp_handler_init(): ERROR already initialized\n"); + return -1; + } + + if(!wq || !cq) { + fprintf(stderr, "%s() FAILED wq:%p cq:%p\n", __FUNCTION__, wq, cq); + return -1; + } + + /* work-item setup */ + setup_wrk_itm(wrk_cnt); + + /* wq & cq setup */ + init_queue(wq); + init_queue(cq); + + /* Mark each of the thread sync entires */ + for(i=0; i < PGCOMP_MAX_WORKER; i++) { + pc_sync[i].wthread_id = i; + } + + /* Create threads for page-compression-flush */ + for(i=0; i < num_threads; i++) { + pc_sync[i].wthread_id = i; + pc_sync[i].wq = wq; + pc_sync[i].cq = cq; + os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)), + thread_ids + START_PGCOMP_CNT + i); + //pc_sync[i].wthread = thread_ids[START_PGCOMP_CNT + i]; + pc_sync[i].wthread = (START_PGCOMP_CNT + i); + pc_sync[i].wt_status = WTHR_INITIALIZED; + } + + set_check_done_flag_count(wrk_cnt); + set_pgcomp_wrk_init_done(); + + return 0; +} + + +int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads) +{ + long stat_tot=0; + unsigned int i=0; + for(i=0; i< num_threads;i++) { + stat_tot+=wthr[i].stat_universal_num_processed; + fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id, + wthr[i].stat_universal_num_processed); + } + fprintf(stderr, "Stat-Total:%lu\n", stat_tot); + return (0); +} + +int reset_wrk_itm(int items) +{ + int i; + + pthread_mutex_lock(&wq.mtx); + wq.head = wq.tail = NULL; + pthread_mutex_unlock(&wq.mtx); + + pthread_mutex_lock(&cq.mtx); + for(i=0;i