summaryrefslogtreecommitdiff
path: root/storage/innobase
diff options
context:
space:
mode:
authorJan Lindström <jplindst@mariadb.org>2013-12-19 14:36:38 +0200
committerJan Lindström <jplindst@mariadb.org>2013-12-19 14:36:38 +0200
commit5e55d1ced52c52fb2f0508e1346059901a85960f (patch)
tree517032a404d9b0ebde9b9174459fbce301dcec95 /storage/innobase
parent1f4f425a2007c51eeee35f911a787fc7d82d977c (diff)
downloadmariadb-git-5e55d1ced52c52fb2f0508e1346059901a85960f.tar.gz
Changes for Fusion-io multi-threaded flush, page compressed tables and
tables using atomic write/table. This is work in progress and some parts are at most POC quality.
Diffstat (limited to 'storage/innobase')
-rw-r--r--storage/innobase/CMakeLists.txt3
-rw-r--r--storage/innobase/buf/buf0buf.cc2
-rw-r--r--storage/innobase/buf/buf0dblwr.cc31
-rw-r--r--storage/innobase/buf/buf0flu.cc324
-rw-r--r--storage/innobase/buf/buf0mtflu.cc1103
-rw-r--r--storage/innobase/buf/buf0rea.cc5
-rw-r--r--storage/innobase/dict/dict0dict.cc7
-rw-r--r--storage/innobase/fil/fil0fil.cc175
-rw-r--r--storage/innobase/fil/fil0pagecompress.cc369
-rw-r--r--storage/innobase/handler/ha_innodb.cc242
-rw-r--r--storage/innobase/handler/ha_innodb.h15
-rw-r--r--storage/innobase/handler/handler0alter.cc28
-rw-r--r--storage/innobase/include/buf0buf.h6
-rw-r--r--storage/innobase/include/dict0dict.h14
-rw-r--r--storage/innobase/include/dict0dict.ic151
-rw-r--r--storage/innobase/include/dict0mem.h56
-rw-r--r--storage/innobase/include/dict0pagecompress.h94
-rw-r--r--storage/innobase/include/dict0pagecompress.ic191
-rw-r--r--storage/innobase/include/fil0fil.h43
-rw-r--r--storage/innobase/include/fil0pagecompress.h117
-rw-r--r--storage/innobase/include/fsp0fsp.h66
-rw-r--r--storage/innobase/include/fsp0fsp.ic17
-rw-r--r--storage/innobase/include/fsp0pagecompress.h64
-rw-r--r--storage/innobase/include/fsp0pagecompress.ic61
-rw-r--r--storage/innobase/include/fsp0types.h1
-rw-r--r--storage/innobase/include/os0file.h57
-rw-r--r--storage/innobase/include/os0file.ic13
-rw-r--r--storage/innobase/include/srv0mon.h10
-rw-r--r--storage/innobase/include/srv0srv.h64
-rw-r--r--storage/innobase/log/log0log.cc17
-rw-r--r--storage/innobase/log/log0recv.cc19
-rw-r--r--storage/innobase/os/os0file.cc561
-rw-r--r--storage/innobase/srv/srv0mon.cc68
-rw-r--r--storage/innobase/srv/srv0srv.cc41
-rw-r--r--storage/innobase/srv/srv0start.cc720
35 files changed, 4559 insertions, 196 deletions
diff --git a/storage/innobase/CMakeLists.txt b/storage/innobase/CMakeLists.txt
index ee8758a08d2..e41d2406bd2 100644
--- a/storage/innobase/CMakeLists.txt
+++ b/storage/innobase/CMakeLists.txt
@@ -278,6 +278,8 @@ SET(INNOBASE_SOURCES
buf/buf0flu.cc
buf/buf0lru.cc
buf/buf0rea.cc
+# TODO: JAN uncomment
+# buf/buf0mtflu.cc
data/data0data.cc
data/data0type.cc
dict/dict0boot.cc
@@ -291,6 +293,7 @@ SET(INNOBASE_SOURCES
eval/eval0eval.cc
eval/eval0proc.cc
fil/fil0fil.cc
+ fil/fil0pagecompress.cc
fsp/fsp0fsp.cc
fut/fut0fut.cc
fut/fut0lst.cc
diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc
index 6efa14e6791..328d5a6f3bf 100644
--- a/storage/innobase/buf/buf0buf.cc
+++ b/storage/innobase/buf/buf0buf.cc
@@ -2,6 +2,7 @@
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2008, Google Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -3254,6 +3255,7 @@ buf_page_init_low(
bpage->access_time = 0;
bpage->newest_modification = 0;
bpage->oldest_modification = 0;
+ bpage->write_size = 0;
HASH_INVALIDATE(bpage, hash);
#if defined UNIV_DEBUG_FILE_ACCESSES || defined UNIV_DEBUG
bpage->file_page_was_freed = FALSE;
diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc
index fb853fe1543..933b56eaf88 100644
--- a/storage/innobase/buf/buf0dblwr.cc
+++ b/storage/innobase/buf/buf0dblwr.cc
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -365,8 +366,8 @@ buf_dblwr_init_or_restore_pages(
/* Read the trx sys header to check if we are using the doublewrite
buffer */
- fil_io(OS_FILE_READ, TRUE, TRX_SYS_SPACE, 0, TRX_SYS_PAGE_NO, 0,
- UNIV_PAGE_SIZE, read_buf, NULL);
+ fil_io(OS_FILE_READ, true, TRX_SYS_SPACE, 0, TRX_SYS_PAGE_NO, 0,
+ UNIV_PAGE_SIZE, read_buf, NULL, 0);
doublewrite = read_buf + TRX_SYS_DOUBLEWRITE;
if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
@@ -402,11 +403,11 @@ buf_dblwr_init_or_restore_pages(
fil_io(OS_FILE_READ, TRUE, TRX_SYS_SPACE, 0, block1, 0,
TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE,
- buf, NULL);
- fil_io(OS_FILE_READ, TRUE, TRX_SYS_SPACE, 0, block2, 0,
+ buf, NULL, 0);
+ fil_io(OS_FILE_READ, true, TRX_SYS_SPACE, 0, block2, 0,
TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE,
buf + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE,
- NULL);
+ NULL, 0);
/* Check if any of these pages is half-written in data files, in the
intended position */
@@ -433,8 +434,8 @@ buf_dblwr_init_or_restore_pages(
+ i - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
}
- fil_io(OS_FILE_WRITE, TRUE, 0, 0, source_page_no, 0,
- UNIV_PAGE_SIZE, page, NULL);
+ fil_io(OS_FILE_WRITE, true, 0, 0, source_page_no, 0,
+ UNIV_PAGE_SIZE, page, NULL, 0);
} else {
space_id = mach_read_from_4(
@@ -476,7 +477,7 @@ buf_dblwr_init_or_restore_pages(
fil_io(OS_FILE_READ, TRUE, space_id, zip_size,
page_no, 0,
zip_size ? zip_size : UNIV_PAGE_SIZE,
- read_buf, NULL);
+ read_buf, NULL, 0);
/* Check if the page is corrupt */
@@ -528,7 +529,7 @@ buf_dblwr_init_or_restore_pages(
fil_io(OS_FILE_WRITE, TRUE, space_id,
zip_size, page_no, 0,
zip_size ? zip_size : UNIV_PAGE_SIZE,
- page, NULL);
+ page, NULL, 0);
ib_logf(IB_LOG_LEVEL_INFO,
"Recovered the page from"
@@ -714,7 +715,7 @@ buf_dblwr_write_block_to_datafile(
buf_page_get_page_no(bpage), 0,
buf_page_get_zip_size(bpage),
(void*) bpage->zip.data,
- (void*) bpage);
+ (void*) bpage, 0);
return;
}
@@ -727,7 +728,7 @@ buf_dblwr_write_block_to_datafile(
fil_io(OS_FILE_WRITE | OS_AIO_SIMULATED_WAKE_LATER,
FALSE, buf_block_get_space(block), 0,
buf_block_get_page_no(block), 0, UNIV_PAGE_SIZE,
- (void*) block->frame, (void*) block);
+ (void*) block->frame, (void*) block, 0);
}
/********************************************************************//**
@@ -820,7 +821,7 @@ try_again:
fil_io(OS_FILE_WRITE, TRUE, TRX_SYS_SPACE, 0,
buf_dblwr->block1, 0, len,
- (void*) write_buf, NULL);
+ (void*) write_buf, NULL, 0);
if (buf_dblwr->first_free <= TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
/* No unwritten pages in the second block. */
@@ -836,7 +837,7 @@ try_again:
fil_io(OS_FILE_WRITE, TRUE, TRX_SYS_SPACE, 0,
buf_dblwr->block2, 0, len,
- (void*) write_buf, NULL);
+ (void*) write_buf, NULL, 0);
flush:
/* increment the doublewrite flushed pages counter */
@@ -1056,14 +1057,14 @@ retry:
fil_io(OS_FILE_WRITE, TRUE, TRX_SYS_SPACE, 0,
offset, 0, UNIV_PAGE_SIZE,
(void*) (buf_dblwr->write_buf
- + UNIV_PAGE_SIZE * i), NULL);
+ + UNIV_PAGE_SIZE * i), NULL, 0);
} else {
/* It is a regular page. Write it directly to the
doublewrite buffer */
fil_io(OS_FILE_WRITE, TRUE, TRX_SYS_SPACE, 0,
offset, 0, UNIV_PAGE_SIZE,
(void*) ((buf_block_t*) bpage)->frame,
- NULL);
+ NULL, 0);
}
/* Now flush the doublewrite buffer data to disk */
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc
index 542c1669667..06ae7b5375c 100644
--- a/storage/innobase/buf/buf0flu.cc
+++ b/storage/innobase/buf/buf0flu.cc
@@ -1,6 +1,8 @@
/*****************************************************************************
Copyright (c) 1995, 2011, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
+Copyright (c) 2013, Fusion-io. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -673,8 +675,10 @@ buf_flush_write_complete(
flush_type = buf_page_get_flush_type(bpage);
buf_pool->n_flush[flush_type]--;
+#ifdef UNIV_DEBUG
/* fprintf(stderr, "n pending flush %lu\n",
buf_pool->n_flush[flush_type]); */
+#endif
if (buf_pool->n_flush[flush_type] == 0
&& buf_pool->init_flush[flush_type] == FALSE) {
@@ -938,7 +942,7 @@ buf_flush_write_block_low(
FALSE, buf_page_get_space(bpage), zip_size,
buf_page_get_page_no(bpage), 0,
zip_size ? zip_size : UNIV_PAGE_SIZE,
- frame, bpage);
+ frame, bpage, 0);
} else if (flush_type == BUF_FLUSH_SINGLE_PAGE) {
buf_dblwr_write_single_page(bpage);
} else {
@@ -1213,7 +1217,9 @@ buf_flush_try_neighbors(
}
}
+#ifdef UNIV_DEBUG
/* fprintf(stderr, "Flush area: low %lu high %lu\n", low, high); */
+#endif
if (high > fil_space_get_size(space)) {
high = fil_space_get_size(space);
@@ -1655,7 +1661,7 @@ pages: to avoid deadlocks, this function must be written so that it cannot
end up waiting for these latches! NOTE 2: in the case of a flush list flush,
the calling thread is not allowed to own any latches on pages!
@return number of blocks for which the write request was queued */
-static
+//static
ulint
buf_flush_batch(
/*============*/
@@ -1712,7 +1718,7 @@ buf_flush_batch(
/******************************************************************//**
Gather the aggregated stats for both flush list and LRU list flushing */
-static
+//static
void
buf_flush_common(
/*=============*/
@@ -1737,7 +1743,7 @@ buf_flush_common(
/******************************************************************//**
Start a buffer flush batch for LRU or flush list */
-static
+//static
ibool
buf_flush_start(
/*============*/
@@ -1766,7 +1772,7 @@ buf_flush_start(
/******************************************************************//**
End a buffer flush batch for LRU or flush list */
-static
+//static
void
buf_flush_end(
/*==========*/
@@ -1816,11 +1822,55 @@ buf_flush_wait_batch_end(
}
} else {
thd_wait_begin(NULL, THD_WAIT_DISKIO);
- os_event_wait(buf_pool->no_flush[type]);
+ os_event_wait(buf_pool->no_flush[type]);
thd_wait_end(NULL);
}
}
+/* JAN: TODO: */
+/*******************************************************************//**
+Flushes dirty blocks from the end of the LRU list of one buffer pool
+instance and puts replaceable clean pages from the end of the LRU list
+to the free list.
+NOTE: The calling thread is not allowed to own any latches on pages!
+@return true if a batch was queued successfully, false if another batch
+of the same type was already running */
+static
+bool
+pgcomp_buf_flush_LRU(
+/*=================*/
+	buf_pool_t*	buf_pool,	/*!< in/out: buffer pool instance */
+	ulint		min_n,		/*!< in: wished minimum number of
+					blocks flushed (the actual number is
+					not guaranteed to be that big) */
+	ulint*		n_processed)	/*!< out: number of pages that were
+					processed; ignored if NULL */
+{
+	const bool	want_count = (n_processed != NULL);
+
+	if (want_count) {
+		*n_processed = 0;
+	}
+
+	if (buf_flush_start(buf_pool, BUF_FLUSH_LRU)) {
+		const ulint	n_pages = buf_flush_batch(
+			buf_pool, BUF_FLUSH_LRU, min_n, 0);
+
+		buf_flush_end(buf_pool, BUF_FLUSH_LRU);
+
+		buf_flush_common(BUF_FLUSH_LRU, n_pages);
+
+		if (want_count) {
+			*n_processed = n_pages;
+		}
+
+		return(true);
+	}
+
+	return(false);
+}
+/* JAN: TODO: END: */
+
/*******************************************************************//**
This utility flushes dirty blocks from the end of the LRU list and also
puts replaceable clean pages from the end of the LRU list to the free
@@ -1863,6 +1913,168 @@ buf_flush_LRU(
return(true);
}
+/* JAN: TODO: */
+/*******************************************************************//**/
+/* Declarations of the multi-threaded flush entry points used below.
+NOTE(review): buf/buf0mtflu.cc defines is_pgcomp_wrk_init_done() with a
+bool return type, while it is declared here as returning int -- confirm
+which one is intended. */
+extern int is_pgcomp_wrk_init_done(void);
+extern int pgcomp_flush_work_items(int buf_pool_inst, int *pages_flushed,
+ int flush_type, int min_n, unsigned long long lsn_limit);
+
+/* pgcomp_buf_flush_list() only takes the multi-threaded path when the
+per-instance min_n is greater than this value */
+#define MT_COMP_WATER_MARK 50
+
+#include <time.h>
+/*******************************************************************//**
+Computes the time difference d_time = g_time - s_time.
+The subtrahend s_time is adjusted in place to carry between its seconds
+and microseconds fields (the scheme of the glibc manual's
+timeval_subtract example), so callers must not rely on the value of
+s_time afterwards.
+@return always 0 */
+int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time)
+{
+	/* Borrow whole seconds into s_time->tv_sec so that the microsecond
+	subtraction below cannot go negative. */
+	if (g_time->tv_usec < s_time->tv_usec) {
+		int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000 + 1;
+		s_time->tv_usec -= 1000000 * nsec;
+		s_time->tv_sec += nsec;
+	}
+
+	/* Carry surplus microseconds back into seconds. The surplus is
+	g - s (the same quantity that is tested), not s - g. */
+	if (g_time->tv_usec - s_time->tv_usec > 1000000) {
+		int nsec = (g_time->tv_usec - s_time->tv_usec) / 1000000;
+		s_time->tv_usec += 1000000 * nsec;
+		s_time->tv_sec -= nsec;
+	}
+
+	d_time->tv_sec = g_time->tv_sec - s_time->tv_sec;
+	d_time->tv_usec = g_time->tv_usec - s_time->tv_usec;
+
+	return 0;
+}
+
+static pthread_mutex_t pgcomp_mtx = PTHREAD_MUTEX_INITIALIZER;
+/*******************************************************************//**
+Multi-threaded version of buf_flush_list().
+When the multi-threaded flush workers are initialized and the
+per-instance target exceeds MT_COMP_WATER_MARK, the request is handed to
+pgcomp_flush_work_items(); otherwise each buffer pool instance is flushed
+in turn by the calling thread.
+@return true on the multi-threaded path, or if a batch was run for every
+buffer pool instance; false if buf_flush_start() refused the batch for
+at least one instance */
+UNIV_INTERN
+bool
+pgcomp_buf_flush_list(
+/*==================*/
+	ulint		min_n,		/*!< in: wished minimum number of
+					blocks flushed (it is not guaranteed
+					that the actual number is that big,
+					though) */
+	lsn_t		lsn_limit,	/*!< in the case BUF_FLUSH_LIST all
+					blocks whose oldest_modification is
+					smaller than this should be flushed
+					(if their number does not exceed
+					min_n), otherwise ignored */
+	ulint*		n_processed)	/*!< out: the number of pages
+					which were processed is passed
+					back to caller. Ignored if NULL */
+
+{
+	ulint		i;
+	bool		success = true;
+	/* Per-instance page counts, passed to pgcomp_flush_work_items()
+	as its pages_flushed argument. Zeroed so that an entry the callee
+	leaves untouched is not read as garbage. */
+	int		cnt_flush[32] = {0};
+	const ulint	max_mt_instances
+		= sizeof(cnt_flush) / sizeof(cnt_flush[0]);
+#ifdef UNIV_DEBUG
+	struct timeval	p_start_time, p_end_time, d_time;
+#endif
+
+	if (n_processed) {
+		*n_processed = 0;
+	}
+
+	if (min_n != ULINT_MAX) {
+		/* Ensure that flushing is spread evenly amongst the
+		buffer pool instances. When min_n is ULINT_MAX
+		we need to flush everything up to the lsn limit
+		so no limit here. */
+		min_n = (min_n + srv_buf_pool_instances - 1)
+			/ srv_buf_pool_instances;
+	}
+
+#ifdef UNIV_DEBUG
+	gettimeofday(&p_start_time, 0x0);
+#endif
+	/* Take the multi-threaded path only if cnt_flush[] can hold one
+	entry per buffer pool instance; otherwise fall through to the
+	single-threaded loop below. */
+	if (is_pgcomp_wrk_init_done()
+	    && (min_n > MT_COMP_WATER_MARK)
+	    && srv_buf_pool_instances <= max_mt_instances) {
+
+		pthread_mutex_lock(&pgcomp_mtx);
+
+		/* NOTE(review): pgcomp_flush_work_items() is declared with
+		an int min_n, so a ulint min_n of ULINT_MAX is narrowed
+		here -- confirm the callee copes with that. */
+		pgcomp_flush_work_items(srv_buf_pool_instances,
+					cnt_flush, BUF_FLUSH_LIST,
+					min_n, lsn_limit);
+
+		for (i = 0; i < srv_buf_pool_instances; i++) {
+			if (n_processed) {
+				*n_processed += cnt_flush[i];
+			}
+			if (cnt_flush[i]) {
+				MONITOR_INC_VALUE_CUMULATIVE(
+					MONITOR_FLUSH_BATCH_TOTAL_PAGE,
+					MONITOR_FLUSH_BATCH_COUNT,
+					MONITOR_FLUSH_BATCH_PAGES,
+					cnt_flush[i]);
+			}
+		}
+
+		pthread_mutex_unlock(&pgcomp_mtx);
+
+#ifdef UNIV_DEBUG
+		gettimeofday(&p_end_time, 0x0);
+		timediff(&p_end_time, &p_start_time, &d_time);
+		/* n_processed may be NULL, so never dereference it blindly */
+		fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", (
+			min_n * srv_buf_pool_instances),
+			n_processed ? *n_processed : 0UL,
+			(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
+#endif
+		return(success);
+	}
+
+	/* Flush to lsn_limit in all buffer pool instances */
+	for (i = 0; i < srv_buf_pool_instances; i++) {
+		buf_pool_t*	buf_pool;
+		ulint		page_count = 0;
+
+		buf_pool = buf_pool_from_array(i);
+
+		if (!buf_flush_start(buf_pool, BUF_FLUSH_LIST)) {
+			/* We have two choices here. If lsn_limit was
+			specified then skipping an instance of buffer
+			pool means we cannot guarantee that all pages
+			up to lsn_limit has been flushed. We can
+			return right now with failure or we can try
+			to flush remaining buffer pools up to the
+			lsn_limit. We attempt to flush other buffer
+			pools based on the assumption that it will
+			help in the retry which will follow the
+			failure. */
+			success = false;
+
+			continue;
+		}
+
+		page_count = buf_flush_batch(
+			buf_pool, BUF_FLUSH_LIST, min_n, lsn_limit);
+
+		buf_flush_end(buf_pool, BUF_FLUSH_LIST);
+
+		buf_flush_common(BUF_FLUSH_LIST, page_count);
+
+		if (n_processed) {
+			*n_processed += page_count;
+		}
+
+		if (page_count) {
+			MONITOR_INC_VALUE_CUMULATIVE(
+				MONITOR_FLUSH_BATCH_TOTAL_PAGE,
+				MONITOR_FLUSH_BATCH_COUNT,
+				MONITOR_FLUSH_BATCH_PAGES,
+				page_count);
+		}
+	}
+
+#ifdef UNIV_DEBUG
+	gettimeofday(&p_end_time, 0x0);
+	timediff(&p_end_time, &p_start_time, &d_time);
+
+	fprintf(stderr, "[2] [*n_processed: (min:%lu)%lu %llu usec]\n", (
+		min_n * srv_buf_pool_instances),
+		n_processed ? *n_processed : 0UL,
+		(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
+#endif
+	return(success);
+}
+#endif
+/* JAN: TODO: END: */
+
/*******************************************************************//**
This utility flushes dirty blocks from the end of the flush list of
all buffer pool instances.
@@ -1890,6 +2102,12 @@ buf_flush_list(
ulint i;
bool success = true;
+ /* JAN: TODO: */
+ if (is_pgcomp_wrk_init_done()) {
+ return(pgcomp_buf_flush_list(min_n, lsn_limit, n_processed));
+ }
+ /* JAN: TODO: END: */
+
if (n_processed) {
*n_processed = 0;
}
@@ -2043,6 +2261,59 @@ buf_flush_single_page_from_LRU(
return(freed);
}
+/* JAN: TODO: */
+/*********************************************************************//**
+Multi-threaded variant of buf_flush_LRU_tail(). Clears up tail of the
+LRU lists:
+* Put replaceable pages at the tail of LRU to the free list
+* Flush dirty pages at the tail of LRU to the disk
+The depth to which we scan each buffer pool is controlled by dynamic
+config parameter innodb_LRU_scan_depth.
+The multi-threaded flush workers must already be initialized.
+@return total pages flushed */
+UNIV_INTERN
+ulint
+pgcomp_buf_flush_LRU_tail(void)
+/*===========================*/
+{
+	ulint	total_flushed = 0;
+	ulint	i = 0;
+	/* Per-instance page counts, passed to pgcomp_flush_work_items()
+	as its pages_flushed argument. Zeroed so that an entry the callee
+	leaves untouched is not read as garbage. */
+	int	cnt_flush[32] = {0};
+#ifdef UNIV_DEBUG
+	struct timeval	p_start_time, p_end_time, d_time;
+
+	gettimeofday(&p_start_time, 0x0);
+#endif
+	assert(is_pgcomp_wrk_init_done());
+	/* cnt_flush[] is indexed by buffer pool instance below */
+	assert(srv_buf_pool_instances
+	       <= sizeof(cnt_flush) / sizeof(cnt_flush[0]));
+
+	pthread_mutex_lock(&pgcomp_mtx);
+	pgcomp_flush_work_items(srv_buf_pool_instances,
+		cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
+
+	for (i = 0; i < srv_buf_pool_instances; i++) {
+		if (cnt_flush[i]) {
+			total_flushed += cnt_flush[i];
+
+			MONITOR_INC_VALUE_CUMULATIVE(
+				MONITOR_LRU_BATCH_TOTAL_PAGE,
+				MONITOR_LRU_BATCH_COUNT,
+				MONITOR_LRU_BATCH_PAGES,
+				cnt_flush[i]);
+		}
+	}
+
+	pthread_mutex_unlock(&pgcomp_mtx);
+
+#ifdef UNIV_DEBUG
+	gettimeofday(&p_end_time, 0x0);
+	timediff(&p_end_time, &p_start_time, &d_time);
+
+	fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", (
+		srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed,
+		(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
+#endif
+
+	return(total_flushed);
+}
+/* JAN: TODO: END: */
+
/*********************************************************************//**
Clears up tail of the LRU lists:
* Put replaceable pages at the tail of LRU to the free list
@@ -2056,6 +2327,12 @@ buf_flush_LRU_tail(void)
/*====================*/
{
ulint total_flushed = 0;
+ /* JAN: TODO: */
+ if(is_pgcomp_wrk_init_done())
+ {
+ return(pgcomp_buf_flush_LRU_tail());
+ }
+ /* JAN: TODO: END */
for (ulint i = 0; i < srv_buf_pool_instances; i++) {
@@ -2342,6 +2619,8 @@ page_cleaner_sleep_if_needed(
}
}
+
+
/******************************************************************//**
page_cleaner thread tasked with flushing dirty pages from the buffer
pools. As of now we'll have only one instance of this thread.
@@ -2357,6 +2636,7 @@ DECLARE_THREAD(buf_flush_page_cleaner_thread)(
ulint next_loop_time = ut_time_ms() + 1000;
ulint n_flushed = 0;
ulint last_activity = srv_get_activity_count();
+ ulint n_lru=0, n_pgc_flush=0, n_pgc_batch=0;
ut_ad(!srv_read_only_mode);
@@ -2368,7 +2648,6 @@ DECLARE_THREAD(buf_flush_page_cleaner_thread)(
fprintf(stderr, "InnoDB: page_cleaner thread running, id %lu\n",
os_thread_pf(os_thread_get_curr_id()));
#endif /* UNIV_DEBUG_THREAD_CREATION */
-
buf_page_cleaner_is_active = TRUE;
while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
@@ -2388,12 +2667,23 @@ DECLARE_THREAD(buf_flush_page_cleaner_thread)(
last_activity = srv_get_activity_count();
/* Flush pages from end of LRU if required */
- n_flushed = buf_flush_LRU_tail();
+ n_lru = n_flushed = buf_flush_LRU_tail();
+#ifdef UNIV_DEBUG
+ if (n_lru) {
+ fprintf(stderr,"n_lru:%lu ",n_lru);
+ }
+#endif
/* Flush pages from flush_list if required */
- n_flushed += page_cleaner_flush_pages_if_needed();
+ n_flushed += n_pgc_flush = page_cleaner_flush_pages_if_needed();
+
+#ifdef UNIV_DEBUG
+ if (n_pgc_flush) {
+ fprintf(stderr,"n_pgc_flush:%lu ",n_pgc_flush);
+ }
+#endif
} else {
- n_flushed = page_cleaner_do_flush_batch(
+ n_pgc_batch = n_flushed = page_cleaner_do_flush_batch(
PCT_IO(100),
LSN_MAX);
@@ -2404,7 +2694,18 @@ DECLARE_THREAD(buf_flush_page_cleaner_thread)(
MONITOR_FLUSH_BACKGROUND_PAGES,
n_flushed);
}
+#ifdef UNIV_DEBUG
+ if (n_pgc_batch) {
+ fprintf(stderr,"n_pgc_batch:%lu ",n_pgc_batch);
+ }
+#endif
}
+#ifdef UNIV_DEBUG
+ if (n_lru || n_pgc_flush || n_pgc_batch) {
+ fprintf(stderr,"\n");
+ n_lru = n_pgc_flush = n_pgc_batch = 0;
+ }
+#endif
}
ut_ad(srv_shutdown_state > 0);
@@ -2573,8 +2874,9 @@ buf_flush_validate(
return(ret);
}
+
#endif /* UNIV_DEBUG || UNIV_BUF_DEBUG */
-#endif /* !UNIV_HOTBACKUP */
+
#ifdef UNIV_DEBUG
/******************************************************************//**
diff --git a/storage/innobase/buf/buf0mtflu.cc b/storage/innobase/buf/buf0mtflu.cc
new file mode 100644
index 00000000000..7abe0547877
--- /dev/null
+++ b/storage/innobase/buf/buf0mtflu.cc
@@ -0,0 +1,1103 @@
+/*****************************************************************************
+
+Copyright (C) 2013 Fusion-io. All Rights Reserved.
+Copyright (C) 2013 SkySQL Ab. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file buf/buf0mtflu.cc
+Multi-threaded flush method implementation
+
+Created 06/11/2013 Dhananjoy Das DDas@fusionio.com
+Modified 12/12/2013 Jan Lindström jan.lindstrom@skysql.com
+***********************************************************************/
+
+#include <time.h>
+
+#ifdef UNIV_PFS_MUTEX
+/* Key to register mt_flush_mutex with performance schema */
+UNIV_INTERN mysql_pfs_key_t mtflush_mutex_key;
+#endif /* UNIV_PFS_MUTEX */
+
+/* Mutex to protect critical sections during multi-threaded flush */
+ib_mutex_t mt_flush_mutex;
+
+#define MT_COMP_WATER_MARK 50
+
+/* Work item status: the states stored in work_item_t::wi_status */
+typedef enum {
+ WORK_ITEM_SET=0, /* Work item information set */
+ WORK_ITEM_START=1, /* Work item assigned to thread and
+ execution started */
+ WORK_ITEM_DONE=2, /* Work item execution done */
+} mtflu_witem_status_t;
+
+/* Work thread status: the states stored in mtflu_thread_t::thread_status */
+typedef enum {
+ WORK_THREAD_NOT_INIT=0, /* Work thread not initialized */
+ WORK_THREAD_INITIALIZED=1, /* Work thread initialized */
+ WORK_THREAD_SIG_WAITING=2, /* Work thread signaled
+ NOTE(review): the name suggests
+ "waiting for a signal" -- confirm */
+ WORK_THREAD_RUNNING=3, /* Work thread running */
+ WORK_THREAD_NO_WORK=4, /* Work thread has no work to do */
+} mtflu_wthr_status_t;
+
+/* Structure containing multi-threaded flush thread information.
+NOTE(review): opq_t is referenced here, but its typedef (struct op_queue)
+only appears further down in this file. */
+typedef struct {
+ os_thread_t wthread_id; /* Thread id */
+ opq_t *wq; /* Write queue ? */
+ opq_t *cq; /* Commit queue ?*/
+ ib_mutex_t thread_mutex; /* Mutex protecting below
+ structures */
+ mtflu_wthr_status_t thread_status; /* Thread status */
+ ib_uint64_t total_num_processed; /* Total number of
+ pages processed */
+ ib_uint64_t cycle_num_processed; /* Number of pages
+ processed on last
+ cycle */
+ ulint check_wrk_done_count; /* Number of pages
+ to process in this
+ work item ? */
+ ulint done_cnt_flag; /* Number of pages
+ processed in this
+ work item ?*/
+} mtflu_thread_t;
+
+/* One unit of flush work: a flush request for a single buffer pool
+instance, plus the fields used to report its outcome. Items are linked
+into the work_list of an operation queue (struct op_queue). */
+struct work_item_t {
+	/****************************/
+	/* Need to group into struct*/
+	buf_pool_t*	buf_pool;	/* buffer-pool instance */
+	int		flush_type;	/* flush-type for buffer-pool
+					flush operation */
+	ulint		min;		/* minimum number of pages
+					requested to be flushed */
+	lsn_t		lsn_limit;	/* lsn limit for the buffer-pool
+					flush operation */
+	/****************************/
+
+	unsigned long	result;		/* flush pages count */
+	unsigned long	t_usec;		/* time-taken in usec */
+	os_thread_t	id_usr;		/* thread-id
+					currently working , why ? */
+	mtflu_witem_status_t wi_status;	/* work item status */
+
+	/* List node; its element type must match the
+	UT_LIST_BASE_NODE_T(work_item_t) base of the queue's work_list */
+	UT_LIST_NODE_T(work_item_t) next;
+};
+
+/* Multi-threaded flush system structure.
+NOTE(review): wrk_t and PGCOMP_MAX_WORKER are not declared in the visible
+part of this file (the work item struct above is named work_item_t), and
+opq_t is only typedef'd below this point -- confirm before enabling this
+file in the build. */
+typedef struct {
+ int pgc_n_threads = 8;// NOTE(review): presumably the number of worker threads -- confirm
+
+ mtflu_thread_t pc_sync[PGCOMP_MAX_WORKER];
+ wrk_t work_items[PGCOMP_MAX_WORKER];
+ int pgcomp_wrk_initialized = -1; /* NOTE(review): a variable of this
+ name is set to 1 by
+ set_pgcomp_wrk_init_done() and
+ compared with 1 by
+ is_pgcomp_wrk_init_done() */
+ opq_t wq; /* write queue ? */
+ opq_t cq; /* commit queue ? */
+} mtflu_system_t;
+
+/* Operation queue status: the values stored in the flag field of
+struct op_queue. The individual states are not described in this file;
+their meaning is to be confirmed against the queue implementation. */
+typedef enum op_q_status {
+ Q_NOT_INIT=0,
+ Q_EMPTY=1,
+ Q_INITIALIZED=2,
+ Q_PROCESS=3,
+ Q_DONE=4,
+ Q_ERROR=5,
+ Q_STATUS_UNDEFINED
+} q_status_t;
+
+// NOTE: jan: could we use ut/ut0wqueue.(h|cc)
+// NOTE: jan: here ????, it would handle waiting, signaling
+// and contains simple interface
+
+/* Operation queue: a list of work items together with a mutex, a
+condition variable and a status flag. */
+typedef struct op_queue
+{
+ ib_mutex_t mtx; /* Mutex protecting below variables
+ */
+ os_cond_t cv; /* NOTE(review): presumably the condition
+ variable that threads wait on for this
+ queue -- confirm */
+ q_status_t flag; /* Operation queue status */
+ UT_LIST_BASE_NODE_T(work_item_t) work_list;
+} opq_t;
+
+
+/*******************************************************************//**
+Initializes multi-threaded flush by creating mt_flush_mutex, registered
+under mtflush_mutex_key. */
+void
+buf_mtflu_init(void)
+/*================*/
+{
+	mutex_create(mtflush_mutex_key, &mt_flush_mutex, SYNC_ANY_LATCH);
+}
+
+/*******************************************************************//**
+Flushes dirty blocks from the end of the LRU list of one buffer pool
+instance and puts replaceable clean pages from the end of the LRU list
+to the free list.
+NOTE: The calling thread is not allowed to own any latches on pages!
+@return true if a batch was queued successfully, false if another batch
+of the same type was already running */
+bool
+buf_mtflu_flush_LRU(
+/*================*/
+	buf_pool_t*	buf_pool,	/*!< in/out: buffer pool instance */
+	ulint		min_n,		/*!< in: wished minimum number of
+					blocks flushed (the actual number is
+					not guaranteed to be that big) */
+	ulint*		n_processed)	/*!< out: number of pages that were
+					processed; ignored if NULL */
+{
+	const bool	report = (n_processed != NULL);
+
+	if (report) {
+		*n_processed = 0;
+	}
+
+	if (buf_flush_start(buf_pool, BUF_FLUSH_LRU)) {
+		const ulint	n_flushed = buf_flush_batch(
+			buf_pool, BUF_FLUSH_LRU, min_n, 0);
+
+		buf_flush_end(buf_pool, BUF_FLUSH_LRU);
+
+		buf_flush_common(BUF_FLUSH_LRU, n_flushed);
+
+		if (report) {
+			*n_processed = n_flushed;
+		}
+
+		return(true);
+	}
+
+	return(false);
+}
+
+#ifdef UNIV_DEBUG
+/*******************************************************************//**
+Utility function to calculate the time difference
+d_time = g_time - s_time.
+The subtrahend s_time is adjusted in place to carry between its seconds
+and microseconds fields (the scheme of the glibc manual's
+timeval_subtract example), so callers must not rely on the value of
+s_time afterwards. */
+UNIV_INTERN
+void
+mtflu_timediff(
+/*===========*/
+	struct timeval *g_time,	/*!< in: minuend (the later time) */
+	struct timeval *s_time,	/*!< in/out: subtrahend (the earlier
+				time); modified by the carry */
+	struct timeval *d_time)	/*!< out: g_time - s_time */
+{
+	/* Borrow whole seconds into s_time->tv_sec so that the microsecond
+	subtraction below cannot go negative. */
+	if (g_time->tv_usec < s_time->tv_usec) {
+		int nsec = (s_time->tv_usec - g_time->tv_usec) / 1000000 + 1;
+		s_time->tv_usec -= 1000000 * nsec;
+		s_time->tv_sec += nsec;
+	}
+
+	/* Carry surplus microseconds back into seconds. The surplus is
+	g - s (the same quantity that is tested), not s - g. */
+	if (g_time->tv_usec - s_time->tv_usec > 1000000) {
+		int nsec = (g_time->tv_usec - s_time->tv_usec) / 1000000;
+		s_time->tv_usec += 1000000 * nsec;
+		s_time->tv_sec -= nsec;
+	}
+
+	d_time->tv_sec = g_time->tv_sec - s_time->tv_sec;
+	d_time->tv_usec = g_time->tv_usec - s_time->tv_usec;
+}
+#endif
+
+/*******************************************************************//**
+This utility flushes dirty blocks from the end of the flush list of
+all buffer pool instances. This is multi-threaded version of buf_flush_list.
+When the multi-threaded flush workers are initialized and the
+per-instance target exceeds MT_COMP_WATER_MARK, the request is handed to
+pgcomp_flush_work_items() under mt_flush_mutex; otherwise each buffer
+pool instance is flushed in turn by the calling thread.
+NOTE: The calling thread is not allowed to own any latches on pages!
+@return true on the multi-threaded path, or if a batch was run for every
+buffer pool instance; false if another batch of same type was already
+running in at least one of the buffer pool instances */
+bool
+buf_mtflu_flush_list(
+/*=================*/
+	ulint		min_n,		/*!< in: wished minimum number of
+					blocks flushed (it is not guaranteed
+					that the actual number is that big,
+					though) */
+	lsn_t		lsn_limit,	/*!< in the case BUF_FLUSH_LIST all
+					blocks whose oldest_modification is
+					smaller than this should be flushed
+					(if their number does not exceed
+					min_n), otherwise ignored */
+	ulint*		n_processed)	/*!< out: the number of pages
+					which were processed is passed
+					back to caller. Ignored if NULL */
+
+{
+	ulint		i;
+	bool		success = true;
+	/* Per-instance page counts, passed to pgcomp_flush_work_items().
+	Zeroed so that an entry the callee leaves untouched is not read
+	as garbage. */
+	int		cnt_flush[32] = {0};
+	const ulint	max_mt_instances
+		= sizeof(cnt_flush) / sizeof(cnt_flush[0]);
+#ifdef UNIV_DEBUG
+	struct timeval	p_start_time, p_end_time, d_time;
+#endif
+
+	if (n_processed) {
+		*n_processed = 0;
+	}
+
+	if (min_n != ULINT_MAX) {
+		/* Ensure that flushing is spread evenly amongst the
+		buffer pool instances. When min_n is ULINT_MAX
+		we need to flush everything up to the lsn limit
+		so no limit here. */
+		min_n = (min_n + srv_buf_pool_instances - 1)
+			/ srv_buf_pool_instances;
+	}
+
+#ifdef UNIV_DEBUG
+	gettimeofday(&p_start_time, 0x0);
+#endif
+	/* Take the multi-threaded path only if cnt_flush[] can hold one
+	entry per buffer pool instance; otherwise fall through to the
+	single-threaded loop below. */
+	if (is_pgcomp_wrk_init_done()
+	    && (min_n > MT_COMP_WATER_MARK)
+	    && srv_buf_pool_instances <= max_mt_instances) {
+
+		mutex_enter(&mt_flush_mutex);
+
+#ifdef UNIV_DEBUG
+		fprintf(stderr, "Calling into wrk-pgcomp [min:%lu]", min_n);
+#endif
+		pgcomp_flush_work_items(srv_buf_pool_instances,
+					cnt_flush, BUF_FLUSH_LIST,
+					min_n, lsn_limit);
+
+		for (i = 0; i < srv_buf_pool_instances; i++) {
+			if (n_processed) {
+				*n_processed += cnt_flush[i];
+			}
+			if (cnt_flush[i]) {
+				MONITOR_INC_VALUE_CUMULATIVE(
+					MONITOR_FLUSH_BATCH_TOTAL_PAGE,
+					MONITOR_FLUSH_BATCH_COUNT,
+					MONITOR_FLUSH_BATCH_PAGES,
+					cnt_flush[i]);
+			}
+		}
+
+		/* Release the same mutex that was entered above */
+		mutex_exit(&mt_flush_mutex);
+
+#ifdef UNIV_DEBUG
+		gettimeofday(&p_end_time, 0x0);
+		mtflu_timediff(&p_end_time, &p_start_time, &d_time);
+		/* n_processed may be NULL, so never dereference it blindly */
+		fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", (
+			min_n * srv_buf_pool_instances),
+			n_processed ? *n_processed : 0UL,
+			(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
+#endif
+		return(success);
+	}
+
+	/* Flush to lsn_limit in all buffer pool instances */
+	for (i = 0; i < srv_buf_pool_instances; i++) {
+		buf_pool_t*	buf_pool;
+		ulint		page_count = 0;
+
+		buf_pool = buf_pool_from_array(i);
+
+		if (!buf_flush_start(buf_pool, BUF_FLUSH_LIST)) {
+			/* We have two choices here. If lsn_limit was
+			specified then skipping an instance of buffer
+			pool means we cannot guarantee that all pages
+			up to lsn_limit has been flushed. We can
+			return right now with failure or we can try
+			to flush remaining buffer pools up to the
+			lsn_limit. We attempt to flush other buffer
+			pools based on the assumption that it will
+			help in the retry which will follow the
+			failure. */
+			success = false;
+
+			continue;
+		}
+
+		page_count = buf_flush_batch(
+			buf_pool, BUF_FLUSH_LIST, min_n, lsn_limit);
+
+		buf_flush_end(buf_pool, BUF_FLUSH_LIST);
+
+		buf_flush_common(BUF_FLUSH_LIST, page_count);
+
+		if (n_processed) {
+			*n_processed += page_count;
+		}
+
+		if (page_count) {
+			MONITOR_INC_VALUE_CUMULATIVE(
+				MONITOR_FLUSH_BATCH_TOTAL_PAGE,
+				MONITOR_FLUSH_BATCH_COUNT,
+				MONITOR_FLUSH_BATCH_PAGES,
+				page_count);
+		}
+	}
+
+#ifdef UNIV_DEBUG
+	gettimeofday(&p_end_time, 0x0);
+	mtflu_timediff(&p_end_time, &p_start_time, &d_time);
+
+	fprintf(stderr, "[2] [*n_processed: (min:%lu)%lu %llu usec]\n", (
+		min_n * srv_buf_pool_instances),
+		n_processed ? *n_processed : 0UL,
+		(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
+#endif
+	return(success);
+}
+
+/*********************************************************************//**
+Clear up tail of the LRU lists:
+* Put replaceable pages at the tail of LRU to the free list
+* Flush dirty pages at the tail of LRU to the disk
+The depth to which we scan each buffer pool is controlled by dynamic
+config parameter innodb_LRU_scan_depth.
+The multi-threaded flush workers must already be initialized; the work
+is handed to pgcomp_flush_work_items() under mt_flush_mutex.
+@return total pages flushed */
+ulint
+buf_mtflu_flush_LRU_tail(void)
+/*==========================*/
+{
+	ulint	total_flushed = 0;
+	ulint	i = 0;
+	/* Per-instance page counts, passed to pgcomp_flush_work_items().
+	Zeroed so that an entry the callee leaves untouched is not read
+	as garbage. */
+	int	cnt_flush[32] = {0};
+
+#ifdef UNIV_DEBUG
+	struct timeval	p_start_time, p_end_time, d_time;
+	gettimeofday(&p_start_time, 0x0);
+#endif
+	assert(is_pgcomp_wrk_init_done());
+	/* cnt_flush[] is indexed by buffer pool instance below */
+	assert(srv_buf_pool_instances
+	       <= sizeof(cnt_flush) / sizeof(cnt_flush[0]));
+
+	/* Use the InnoDB mutex created by buf_mtflu_init() */
+	mutex_enter(&mt_flush_mutex);
+	pgcomp_flush_work_items(srv_buf_pool_instances,
+		cnt_flush, BUF_FLUSH_LRU, srv_LRU_scan_depth, 0);
+
+	for (i = 0; i < srv_buf_pool_instances; i++) {
+		if (cnt_flush[i]) {
+			total_flushed += cnt_flush[i];
+
+			MONITOR_INC_VALUE_CUMULATIVE(
+				MONITOR_LRU_BATCH_TOTAL_PAGE,
+				MONITOR_LRU_BATCH_COUNT,
+				MONITOR_LRU_BATCH_PAGES,
+				cnt_flush[i]);
+		}
+	}
+
+	mutex_exit(&mt_flush_mutex);
+
+#ifdef UNIV_DEBUG
+	gettimeofday(&p_end_time, 0x0);
+	mtflu_timediff(&p_end_time, &p_start_time, &d_time);
+
+	fprintf(stderr, "[1] [*n_processed: (min:%lu)%lu %llu usec]\n", (
+		srv_LRU_scan_depth * srv_buf_pool_instances), total_flushed,
+		(unsigned long long)(d_time.tv_usec+(d_time.tv_sec*1000000)));
+#endif
+
+	return(total_flushed);
+}
+
+/*******************************************************************//**
+Store the number of work items that have to be completed before
+cv_done_inc_flag_sig() marks the queues as done (check_wrk_done_count).
+@return the count that was stored */
+int
+set_check_done_flag_count(int cnt)
+/*================*/
+{
+	check_wrk_done_count = cnt;
+
+	return(check_wrk_done_count);
+}
+
+/*******************************************************************//**
+Mark the multi-threaded flush work infrastructure as initialized; after
+this call is_pgcomp_wrk_init_done() returns true.
+@return always 0 */
+int
+set_pgcomp_wrk_init_done(void)
+/*================*/
+{
+	const int	initialized = 1;
+
+	pgcomp_wrk_initialized = initialized;
+
+	return(0);
+}
+
+/*******************************************************************//**
+Check whether set_pgcomp_wrk_init_done() has been called, i.e. whether
+pgcomp_wrk_initialized is 1.
+@return true if work is initialized */
+bool
+is_pgcomp_wrk_init_done(void)
+/*================*/
+{
+	if (pgcomp_wrk_initialized != 1) {
+		return(false);
+	}
+
+	return(true);
+}
+
+/*******************************************************************//**
+Set the count of completed work items (done_cnt_flag) to the given value.
+The calling thread must hold "cq.mtx"; the counter is not otherwise
+protected.
+@return the value now stored in done_cnt_flag */
+int
+set_done_cnt_flag(int val)
+/*================*/
+{
+	return(done_cnt_flag = val);
+}
+
+/*******************************************************************//**
+Account for one completed work item: bump the per-thread statistics and
+the shared done_cnt_flag counter, and when the counter reaches
+check_wrk_done_count flag both queues Q_DONE and signal the completion
+queue condition variable.
+The counter is incremented, compared and returned from within one critical
+section on cq->mtx, so exactly one caller per cycle observes the final
+count and sends the signal.
+@return value of done_cnt_flag right after this thread's increment */
+int
+cv_done_inc_flag_sig(thread_sync_t * ppc)
+/*================*/
+{
+	int	done;
+
+	mutex_enter(&ppc->cq->mtx);
+	ppc->stat_universal_num_processed++;
+	ppc->stat_cycle_num_processed++;
+	done_cnt_flag++;
+	if(!(done_cnt_flag <= check_wrk_done_count)) {
+		fprintf(stderr, "ERROR: done_cnt:%d check_wrk_done_count:%d\n",
+			done_cnt_flag, check_wrk_done_count);
+	}
+	assert(done_cnt_flag <= check_wrk_done_count);
+
+	/* Snapshot the counter while the mutex is still held; reading it
+	after mutex_exit() could pick up another thread's increment. */
+	done = done_cnt_flag;
+
+	if (done == check_wrk_done_count) {
+		/* NOTE(review): wq->flag is written without wq->mtx, as it
+		always was. pgcomp_flush_work_items() takes wq.mtx before
+		cq.mtx, so acquiring wq->mtx here would invert that order. */
+		ppc->wq->flag = Q_DONE;
+		ppc->cq->flag = Q_DONE;
+		os_cond_signal(&ppc->cq->cv);
+	}
+	mutex_exit(&ppc->cq->mtx);
+
+	return(done);
+}
+
+/*******************************************************************//**
+Detach the first work item from a queue and hand it to the caller. The
+queue mutex is held while the list is modified.
+On an empty queue *wi is set to NULL.
+@return 0 if an item was removed, 1 if the queue was empty, -1 if q or
+wi is NULL */
+int
+q_remove_wrk(opq_t *q, wrk_t **wi)
+/*================*/
+{
+	if (q == NULL || wi == NULL) {
+		return(-1);
+	}
+
+	mutex_enter(&q->mtx);
+	assert(!((q->tail == NULL) && (q->head != NULL)));
+	assert(!((q->tail != NULL) && (q->head == NULL)));
+
+	int	status;
+	wrk_t*	first = q->head;
+
+	*wi = first;
+
+	if (first == NULL) {
+		/* nothing queued: indicate that the removal failed */
+		q->tail = NULL;
+		status = 1;
+	} else {
+		/* unlink the head; an emptied queue has no tail either */
+		q->head = first->next;
+		first->next = NULL;
+		if (q->head == NULL) {
+			q->tail = NULL;
+		}
+		status = 0;
+	}
+
+	mutex_exit(&q->mtx);
+
+	return(status);
+}
+
+/*******************************************************************//**
+Return true if work item has being assigned to a thread or false
+if work item is not assigned. An item is unassigned while its id_usr
+is -1.
+A NULL item is reported as busy. The function used to return -1 for
+NULL, which converts to true in a bool function; this is now spelled out.
+@return true if work is assigned (or wi is NULL), false if not */
+bool
+is_busy_wrk_itm(wrk_t *wi)
+/*================*/
+{
+	if (wi == NULL) {
+		return(true);
+	}
+
+	return(wi->id_usr != -1);
+}
+
+/*******************************************************************//**
+Initialize the first `items` entries of work_items[]: clear the buffer
+pool pointer, result and timing, mark every entry unassigned
+(id_usr == -1) and chain the entries into a NULL-terminated list in
+array order.
+@return 0 on success, -1 if items is not positive */
+int
+setup_wrk_itm(int items)
+/*================*/
+{
+	int i;
+
+	if (items <= 0) {
+		/* Terminating the list would otherwise write to
+		work_items[-1]. */
+		return(-1);
+	}
+
+	for (i = 0; i < items; i++) {
+		work_items[i].buf_pool = NULL;
+		work_items[i].result = 0;
+		work_items[i].t_usec = 0;
+		work_items[i].id_usr = -1;
+		work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;
+		/* last node should be the tail */
+		work_items[i].next = (i + 1 < items)
+			? &work_items[i + 1] : NULL;
+	}
+
+	return(0);
+}
+
+/*******************************************************************//**
+Prepare a queue for use: create its mutex, initialize its condition
+variable, flag it Q_INITIALIZED and make it empty.
+@return 0 on success, -1 if q is NULL */
+int
+init_queue(opq_t *q)
+/*================*/
+{
+	if (q == NULL) {
+		return(-1);
+	}
+
+	/* start with an empty list */
+	q->head = NULL;
+	q->tail = NULL;
+	q->flag = Q_INITIALIZED;
+
+	/* synchronization objects of the queue */
+	q->mtx = os_mutex_create();
+	os_cond_init(&q->cv);
+
+	return(0);
+}
+
+/* NOTE(review): drain_cq() is compiled out with #if 0 and nothing visible
+in this part of the file calls it. As written it would clear result,
+t_usec and id_usr of the first `items` entries of work_items[] and empty
+the given queue while holding its mutex. It releases the mutex with
+mutex_unlock(), whereas the live functions here use mutex_exit(), so it
+would need adjusting before being enabled again. */
+/// NEEDED ?
+#if 0
+int drain_cq(opq_t *cq, int items)
+{
+ int i=0;
+
+ if(!cq) {
+ return -1;
+ }
+ mutex_enter(&cq->mtx);
+ /* mark every work item as unused again */
+ for(i=0; i<items; i++) {
+ work_items[i].result=0;
+ work_items[i].t_usec = 0;
+ work_items[i].id_usr = -1;
+ }
+ /* drop whatever is still linked on the queue */
+ cq->head = cq->tail = NULL;
+ mutex_unlock(&cq->mtx);
+ return 0;
+}
+#endif
+
+/*******************************************************************//**
+Append a NULL-terminated chain of work items to the end of a queue. The
+queue mutex is held while the list is modified and the tail pointer is
+advanced to the last item of the appended chain.
+@return 0 on success, -1 if q or w_list is NULL */
+int
+q_insert_wrk_list(opq_t *q, wrk_t *w_list)
+/*================*/
+{
+	if (q == NULL || w_list == NULL) {
+		fprintf(stderr, "insert failed q:%p w:%p\n", q, w_list);
+		return(-1);
+	}
+
+	mutex_enter(&q->mtx);
+
+	assert(!((q->tail == NULL) && (q->head != NULL)));
+	assert(!((q->tail != NULL) && (q->head == NULL)));
+
+	if (q->tail != NULL) {
+		/* non-empty queue: hook the chain behind the current tail */
+		assert(q->head != NULL);
+		q->tail->next = w_list;
+	} else {
+		/* empty queue: the chain becomes the whole list */
+		q->head = w_list;
+		q->tail = w_list;
+	}
+
+	/* walk to the last node and make it the new tail */
+	wrk_t*	last = q->tail;
+
+	while (last->next != NULL) {
+		last = last->next;
+	}
+	q->tail = last;
+
+	mutex_exit(&q->mtx);
+
+	return(0);
+}
+
+/*******************************************************************//**
+Run one flush batch described by a work item on the item's buffer pool
+instance: buf_flush_start(), buf_flush_batch(), buf_flush_end() and
+buf_flush_common(). The number returned by buf_flush_batch() is stored in
+wi->result. In UNIV_DEBUG builds the elapsed time is stored in wi->t_usec.
+For BUF_FLUSH_LRU the batch size (wi->min) is recomputed as the smaller of
+srv_LRU_scan_depth and the current LRU list length.
+@return 0 on success, -1 if wi is NULL or buf_flush_start() refused to
+start the batch (wi->result is 0 in that case) */
+int
+flush_pool_instance(wrk_t *wi)
+/*================*/
+{
+#ifdef UNIV_DEBUG
+	struct timeval p_start_time, p_end_time, d_time;
+#endif
+
+	if(!wi) {
+		fprintf(stderr, "work item invalid wi:%p\n", wi);
+		return -1;
+	}
+
+	wi->t_usec = 0;
+	/* Work items are reused between flush cycles. Clear the previous
+	result so that a failed start below does not report stale numbers. */
+	wi->result = 0;
+
+	if (!buf_flush_start(wi->buf_pool, (buf_flush_t)wi->flush_type)) {
+		/* The batch could not be started on this instance. Report
+		the failure; the other work items are processed
+		independently of this one. */
+		fprintf(stderr, "flush_start Failed, flush_type:%d\n",
+			(buf_flush_t)wi->flush_type);
+		return -1;
+	}
+
+#ifdef UNIV_DEBUG
+	/* Record time taken for the OP in usec */
+	gettimeofday(&p_start_time, 0x0);
+#endif
+
+	if((buf_flush_t)wi->flush_type == BUF_FLUSH_LRU) {
+		/* srv_LRU_scan_depth can be arbitrarily large value.
+		 * We cap it with current LRU size.
+		 */
+		buf_pool_mutex_enter(wi->buf_pool);
+		wi->min = UT_LIST_GET_LEN(wi->buf_pool->LRU);
+		buf_pool_mutex_exit(wi->buf_pool);
+		wi->min = ut_min(srv_LRU_scan_depth,wi->min);
+	}
+
+	wi->result = buf_flush_batch(wi->buf_pool,
+		(buf_flush_t)wi->flush_type,
+		wi->min, wi->lsn_limit);
+
+	buf_flush_end(wi->buf_pool, (buf_flush_t)wi->flush_type);
+	buf_flush_common((buf_flush_t)wi->flush_type, wi->result);
+
+#ifdef UNIV_DEBUG
+	gettimeofday(&p_end_time, 0x0);
+	timediff(&p_end_time, &p_start_time, &d_time);
+
+	wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000));
+#endif
+	return 0;
+}
+
+/*******************************************************************//**
+One service cycle of a flush worker thread: wait on the work queue
+condition variable, and if the work queue is then flagged Q_PROCESS, take
+work items off the work queue one at a time, flush them with
+flush_pool_instance() and append them to the completion queue until the
+completion queue is no longer flagged Q_PROCESS, the work queue is empty,
+or this thread completed the last outstanding item.
+Each item is marked WRK_ITEM_SUCCESS or WRK_ITEM_FAILED before it is put
+on the completion queue.
+@return 0 if the cycle ended normally, -1 if the thread was woken while
+the work queue was not in Q_PROCESS state or found no work item */
+int
+service_page_comp_io(thread_sync_t * ppc)
+/*================*/
+{
+	wrk_t *wi = NULL;
+	int ret=0;
+
+	/* NOTE(review): the wait is unconditional, so a broadcast sent
+	before this thread starts waiting is not seen until the next one. */
+	mutex_enter(&ppc->wq->mtx);
+	ppc->wt_status = WTHR_SIG_WAITING;
+	ret = os_cond_wait(&ppc->wq->cv, &ppc->wq->mtx);
+	ppc->wt_status = WTHR_RUNNING;
+	if(ret == ETIMEDOUT) {
+		fprintf(stderr, "ERROR ETIMEDOUT cnt_flag:[%d] ret:%d\n",
+			done_cnt_flag, ret);
+	} else if(ret == EINVAL || ret == EPERM) {
+		fprintf(stderr, "ERROR EINVAL/EPERM cnt_flag:[%d] ret:%d\n",
+			done_cnt_flag, ret);
+	}
+	if(ppc->wq->flag != Q_PROCESS) {
+		/* woken up, but no work has been published */
+		mutex_exit(&ppc->wq->mtx);
+		return -1;
+	}
+	mutex_exit(&ppc->wq->mtx);
+
+	while (ppc->cq->flag == Q_PROCESS) {
+		wi = NULL;
+		/* Get the work item */
+		if (0 != (ret = q_remove_wrk(ppc->wq, &wi))) {
+			ppc->wt_status = WTHR_NO_WORK;
+			return -1;
+		}
+
+		assert(ret==0);
+		assert(wi != NULL);
+		assert(0 == is_busy_wrk_itm(wi));
+		assert(wi->id_usr == -1);
+
+		wi->id_usr = ppc->wthread;
+		wi->wi_status = WRK_ITEM_START;
+
+		/* Process work item. Record the outcome before the item
+		becomes visible on the completion queue, and do not
+		overwrite a failure with WRK_ITEM_SUCCESS. */
+		if(0 != (ret = flush_pool_instance(wi))) {
+			fprintf(stderr, "FLUSH op failed ret:%d\n", ret);
+			wi->wi_status = WRK_ITEM_FAILED;
+		} else {
+			wi->wi_status = WRK_ITEM_SUCCESS;
+		}
+		ret = q_insert_wrk_list(ppc->cq, wi);
+
+		assert(0==ret);
+		assert(check_wrk_done_count >= done_cnt_flag);
+		if(check_wrk_done_count == cv_done_inc_flag_sig(ppc)) {
+			/* this thread finished the last item of the cycle */
+			break;
+		}
+	}
+	return(0);
+}
+
+/******************************************************************//**
+Thread main function for multi-threaded flush. Runs
+service_page_comp_io() on the thread_sync_t passed as argument, resetting
+the per-cycle statistics after every call, until srv_shutdown_state
+becomes SRV_SHUTDOWN_EXIT_THREADS.
+@return a dummy parameter*/
+extern "C" UNIV_INTERN
+os_thread_ret_t
+DECLARE_THREAD(page_comp_io_thread)(
+/*==========================================*/
+	void * arg)
+{
+	thread_sync_t*	sync = static_cast<thread_sync_t*>(arg);
+
+	for (;;) {
+		if (srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS) {
+			break;
+		}
+
+		service_page_comp_io(sync);
+		sync->stat_cycle_num_processed = 0;
+	}
+
+	os_thread_exit(NULL);
+	OS_THREAD_DUMMY_RETURN;
+}
+
+/*******************************************************************//**
+Print the work items linked on a queue to stderr (UNIV_DEBUG builds only;
+otherwise this is a no-op). The queue is inspected and walked with its
+mutex held.
+@return 0, or -1 if q is NULL in a UNIV_DEBUG build */
+int
+print_queue_wrk_itm(opq_t *q)
+/*================*/
+{
+#ifdef UNIV_DEBUG
+	wrk_t *wi = NULL;
+
+	if(!q) {
+		fprintf(stderr, "queue NULL\n");
+		return -1;
+	}
+
+	/* Take the mutex before looking at head/tail so that the
+	emptiness check sees a consistent list. */
+	mutex_enter(&q->mtx);
+
+	if(!q->head || !q->tail) {
+		/* head and tail must be NULL together; these are the same
+		invariants q_insert_wrk_list() and q_remove_wrk() assert */
+		assert(!((q->tail == NULL) && (q->head != NULL)));
+		assert(!((q->tail != NULL) && (q->head == NULL)));
+		fprintf(stderr, "queue empty (h:%p t:%p)\n", q->head, q->tail);
+		mutex_exit(&q->mtx);
+		return 0;
+	}
+
+	for(wi = q->head; (wi != NULL) ; wi = wi->next) {
+		fprintf(stderr, "- [%p] [%s] >%p\n",
+			wi, (wi->id_usr == -1)?"free":"Busy", wi->next);
+	}
+	mutex_exit(&q->mtx);
+#endif
+	return(0);
+}
+
+/*******************************************************************//**
+Print every work item of a NULL-terminated list to stderr (address,
+free/Busy state derived from id_usr, result, elapsed time and next
+pointer), followed by the length of the list. A NULL list is reported as
+such and has length 0.
+@return always 0 */
+int
+print_wrk_list(wrk_t *wi_list)
+/*================*/
+{
+	int	count = 0;
+
+	if (wi_list == NULL) {
+		fprintf(stderr, "list NULL\n");
+	}
+
+	for (wrk_t* cur = wi_list; cur != NULL; cur = cur->next) {
+		fprintf(stderr, "-\t[%p]\t[%s]\t[%lu]\t[%luus] > %p\n",
+			cur, (cur->id_usr == -1)?"free":"Busy", cur->result, cur->t_usec, cur->next);
+		count++;
+	}
+
+	fprintf(stderr, "list len: %d\n", count);
+
+	return(0);
+}
+
+/*******************************************************************//**
+Submit a list of work items to the worker threads and wait until they
+have all been processed.
+The items are appended to the global work queue (wq), done_cnt_flag is
+reset, both queues are flagged Q_PROCESS and the threads waiting on wq.cv
+are woken with a broadcast. The caller then waits on cq.cv until the
+completion queue (cq) is flagged Q_DONE. If Q_DONE is seen with an
+unexpected done_cnt_flag the submission is retried from the flag setup.
+Every mutex_enter() in this function is paired with a mutex_exit() on all
+paths, including the retry paths.
+@return 0 when all work has completed, -1 if the work could not be
+submitted */
+int
+pgcomp_handler(wrk_t *w_list)
+/*================*/
+{
+	opq_t	*wrk_q = &wq;
+	opq_t	*comp_q = &cq;
+
+	mutex_enter(&wrk_q->mtx);
+	/* setup work queue here.. */
+	wrk_q->flag = Q_EMPTY;
+	mutex_exit(&wrk_q->mtx);
+
+	if (q_insert_wrk_list(wrk_q, w_list) != 0) {
+		fprintf(stderr, "%s():work-queue setup FAILED wq:%p w_list:%p \n",
+			__FUNCTION__, &wq, w_list);
+		return -1;
+	}
+
+retry_submit:
+	mutex_enter(&wrk_q->mtx);
+	/* setup work queue here.. */
+	wrk_q->flag = Q_INITIALIZED;
+	mutex_exit(&wrk_q->mtx);
+
+	/* Reset the completion state. The mutex is released again before
+	the workers are woken, because they need cq.mtx in order to report
+	completion and this thread takes it again below. */
+	mutex_enter(&comp_q->mtx);
+	if(0 != set_done_cnt_flag(0)) {
+		fprintf(stderr, "FAILED %s:%d\n", __FILE__, __LINE__);
+		mutex_exit(&comp_q->mtx);
+		return -1;
+	}
+	comp_q->flag = Q_PROCESS;
+	mutex_exit(&comp_q->mtx);
+
+	/* if threads are waiting request them to start */
+	mutex_enter(&wrk_q->mtx);
+	wrk_q->flag = Q_PROCESS;
+	os_cond_broadcast(&wrk_q->cv);
+	mutex_exit(&wrk_q->mtx);
+
+	/* Wait on all worker-threads to complete */
+	mutex_enter(&comp_q->mtx);
+	if (comp_q->flag == Q_DONE) {
+		/* The workers were done before we started to wait. */
+		fprintf(stderr, "[4] cv wait on CQ failed flag:%d cnt:%d\n",
+			comp_q->flag, done_cnt_flag);
+		if (!done_cnt_flag) {
+			fprintf(stderr, "============\n");
+			print_wrk_list(w_list);
+			fprintf(stderr, "============\n");
+			comp_q->flag = Q_INITIALIZED;
+			mutex_exit(&comp_q->mtx);
+			goto retry_submit;
+		}
+		ut_ad(done_cnt_flag == srv_buf_pool_instances);
+	} else {
+		for (;;) {
+			os_cond_wait(&comp_q->cv, &comp_q->mtx);
+
+			if (comp_q->flag != Q_DONE) {
+				/* Woken up without the work being done:
+				report it and keep waiting instead of
+				returning with unfinished work items. */
+				fprintf(stderr, "[1] cv wait on CQ failed flag:%d cnt:%d\n",
+					comp_q->flag, done_cnt_flag);
+				if (done_cnt_flag != srv_buf_pool_instances) {
+					fprintf(stderr, "[2] cv wait on CQ failed flag:%d cnt:%d\n",
+						comp_q->flag, done_cnt_flag);
+					fprintf(stderr, "============\n");
+					print_wrk_list(w_list);
+					fprintf(stderr, "============\n");
+				}
+				continue;
+			}
+
+			if (done_cnt_flag != srv_buf_pool_instances) {
+				/* Flagged done with an unexpected count:
+				start the submission over. */
+				fprintf(stderr, "[3]cv wait on CQ failed flag:%d cnt:%d\n",
+					comp_q->flag, done_cnt_flag);
+				fprintf(stderr, "============\n");
+				print_wrk_list(w_list);
+				fprintf(stderr, "============\n");
+				comp_q->flag = Q_INITIALIZED;
+				mutex_exit(&comp_q->mtx);
+				goto retry_submit;
+			}
+
+			ut_ad(done_cnt_flag == srv_buf_pool_instances);
+			break;
+		}
+	}
+
+	mutex_exit(&comp_q->mtx);
+	mutex_enter(&wrk_q->mtx);
+	wrk_q->flag = Q_DONE;
+	mutex_exit(&wrk_q->mtx);
+
+	return 0;
+}
+
+/******************************************************************//**
+One-time setup of the multi-threaded flush machinery: initialize wrk_cnt
+work items, initialize the work and completion queues, create num_threads
+worker threads running page_comp_io_thread(), store wrk_cnt as the number
+of work items per cycle and mark the machinery initialized.
+Each thread's pc_sync[] entry is filled in completely before the thread
+is created, so the new thread never sees a half-initialized entry and its
+own status updates are not overwritten afterwards.
+@return 0 on success, -1 if already initialized, if wq or cq is NULL, or
+if num_threads is outside 0..PGCOMP_MAX_WORKER */
+int
+pgcomp_handler_init(
+	int num_threads,
+	int wrk_cnt,
+	opq_t *wq,
+	opq_t *cq)
+/*================*/
+{
+	int i=0;
+
+	if(is_pgcomp_wrk_init_done()) {
+		fprintf(stderr, "pgcomp_handler_init(): ERROR already initialized\n");
+		return -1;
+	}
+
+	if(!wq || !cq) {
+		fprintf(stderr, "%s() FAILED wq:%p cq:%p\n", __FUNCTION__, wq, cq);
+		return -1;
+	}
+
+	/* NOTE(review): assumes pc_sync[] holds PGCOMP_MAX_WORKER entries,
+	as the marking loop below implies - confirm against its
+	declaration. */
+	if (num_threads < 0 || num_threads > PGCOMP_MAX_WORKER) {
+		fprintf(stderr, "%s() FAILED num_threads:%d\n",
+			__FUNCTION__, num_threads);
+		return -1;
+	}
+
+	/* work-item setup */
+	setup_wrk_itm(wrk_cnt);
+
+	/* wq & cq setup */
+	init_queue(wq);
+	init_queue(cq);
+
+	/* Mark each of the thread sync entries */
+	for(i=0; i < PGCOMP_MAX_WORKER; i++) {
+		pc_sync[i].wthread_id = i;
+	}
+
+	/* Create threads for page-compression-flush */
+	for(i=0; i < num_threads; i++) {
+		pc_sync[i].wq = wq;
+		pc_sync[i].cq = cq;
+		pc_sync[i].wthread = (START_PGCOMP_CNT + i);
+		pc_sync[i].wt_status = WTHR_INITIALIZED;
+		os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)),
+			thread_ids + START_PGCOMP_CNT + i);
+	}
+
+	set_check_done_flag_count(wrk_cnt);
+	set_pgcomp_wrk_init_done();
+
+	return 0;
+}
+
+
+/*******************************************************************//**
+Print work thread status information: the lifetime count of processed
+work items (stat_universal_num_processed) of each of the first
+num_threads entries of wthr[], followed by their sum.
+@return always 0 */
+int
+wrk_thread_stat(
+	thread_sync_t *wthr,
+	unsigned int num_threads)
+/*================*/
+{
+	unsigned long	stat_tot = 0;
+	unsigned int	i;
+
+	for (i = 0; i < num_threads; i++) {
+		stat_tot += wthr[i].stat_universal_num_processed;
+		fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id,
+			wthr[i].stat_universal_num_processed);
+	}
+	fprintf(stderr, "Stat-Total:%lu\n", stat_tot);
+
+	/* The function is declared int; flowing off the end without a
+	return statement is undefined behaviour. */
+	return(0);
+}
+
+/*******************************************************************//**
+Reset work items after a flush cycle: empty the work queue, mark the
+first `items` entries of work_items[] unassigned (id_usr == -1) and empty
+the completion queue. Each queue is modified under its own mutex.
+@return always 0 */
+int
+reset_wrk_itm(int items)
+/*================*/
+{
+	/* work queue */
+	mutex_enter(&wq.mtx);
+	wq.tail = NULL;
+	wq.head = NULL;
+	mutex_exit(&wq.mtx);
+
+	/* completion queue and item ownership */
+	mutex_enter(&cq.mtx);
+	cq.tail = NULL;
+	cq.head = NULL;
+
+	int	idx = items;
+
+	while (idx-- > 0) {
+		work_items[idx].id_usr = -1;
+	}
+	mutex_exit(&cq.mtx);
+
+	return(0);
+}
+
+/*******************************************************************//**
+Run one multi-threaded flush cycle over the first buf_pool_inst buffer
+pool instances: set up one work item per instance with the given flush
+type, batch size and lsn limit, hand the list to pgcomp_handler(), copy
+each item's result into per_pool_pages_flushed[] and reset the queues for
+the next cycle.
+Both queues must be empty on entry (asserted).
+@return 0 on success, -1 if the arguments are invalid, the submission
+failed or a work item reported a result of -1 */
+int
+pgcomp_flush_work_items(
+/*================*/
+	int buf_pool_inst,
+	int *per_pool_pages_flushed,
+	int flush_type,
+	int min_n,
+	lsn_t lsn_limit)
+{
+	int ret=0, i=0;
+
+	if (buf_pool_inst <= 0 || per_pool_pages_flushed == NULL) {
+		/* Nothing to set up; terminating an empty list would
+		write to work_items[-1]. */
+		return(-1);
+	}
+
+	mutex_enter(&wq.mtx);
+	mutex_enter(&cq.mtx);
+
+	assert(wq.head == NULL);
+	assert(wq.tail == NULL);
+	if(cq.head) {
+		print_wrk_list(cq.head);
+	}
+	assert(cq.head == NULL);
+	assert(cq.tail == NULL);
+
+	for(i=0;i<buf_pool_inst; i++) {
+		work_items[i].buf_pool = buf_pool_from_array(i);
+		work_items[i].flush_type = flush_type;
+		work_items[i].min = min_n;
+		work_items[i].lsn_limit = lsn_limit;
+		work_items[i].id_usr = -1;
+		/* do not carry the previous cycle's page count over */
+		work_items[i].result = 0;
+		/* the last item terminates the list */
+		work_items[i].next = (i + 1 < buf_pool_inst)
+			? &work_items[i + 1] : NULL;
+		work_items[i].wi_status = WRK_ITEM_SET;
+	}
+
+	mutex_exit(&cq.mtx);
+	mutex_exit(&wq.mtx);
+
+	if (pgcomp_handler(work_items) != 0) {
+		ret = -1;
+	}
+
+	mutex_enter(&wq.mtx);
+	mutex_enter(&cq.mtx);
+	/* collect data/results total pages flushed */
+	for(i=0; i<buf_pool_inst; i++) {
+		if(work_items[i].result == -1) {
+			ret = -1;
+			per_pool_pages_flushed[i] = 0;
+		} else {
+			per_pool_pages_flushed[i] = work_items[i].result;
+		}
+		if((work_items[i].id_usr == -1) && (work_items[i].wi_status == WRK_ITEM_SET )) {
+			/* the item was never picked up by a worker */
+			fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].flush_type);
+			assert(0);
+		}
+	}
+
+	wq.flag = cq.flag = Q_INITIALIZED;
+
+	mutex_exit(&cq.mtx);
+	mutex_exit(&wq.mtx);
+
+#ifdef UNIV_DEBUG
+	/* Print work-list stats */
+	fprintf(stderr, "==wq== [DONE]\n");
+	print_wrk_list(wq.head);
+	fprintf(stderr, "==cq== [DONE]\n");
+	print_wrk_list(cq.head);
+	fprintf(stderr, "==worker-thread-stats==\n");
+	wrk_thread_stat(pc_sync, pgc_n_threads);
+#endif
+
+	/* clear up work-queue for next flush */
+	reset_wrk_itm(buf_pool_inst);
+	return(ret);
+}
+
+
+
diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc
index 3a579e251ff..174b4ab278f 100644
--- a/storage/innobase/buf/buf0rea.cc
+++ b/storage/innobase/buf/buf0rea.cc
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -182,14 +183,14 @@ buf_read_page_low(
*err = fil_io(OS_FILE_READ | wake_later
| ignore_nonexistent_pages,
sync, space, zip_size, offset, 0, zip_size,
- bpage->zip.data, bpage);
+ bpage->zip.data, bpage, 0);
} else {
ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
*err = fil_io(OS_FILE_READ | wake_later
| ignore_nonexistent_pages,
sync, space, 0, offset, 0, UNIV_PAGE_SIZE,
- ((buf_block_t*) bpage)->frame, bpage);
+ ((buf_block_t*) bpage)->frame, bpage, 0);
}
thd_wait_end(NULL);
diff --git a/storage/innobase/dict/dict0dict.cc b/storage/innobase/dict/dict0dict.cc
index a560dc54eac..a382b211275 100644
--- a/storage/innobase/dict/dict0dict.cc
+++ b/storage/innobase/dict/dict0dict.cc
@@ -2,6 +2,7 @@
Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -1446,8 +1447,8 @@ dict_table_rename_in_cache(
ibool exists;
char* filepath;
- ut_ad(table->space != TRX_SYS_SPACE);
-
+ ut_ad(table->space != TRX_SYS_SPACE);
+
if (DICT_TF_HAS_DATA_DIR(table->flags)) {
dict_get_and_save_data_dir_path(table, true);
@@ -1459,7 +1460,7 @@ dict_table_rename_in_cache(
filepath = fil_make_ibd_name(table->name, false);
}
- fil_delete_tablespace(table->space, BUF_REMOVE_FLUSH_NO_WRITE);
+ fil_delete_tablespace(table->space, BUF_REMOVE_FLUSH_NO_WRITE);
/* Delete any temp file hanging around. */
if (os_file_status(filepath, &exists, &type)
diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc
index 1779ae86c46..2bf5922e07d 100644
--- a/storage/innobase/fil/fil0fil.cc
+++ b/storage/innobase/fil/fil0fil.cc
@@ -1,6 +1,8 @@
/*****************************************************************************
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013 SkySQL Ab. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -24,6 +26,8 @@ Created 10/25/1995 Heikki Tuuri
*******************************************************/
#include "fil0fil.h"
+#include "fil0pagecompress.h"
+#include "fsp0pagecompress.h"
#include <debug_sync.h>
#include <my_dbug.h>
@@ -54,6 +58,14 @@ Created 10/25/1995 Heikki Tuuri
# include "srv0srv.h"
static ulint srv_data_read, srv_data_written;
#endif /* !UNIV_HOTBACKUP */
+#include "zlib.h"
+#ifdef __linux__
+#include <linux/fs.h>
+#include <sys/ioctl.h>
+#include <fcntl.h>
+#include <linux/falloc.h>
+#endif
+#include "row0mysql.h"
/*
IMPLEMENTATION OF THE TABLESPACE MEMORY CACHE
@@ -428,11 +440,16 @@ fil_read(
block size multiple */
void* buf, /*!< in/out: buffer where to store data read;
in aio this must be appropriately aligned */
- void* message) /*!< in: message for aio handler if non-sync
+ void* message, /*!< in: message for aio handler if non-sync
aio used, else ignored */
+ ulint write_size) /*!< in/out: Actual write size initialized
+ after first successful trim
+ operation for this page and if
+ initialized we do not trim again if
+ actual page size does not decrease. */
{
return(fil_io(OS_FILE_READ, sync, space_id, zip_size, block_offset,
- byte_offset, len, buf, message));
+ byte_offset, len, buf, message, write_size));
}
/********************************************************************//**
@@ -457,18 +474,22 @@ fil_write(
be a block size multiple */
void* buf, /*!< in: buffer from which to write; in aio
this must be appropriately aligned */
- void* message) /*!< in: message for aio handler if non-sync
+ void* message, /*!< in: message for aio handler if non-sync
aio used, else ignored */
+ ulint write_size) /*!< in/out: Actual write size initialized
+ after first successful trim
+ operation for this page and if
+ initialized we do not trim again if
+ actual page size does not decrease. */
{
ut_ad(!srv_read_only_mode);
return(fil_io(OS_FILE_WRITE, sync, space_id, zip_size, block_offset,
- byte_offset, len, buf, message));
+ byte_offset, len, buf, message, write_size));
}
/*******************************************************************//**
Returns the table space by a given id, NULL if not found. */
-UNIV_INLINE
fil_space_t*
fil_space_get_by_id(
/*================*/
@@ -486,6 +507,19 @@ fil_space_get_by_id(
return(space);
}
+/****************************************************************//**
+Get the id of the tablespace that a file node belongs to. Both the node
+and its space pointer are checked with ut_ad() before use.
+@return node->space->id */
+ulint
+fil_node_get_space_id(
+/*==================*/
+	fil_node_t*	node)	/*!< in: Compressed node*/
+{
+	ut_ad(node);
+
+	fil_space_t*	owner = node->space;
+
+	ut_ad(owner);
+
+	return(owner->id);
+}
+
/*******************************************************************//**
Returns the table space by a given name, NULL if not found. */
UNIV_INLINE
@@ -704,8 +738,9 @@ fil_node_open_file(
byte* buf2;
byte* page;
ulint space_id;
- ulint flags;
+ ulint flags=0;
ulint page_size;
+ ibool atomic_writes=FALSE;
ut_ad(mutex_own(&(system->mutex)));
ut_a(node->n_pending == 0);
@@ -722,7 +757,7 @@ fil_node_open_file(
node->handle = os_file_create_simple_no_error_handling(
innodb_file_data_key, node->name, OS_FILE_OPEN,
- OS_FILE_READ_ONLY, &success);
+ OS_FILE_READ_ONLY, &success, FALSE);
if (!success) {
/* The following call prints an error message */
os_file_get_last_error(true);
@@ -774,6 +809,8 @@ fil_node_open_file(
space_id = fsp_header_get_space_id(page);
flags = fsp_header_get_flags(page);
page_size = fsp_flags_get_page_size(flags);
+ atomic_writes = fsp_flags_get_atomic_writes(flags);
+
ut_free(buf2);
@@ -824,6 +861,17 @@ fil_node_open_file(
ut_error;
}
+ if (UNIV_UNLIKELY(space->flags != flags)) {
+ if (!dict_tf_verify_flags(space->flags, flags)) {
+ fprintf(stderr,
+ "InnoDB: Error: table flags are 0x%lx"
+ " in the data dictionary\n"
+ "InnoDB: but the flags in file %s are 0x%lx!\n",
+ space->flags, node->name, flags);
+ ut_error;
+ }
+ }
+
if (size_bytes >= 1024 * 1024) {
/* Truncate the size to whole megabytes. */
size_bytes = ut_2pow_round(size_bytes, 1024 * 1024);
@@ -843,6 +891,8 @@ add_size:
space->size += node->size;
}
+ atomic_writes = fsp_flags_get_atomic_writes(space->flags);
+
/* printf("Opening file %s\n", node->name); */
/* Open the file for reading and writing, in Windows normally in the
@@ -853,18 +903,18 @@ add_size:
node->handle = os_file_create(innodb_file_log_key,
node->name, OS_FILE_OPEN,
OS_FILE_AIO, OS_LOG_FILE,
- &ret);
+ &ret, atomic_writes);
} else if (node->is_raw_disk) {
node->handle = os_file_create(innodb_file_data_key,
node->name,
OS_FILE_OPEN_RAW,
OS_FILE_AIO, OS_DATA_FILE,
- &ret);
+ &ret, atomic_writes);
} else {
node->handle = os_file_create(innodb_file_data_key,
node->name, OS_FILE_OPEN,
OS_FILE_AIO, OS_DATA_FILE,
- &ret);
+ &ret, atomic_writes);
}
ut_a(ret);
@@ -1481,6 +1531,21 @@ fil_space_get_space(
if (space->size == 0 && space->purpose == FIL_TABLESPACE) {
ut_a(id != 0);
+ mutex_exit(&fil_system->mutex);
+
+ /* It is possible that the space gets evicted at this point
+ before the fil_mutex_enter_and_prepare_for_io() acquires
+ the fil_system->mutex. Check for this after completing the
+ call to fil_mutex_enter_and_prepare_for_io(). */
+ fil_mutex_enter_and_prepare_for_io(id);
+
+ /* We are still holding the fil_system->mutex. Check if
+ the space is still in memory cache. */
+ space = fil_space_get_by_id(id);
+ if (space == NULL) {
+ return(NULL);
+ }
+
/* The following code must change when InnoDB supports
multiple datafiles per tablespace. */
ut_a(1 == UT_LIST_GET_LEN(space->chain));
@@ -1858,12 +1923,12 @@ fil_write_lsn_and_arch_no_to_file(
buf = static_cast<byte*>(ut_align(buf1, UNIV_PAGE_SIZE));
err = fil_read(TRUE, space, 0, sum_of_sizes, 0,
- UNIV_PAGE_SIZE, buf, NULL);
+ UNIV_PAGE_SIZE, buf, NULL, 0);
if (err == DB_SUCCESS) {
mach_write_to_8(buf + FIL_PAGE_FILE_FLUSH_LSN, lsn);
err = fil_write(TRUE, space, 0, sum_of_sizes, 0,
- UNIV_PAGE_SIZE, buf, NULL);
+ UNIV_PAGE_SIZE, buf, NULL, 0);
}
mem_free(buf1);
@@ -3095,7 +3160,7 @@ fil_create_link_file(
file = os_file_create_simple_no_error_handling(
innodb_file_data_key, link_filepath,
- OS_FILE_CREATE, OS_FILE_READ_WRITE, &success);
+ OS_FILE_CREATE, OS_FILE_READ_WRITE, &success, FALSE);
if (!success) {
/* The following call will print an error message */
@@ -3111,10 +3176,10 @@ fil_create_link_file(
ut_print_filename(stderr, filepath);
fputs(" already exists.\n", stderr);
err = DB_TABLESPACE_EXISTS;
-
} else if (error == OS_FILE_DISK_FULL) {
err = DB_OUT_OF_FILE_SPACE;
-
+ } else if (error == OS_FILE_OPERATION_NOT_SUPPORTED) {
+ err = DB_UNSUPPORTED;
} else {
err = DB_ERROR;
}
@@ -3204,8 +3269,9 @@ fil_open_linked_file(
/*===============*/
const char* tablename, /*!< in: database/tablename */
char** remote_filepath,/*!< out: remote filepath */
- os_file_t* remote_file) /*!< out: remote file handle */
-
+ os_file_t* remote_file, /*!< out: remote file handle */
+ ibool atomic_writes) /*!< in: should atomic writes be
+ used */
{
ibool success;
@@ -3219,7 +3285,7 @@ fil_open_linked_file(
*remote_file = os_file_create_simple_no_error_handling(
innodb_file_data_key, *remote_filepath,
OS_FILE_OPEN, OS_FILE_READ_ONLY,
- &success);
+ &success, atomic_writes);
if (!success) {
char* link_filepath = fil_make_isl_name(tablename);
@@ -3274,6 +3340,7 @@ fil_create_new_single_table_tablespace(
/* TRUE if a table is created with CREATE TEMPORARY TABLE */
bool is_temp = !!(flags2 & DICT_TF2_TEMPORARY);
bool has_data_dir = FSP_FLAGS_HAS_DATA_DIR(flags);
+ bool atomic_writes = FSP_FLAGS_GET_ATOMIC_WRITES(flags);
ut_a(space_id > 0);
ut_ad(!srv_read_only_mode);
@@ -3306,7 +3373,8 @@ fil_create_new_single_table_tablespace(
OS_FILE_CREATE | OS_FILE_ON_ERROR_NO_EXIT,
OS_FILE_NORMAL,
OS_DATA_FILE,
- &ret);
+ &ret,
+ atomic_writes);
if (ret == FALSE) {
/* The following call will print an error message */
@@ -3333,6 +3401,11 @@ fil_create_new_single_table_tablespace(
goto error_exit_3;
}
+ if (error == OS_FILE_OPERATION_NOT_SUPPORTED) {
+ err = DB_UNSUPPORTED;
+ goto error_exit_3;
+ }
+
if (error == OS_FILE_DISK_FULL) {
err = DB_OUT_OF_FILE_SPACE;
goto error_exit_3;
@@ -3371,6 +3444,7 @@ fil_create_new_single_table_tablespace(
flags = fsp_flags_set_page_size(flags, UNIV_PAGE_SIZE);
fsp_header_init_fields(page, space_id, flags);
mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, space_id);
+ ut_ad(fsp_flags_is_valid(flags));
if (!(fsp_flags_is_compressed(flags))) {
buf_flush_init_for_writing(page, NULL, 0);
@@ -3547,6 +3621,7 @@ fil_open_single_table_tablespace(
fsp_open_info remote;
ulint tablespaces_found = 0;
ulint valid_tablespaces_found = 0;
+ ibool atomic_writes = FALSE;
#ifdef UNIV_SYNC_DEBUG
ut_ad(!fix_dict || rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
@@ -3557,6 +3632,8 @@ fil_open_single_table_tablespace(
return(DB_CORRUPTION);
}
+ atomic_writes = fsp_flags_get_atomic_writes(flags);
+
/* If the tablespace was relocated, we do not
compare the DATA_DIR flag */
ulint mod_flags = flags & ~FSP_FLAGS_MASK_DATA_DIR;
@@ -3581,7 +3658,7 @@ fil_open_single_table_tablespace(
}
link_file_found = fil_open_linked_file(
- tablename, &remote.filepath, &remote.file);
+ tablename, &remote.filepath, &remote.file, atomic_writes);
remote.success = link_file_found;
if (remote.success) {
/* possibility of multiple files. */
@@ -3609,7 +3686,7 @@ fil_open_single_table_tablespace(
if (dict.filepath) {
dict.file = os_file_create_simple_no_error_handling(
innodb_file_data_key, dict.filepath, OS_FILE_OPEN,
- OS_FILE_READ_ONLY, &dict.success);
+ OS_FILE_READ_ONLY, &dict.success, atomic_writes);
if (dict.success) {
/* possibility of multiple files. */
validate = true;
@@ -3621,7 +3698,7 @@ fil_open_single_table_tablespace(
ut_a(def.filepath);
def.file = os_file_create_simple_no_error_handling(
innodb_file_data_key, def.filepath, OS_FILE_OPEN,
- OS_FILE_READ_ONLY, &def.success);
+ OS_FILE_READ_ONLY, &def.success, atomic_writes);
if (def.success) {
tablespaces_found++;
}
@@ -4020,7 +4097,7 @@ fil_load_single_table_tablespace(
/* Check for a link file which locates a remote tablespace. */
remote.success = fil_open_linked_file(
- tablename, &remote.filepath, &remote.file);
+ tablename, &remote.filepath, &remote.file, FALSE);
/* Read the first page of the remote tablespace */
if (remote.success) {
@@ -4035,7 +4112,7 @@ fil_load_single_table_tablespace(
/* Try to open the tablespace in the datadir. */
def.file = os_file_create_simple_no_error_handling(
innodb_file_data_key, def.filepath, OS_FILE_OPEN,
- OS_FILE_READ_ONLY, &def.success);
+ OS_FILE_READ_ONLY, &def.success, FALSE);
/* Read the first page of the remote tablespace */
if (def.success) {
@@ -4167,7 +4244,7 @@ will_not_choose:
new_path = fil_make_ibbackup_old_name(fsp->filepath);
bool success = os_file_rename(
- innodb_file_data_key, fsp->filepath, new_path));
+ innodb_file_data_key, fsp->filepath, new_path);
ut_a(success);
@@ -4821,7 +4898,7 @@ retry:
success = os_aio(OS_FILE_WRITE, OS_AIO_SYNC,
node->name, node->handle, buf,
offset, page_size * n_pages,
- NULL, NULL);
+ NULL, NULL, 0);
#endif /* UNIV_HOTBACKUP */
if (success) {
os_has_said_disk_full = FALSE;
@@ -4852,6 +4929,7 @@ retry:
space->size += pages_added;
node->size += pages_added;
+ node->being_extended = FALSE;
#ifdef HAVE_POSIX_FALLOCATE
complete_io:
@@ -4917,7 +4995,7 @@ fil_extend_tablespaces_to_stored_len(void)
single-threaded operation */
error = fil_read(TRUE, space->id,
fsp_flags_get_zip_size(space->flags),
- 0, 0, UNIV_PAGE_SIZE, buf, NULL);
+ 0, 0, UNIV_PAGE_SIZE, buf, NULL, 0);
ut_a(error == DB_SUCCESS);
size_in_header = fsp_get_size_low(buf);
@@ -5191,8 +5269,13 @@ fil_io(
void* buf, /*!< in/out: buffer where to store read data
or from where to write; in aio this must be
appropriately aligned */
- void* message) /*!< in: message for aio handler if non-sync
+ void* message, /*!< in: message for aio handler if non-sync
aio used, else ignored */
+ ulint write_size) /*!< in/out: Actual write size initialized
+ after first successful trim
+ operation for this page and if
+ initialized we do not trim again if
+ actual page size does not decrease. */
{
ulint mode;
fil_space_t* space;
@@ -5255,6 +5338,9 @@ fil_io(
} else if (type == OS_FILE_WRITE) {
ut_ad(!srv_read_only_mode);
srv_stats.data_written.add(len);
+ if (fil_page_is_index_page((byte *)buf)) {
+ srv_stats.index_pages_written.inc();
+ }
}
/* Reserve the fil_system mutex and make sure that we can open at
@@ -5371,7 +5457,7 @@ fil_io(
#else
/* Queue the aio request */
ret = os_aio(type, mode | wake_later, node->name, node->handle, buf,
- offset, len, node, message);
+ offset, len, node, message, write_size);
#endif /* UNIV_HOTBACKUP */
ut_a(ret);
@@ -5994,7 +6080,7 @@ fil_tablespace_iterate(
file = os_file_create_simple_no_error_handling(
innodb_file_data_key, filepath,
- OS_FILE_OPEN, OS_FILE_READ_WRITE, &success);
+ OS_FILE_OPEN, OS_FILE_READ_WRITE, &success, FALSE);
DBUG_EXECUTE_IF("fil_tablespace_iterate_failure",
{
@@ -6210,3 +6296,32 @@ fil_mtr_rename_log(
mtr_commit(&mtr);
}
+/****************************************************************//**
+Acquire the fil_system mutex. The caller must not already own the
+mutex; this is checked with a ut_ad() assertion before entering. */
+void
+fil_system_enter(void)
+/*==================*/
+{
+ ut_ad(!mutex_own(&fil_system->mutex));
+ mutex_enter(&fil_system->mutex);
+}
+
+/****************************************************************//**
+Release the fil_system mutex. The caller must own the mutex; this is
+checked with a ut_ad() assertion before exiting. */
+void
+fil_system_exit(void)
+/*=================*/
+{
+ ut_ad(mutex_own(&fil_system->mutex));
+ mutex_exit(&fil_system->mutex);
+}
+
+/*******************************************************************//**
+Return space name. The space pointer is dereferenced without a NULL
+check, so the caller must pass a valid space.
+@return the name member of the space; the pointer is returned as is,
+no copy is made */
+char*
+fil_space_name(
+/*===========*/
+ fil_space_t* space) /*!< in: space */
+{
+ return (space->name);
+}
diff --git a/storage/innobase/fil/fil0pagecompress.cc b/storage/innobase/fil/fil0pagecompress.cc
new file mode 100644
index 00000000000..3926b23c677
--- /dev/null
+++ b/storage/innobase/fil/fil0pagecompress.cc
@@ -0,0 +1,369 @@
+/*****************************************************************************
+
+Copyright (C) 2013 SkySQL Ab. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file fil/fil0pagecompress.cc
+Implementation for page compressed file spaces.
+
+Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
+***********************************************************************/
+
+#include "fil0fil.h"
+#include "fil0pagecompress.h"
+
+#include <debug_sync.h>
+#include <my_dbug.h>
+
+#include "mem0mem.h"
+#include "hash0hash.h"
+#include "os0file.h"
+#include "mach0data.h"
+#include "buf0buf.h"
+#include "buf0flu.h"
+#include "log0recv.h"
+#include "fsp0fsp.h"
+#include "srv0srv.h"
+#include "srv0start.h"
+#include "mtr0mtr.h"
+#include "mtr0log.h"
+#include "dict0dict.h"
+#include "page0page.h"
+#include "page0zip.h"
+#include "trx0sys.h"
+#include "row0mysql.h"
+#ifndef UNIV_HOTBACKUP
+# include "buf0lru.h"
+# include "ibuf0ibuf.h"
+# include "sync0sync.h"
+# include "os0sync.h"
+#else /* !UNIV_HOTBACKUP */
+# include "srv0srv.h"
+static ulint srv_data_read, srv_data_written;
+#endif /* !UNIV_HOTBACKUP */
+#include "zlib.h"
+#ifdef __linux__
+#include <linux/fs.h>
+#include <sys/ioctl.h>
+#include <fcntl.h>
+#include <linux/falloc.h>
+#endif
+#include "row0mysql.h"
+
+/****************************************************************//**
+For page compressed pages compress the page before actual write
+operation. On success the compressed image is built in out_buf: the
+first FIL_PAGE_DATA bytes of the original page are copied as header,
+the checksum field is set to BUF_NO_CHECKSUM_MAGIC, the page type to
+FIL_PAGE_PAGE_COMPRESSED, the flush lsn field to the compression
+algorithm (FIL_PAGE_COMPRESSION_ZLIB) and the two bytes at FIL_PAGE_DATA
+to the length of the compressed payload, which follows the header.
+If compression fails the original page is left as it was.
+@return out_buf, with the block aligned write length stored to *out_len,
+on success; buf, with len stored to *out_len, if compression failed */
+byte*
+fil_compress_page(
+/*==============*/
+	ulint		space_id,	/*!< in: tablespace id of the
+					table. */
+	byte*		buf,		/*!< in: buffer from which to write; in aio
+					this must be appropriately aligned */
+	byte*		out_buf,	/*!< out: compressed buffer */
+	ulint		len,		/*!< in: length of input buffer.*/
+	ulint*		out_len)	/*!< out: actual length of compressed page */
+{
+	/* Sector sizes used for the trim statistics. */
+	const ulint	sect_size_512 = 512;
+	const ulint	sect_size_4096 = 4096;
+	const ulint	header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
+	int		err = Z_OK;
+	int		level = 0;
+	ulint		write_size = 0;
+	/* zlib takes the destination length as uLongf*, which is not
+	guaranteed to be the same type as ulint on every platform. */
+	uLongf		comp_len = 0;
+
+	ut_a(buf);
+	ut_a(out_buf);
+	ut_a(len);
+	ut_a(out_len);
+
+	level = fil_space_get_page_compression_level(space_id);
+	ut_a(fil_space_is_page_compressed(space_id));
+
+	fil_system_enter();
+	fil_space_t*	space = fil_space_get_by_id(space_id);
+	fil_system_exit();
+
+	/* The result of the lookup is not checked anywhere else in this
+	function and fil_space_name() dereferences its argument, so never
+	hand it a NULL space just to print a name. */
+	const char*	space_name = space ? fil_space_name(space) : "(unknown)";
+
+	/* If no compression level was provided to this table, use system
+	default level */
+	if (level == 0) {
+		level = srv_compress_zlib_level;
+	}
+
+#ifdef UNIV_DEBUG
+	fprintf(stderr,
+		"InnoDB: Note: Preparing for compress for space %lu name %s len %lu\n",
+		space_id, space_name, len);
+#endif
+
+	/* The compressed payload is placed after the page header and the
+	payload length field, and may use at most the rest of the page. */
+	comp_len = static_cast<uLongf>(UNIV_PAGE_SIZE - header_len);
+	err = compress2(out_buf + header_len, &comp_len, buf, len, level);
+	write_size = comp_len;
+
+	if (err != Z_OK) {
+		/* If error we leave the actual page as it was */
+
+		fprintf(stderr,
+			"InnoDB: Warning: Compression failed for space %lu name %s len %lu rt %d write %lu\n",
+			space_id, space_name, len, err, write_size);
+
+		*out_len = len;
+		return(buf);
+	}
+
+	/* Set up the page header */
+	memcpy(out_buf, buf, FIL_PAGE_DATA);
+	/* Set up the checksum */
+	mach_write_to_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC);
+	/* Set up the correct page type */
+	mach_write_to_2(out_buf + FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED);
+	/* Set up the flush lsn to be compression algorithm */
+	mach_write_to_8(out_buf + FIL_PAGE_FILE_FLUSH_LSN, FIL_PAGE_COMPRESSION_ZLIB);
+	/* Set up the actual payload length */
+	mach_write_to_2(out_buf + FIL_PAGE_DATA, write_size);
+
+#ifdef UNIV_DEBUG
+	/* Verify */
+	ut_ad(fil_page_is_compressed(out_buf));
+	ut_ad(mach_read_from_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM) == BUF_NO_CHECKSUM_MAGIC);
+	ut_ad(mach_read_from_2(out_buf + FIL_PAGE_DATA) == write_size);
+	ut_ad(mach_read_from_8(out_buf + FIL_PAGE_FILE_FLUSH_LSN) == FIL_PAGE_COMPRESSION_ZLIB);
+#endif
+
+	write_size += header_len;
+
+	/* Actual write needs to be aligned on block size */
+	if (write_size % OS_FILE_LOG_BLOCK_SIZE) {
+		write_size += OS_FILE_LOG_BLOCK_SIZE
+			- (write_size % OS_FILE_LOG_BLOCK_SIZE);
+	}
+
+#ifdef UNIV_DEBUG
+	fprintf(stderr,
+		"InnoDB: Note: Compression succeeded for space %lu name %s len %lu out_len %lu\n",
+		space_id, space_name, len, write_size);
+#endif
+
+	/* len and write_size are unsigned: compute the saving without
+	letting the subtraction wrap around when the aligned compressed
+	image is not smaller than the input. */
+	const ulint	saved = (len > write_size) ? len - write_size : 0;
+
+	srv_stats.page_compression_saved.add(saved);
+
+	if (saved > 0) {
+		srv_stats.page_compression_trim_sect512.add(saved / sect_size_512);
+		srv_stats.page_compression_trim_sect4096.add(saved / sect_size_4096);
+	}
+
+	srv_stats.pages_page_compressed.inc();
+	*out_len = write_size;
+
+	return(out_buf);
+}
+
+/****************************************************************//**
+For page compressed pages decompress the page after actual read
+operation. The page must carry BUF_NO_CHECKSUM_MAGIC in its checksum
+field and FIL_PAGE_PAGE_COMPRESSED as its page type. The compression
+algorithm is read from the flush lsn field and the length of the
+compressed payload from the two bytes at FIL_PAGE_DATA. The payload is
+uncompressed into page_buf (or into a temporary buffer if page_buf is
+NULL) and then copied back over buf. Every inconsistency is reported to
+stderr and followed by ut_error. */
+void
+fil_decompress_page(
+/*================*/
+	byte*		page_buf,	/*!< in: preallocated buffer or NULL */
+	byte*		buf,		/*!< in/out: on entry the page compressed
+					page as read; on return the
+					uncompressed page; in aio this must be
+					appropriately aligned */
+	ulint		len)		/*!< in: length of output buffer.*/
+{
+	const ulint	header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
+	int		err = 0;
+	ulint		actual_size = 0;
+	ulint		compression_alg = 0;
+	byte*		in_buf = NULL;
+	/* zlib takes the destination length as uLongf*, which is not
+	guaranteed to be the same type as ulint on every platform. */
+	uLongf		out_size = 0;
+
+	ut_a(buf);
+	ut_a(len);
+
+	/* Before actual decompress, make sure that page type is correct */
+
+	if (mach_read_from_4(buf + FIL_PAGE_SPACE_OR_CHKSUM) != BUF_NO_CHECKSUM_MAGIC ||
+	    mach_read_from_2(buf + FIL_PAGE_TYPE) != FIL_PAGE_PAGE_COMPRESSED) {
+		fprintf(stderr,
+			"InnoDB: Corruption: We try to uncompress corrupted page\n"
+			"InnoDB: CRC %lu type %lu.\n"
+			"InnoDB: len %lu\n",
+			mach_read_from_4(buf + FIL_PAGE_SPACE_OR_CHKSUM),
+			mach_read_from_2(buf + FIL_PAGE_TYPE), len);
+
+		fflush(stderr);
+		ut_error;
+	}
+
+	/* Get compression algorithm */
+	compression_alg = mach_read_from_8(buf + FIL_PAGE_FILE_FLUSH_LSN);
+
+	if (compression_alg == FIL_PAGE_COMPRESSION_ZLIB) {
+		/* Get the actual size of compressed page */
+		actual_size = mach_read_from_2(buf + FIL_PAGE_DATA);
+
+		/* fil_compress_page() gives zlib at most
+		UNIV_PAGE_SIZE - header_len bytes for the payload, so a
+		larger (or empty) stored size cannot come from a valid page
+		and would make uncompress() read past the page. */
+		if (actual_size == 0
+		    || actual_size > UNIV_PAGE_SIZE - header_len) {
+			fprintf(stderr,
+				"InnoDB: Corruption: Page is marked as compressed\n"
+				"InnoDB: but stored compressed size %lu is invalid.\n"
+				"InnoDB: len %lu\n",
+				actual_size, len);
+
+			fflush(stderr);
+			ut_error;
+		}
+
+		/* If no buffer was given, we need to allocate a temporary
+		buffer. It is sized by len because len is the capacity that
+		is passed to uncompress() below. */
+		if (page_buf == NULL) {
+			in_buf = static_cast<byte *>(ut_malloc(len));
+		} else {
+			in_buf = page_buf;
+		}
+
+#ifdef UNIV_DEBUG
+		fprintf(stderr,
+			"InnoDB: Note: Preparing for decompress for len %lu\n",
+			actual_size);
+#endif
+
+		out_size = static_cast<uLongf>(len);
+		err = uncompress(in_buf, &out_size, buf + header_len,
+				 static_cast<uLong>(actual_size));
+		/* uncompress() stores the uncompressed length */
+		len = out_size;
+
+		/* If uncompress fails it means that page is corrupted */
+		if (err != Z_OK) {
+
+			fprintf(stderr,
+				"InnoDB: Corruption: Page is marked as compressed\n"
+				"InnoDB: but uncompress failed with error %d.\n"
+				"InnoDB: size %lu len %lu\n",
+				err, actual_size, len);
+
+			fflush(stderr);
+
+			ut_error;
+		}
+
+#ifdef UNIV_DEBUG
+		fprintf(stderr,
+			"InnoDB: Note: Decompression succeeded for len %lu \n",
+			len);
+#endif
+
+		/* Copy the uncompressed page to the buffer pool, not
+		really any other options. */
+		memcpy(buf, in_buf, len);
+
+		/* Need to free the temporary buffer if no buffer was given */
+		if (page_buf == NULL) {
+			ut_free(in_buf);
+		}
+
+		srv_stats.pages_page_decompressed.inc();
+	} else {
+		fprintf(stderr,
+			"InnoDB: Corruption: Page is marked as compressed\n"
+			"InnoDB: but compression algorithm %s\n"
+			"InnoDB: is not known.\n"
+			,fil_get_compression_alg_name(compression_alg));
+
+		fflush(stderr);
+		ut_error;
+	}
+}
+
+/*******************************************************************//**
+Find out whether the page is an index page or not. The decision is made
+from the two byte page type stored at offset FIL_PAGE_TYPE of the page.
+@return true if the page type is FIL_PAGE_INDEX, false if not */
+ibool
+fil_page_is_index_page(
+/*===================*/
+	byte	*buf)	/*!< in: page */
+{
+	const ulint	page_type = mach_read_from_2(buf + FIL_PAGE_TYPE);
+
+	return(page_type == FIL_PAGE_INDEX);
+}
+
+/*******************************************************************//**
+Find out whether the page is page compressed. The decision is made from
+the two byte page type stored at offset FIL_PAGE_TYPE of the page.
+@return true if the page type is FIL_PAGE_PAGE_COMPRESSED, false if not */
+ibool
+fil_page_is_compressed(
+/*===================*/
+	byte	*buf)	/*!< in: page */
+{
+	const ulint	page_type = mach_read_from_2(buf + FIL_PAGE_TYPE);
+
+	return(page_type == FIL_PAGE_PAGE_COMPRESSED);
+}
+
+/*******************************************************************//**
+Look up the flags of a tablespace and extract the page compression
+level from them. The tablespace must be cached in the memory cache.
+@return page compression level extracted from the space flags, 0 if the
+space flags are 0, ULINT_UNDEFINED if space not found */
+ulint
+fil_space_get_page_compression_level(
+/*=================================*/
+	ulint	id)	/*!< in: space id */
+{
+	const ulint	space_flags = fil_space_get_flags(id);
+
+	if (space_flags == 0 || space_flags == ULINT_UNDEFINED) {
+		/* Nothing to extract: hand the value of the lookup
+		back to the caller unchanged. */
+		return(space_flags);
+	}
+
+	return(fsp_flags_get_page_compression_level(space_flags));
+}
+
+/*******************************************************************//**
+Extract the page compression from space.
+@return true if space is page compressed, false if space is not found
+or space is not page compressed. */
+ibool
+fil_space_is_page_compressed(
+/*=========================*/
+	ulint	id)	/*!< in: space id */
+{
+	ulint	flags;
+
+	flags = fil_space_get_flags(id);
+
+	if (flags && flags != ULINT_UNDEFINED) {
+
+		return(fsp_flags_is_page_compressed(flags));
+	}
+
+	/* flags is either 0 or ULINT_UNDEFINED here. Returning it as is
+	would report a space that was not found (ULINT_UNDEFINED, which is
+	non-zero) as page compressed, so answer with an explicit FALSE. */
+	return(FALSE);
+}
+
+/****************************************************************//**
+Translate the number of a page compression algorithm to a printable
+name.
+@return "ZLIB" for FIL_PAGE_COMPRESSION_ZLIB, "UNKNOWN" for every other
+value */
+const char*
+fil_get_compression_alg_name(
+/*=========================*/
+	ulint	comp_alg)	/*!<in: compression algorithm number */
+{
+	if (comp_alg == FIL_PAGE_COMPRESSION_ZLIB) {
+		return("ZLIB");
+	}
+
+	return("UNKNOWN");
+}
+
+/*******************************************************************//**
+Returns the atomic writes flag of the space, or false if the space
+is not using atomic writes. The tablespace must be cached in the memory cache.
+@return true if space using atomic writes, false if not or if the space
+is not found */
+ibool
+fil_space_get_atomic_writes(
+/*========================*/
+	ulint	id)	/*!< in: space id */
+{
+	ulint	flags;
+
+	flags = fil_space_get_flags(id);
+
+	if (flags && flags != ULINT_UNDEFINED) {
+
+		return(fsp_flags_get_atomic_writes(flags));
+	}
+
+	/* flags is either 0 or ULINT_UNDEFINED here. Returning it as is
+	would report a space that was not found (ULINT_UNDEFINED, which is
+	non-zero) as using atomic writes, so answer with an explicit
+	FALSE. */
+	return(FALSE);
+}
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 82421d2d725..085521ac7e5 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -4,6 +4,7 @@ Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, 2009 Google Inc.
Copyright (c) 2009, Percona Inc.
Copyright (c) 2012, Facebook Inc.
+Copyright (c) 2013, SkySQL Ab.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -485,6 +486,28 @@ ib_cb_t innodb_api_cb[] = {
(ib_cb_t) ib_cfg_bk_commit_interval
};
+/**
+  List of the engine specific CREATE TABLE options (table options) of
+  InnoDB. The values are kept in a structure that needs to be called
+  ha_table_option_struct; the second argument of every entry below
+  (page_compressed, page_compression_level, atomic_writes) names a
+  member of that structure.
+
+  The option values can be specified in the CREATE TABLE at the end:
+  CREATE TABLE ( ... ) *here*
+*/
+
+ha_create_table_option innodb_table_option_list[]=
+{
+ /* With this option user can enable page compression feature for the
+ table */
+ HA_TOPTION_BOOL("PAGE_COMPRESSED", page_compressed, 0),
+ /* With this option user can set zip compression level for page
+ compression for this table. The default passed here is
+ ULINT_UNDEFINED, which the rest of the code treats as "no level
+ given". */
+ HA_TOPTION_NUMBER("PAGE_COMPRESSION_LEVEL", page_compression_level, ULINT_UNDEFINED, 0, 9, 1),
+ /* With this option user can enable atomic writes feature for this table */
+ HA_TOPTION_BOOL("ATOMIC_WRITES", atomic_writes, 0),
+ HA_TOPTION_END
+};
+
+
/*************************************************************//**
Check whether valid argument given to innodb_ft_*_stopword_table.
This function is registered as a callback with MySQL.
@@ -647,6 +670,24 @@ static SHOW_VAR innodb_status_variables[]= {
{"purge_view_trx_id_age",
(char*) &export_vars.innodb_purge_view_trx_id_age, SHOW_LONG},
#endif /* UNIV_DEBUG */
+ /* Status variables for page compression */
+ {"page_compression_saved",
+ (char*) &export_vars.innodb_page_compression_saved, SHOW_LONGLONG},
+ {"page_compression_trim_sect512",
+ (char*) &export_vars.innodb_page_compression_trim_sect512, SHOW_LONGLONG},
+ {"page_compression_trim_sect4096",
+ (char*) &export_vars.innodb_page_compression_trim_sect4096, SHOW_LONGLONG},
+ {"num_index_pages_written",
+ (char*) &export_vars.innodb_index_pages_written, SHOW_LONGLONG},
+ {"num_pages_page_compressed",
+ (char*) &export_vars.innodb_pages_page_compressed, SHOW_LONGLONG},
+ {"num_page_compressed_trim_op",
+ (char*) &export_vars.innodb_page_compressed_trim_op, SHOW_LONGLONG},
+ {"num_page_compressed_trim_op_saved",
+ (char*) &export_vars.innodb_page_compressed_trim_op_saved, SHOW_LONGLONG},
+ {"num_pages_page_decompressed",
+ (char*) &export_vars.innodb_pages_page_decompressed, SHOW_LONGLONG},
+
{NullS, NullS, SHOW_LONG}
};
@@ -2796,6 +2837,8 @@ innobase_init(
if (srv_file_per_table)
innobase_hton->tablefile_extensions = ha_innobase_exts;
+ innobase_hton->table_options = innodb_table_option_list;
+
ut_a(DATA_MYSQL_TRUE_VARCHAR == (ulint)MYSQL_TYPE_VARCHAR);
#ifndef DBUG_OFF
@@ -3118,8 +3161,6 @@ innobase_change_buffering_inited_ok:
srv_use_doublewrite_buf = (ibool) innobase_use_doublewrite;
- page_compression_level = (ulint) innobase_compression_level;
-
if (!innobase_use_checksums) {
ut_print_timestamp(stderr);
fprintf(stderr,
@@ -9465,11 +9506,16 @@ innobase_table_flags(
enum row_type row_format;
rec_format_t innodb_row_format = REC_FORMAT_COMPACT;
bool use_data_dir;
+ ha_table_option_struct *options= form->s->option_struct;
/* Cache the value of innodb_file_format, in case it is
modified by another thread while the table is being created. */
const ulint file_format_allowed = srv_file_format;
+ /* Cache the value of innobase_compression_level, in case it is
+ modified by another thread while the table is being created. */
+ const ulint default_compression_level = innobase_compression_level;
+
*flags = 0;
*flags2 = 0;
@@ -9513,6 +9559,8 @@ index_bad:
}
}
+ row_format = form->s->row_type;
+
if (create_info->key_block_size) {
/* The requested compressed page size (key_block_size)
is given in kilobytes. If it is a valid number, store
@@ -9522,7 +9570,7 @@ index_bad:
ulint kbsize; /* Key Block Size */
for (zssize = kbsize = 1;
zssize <= ut_min(UNIV_PAGE_SSIZE_MAX,
- PAGE_ZIP_SSIZE_MAX);
+ PAGE_ZIP_SSIZE_MAX);
zssize++, kbsize <<= 1) {
if (kbsize == create_info->key_block_size) {
zip_ssize = zssize;
@@ -9550,8 +9598,8 @@ index_bad:
}
if (!zip_allowed
- || zssize > ut_min(UNIV_PAGE_SSIZE_MAX,
- PAGE_ZIP_SSIZE_MAX)) {
+ || zssize > ut_min(UNIV_PAGE_SSIZE_MAX,
+ PAGE_ZIP_SSIZE_MAX)) {
push_warning_printf(
thd, Sql_condition::WARN_LEVEL_WARN,
ER_ILLEGAL_HA_CREATE_OPTION,
@@ -9560,8 +9608,6 @@ index_bad:
}
}
- row_format = form->s->row_type;
-
if (zip_ssize && zip_allowed) {
/* if ROW_FORMAT is set to default,
automatically change it to COMPRESSED.*/
@@ -9598,7 +9644,6 @@ index_bad:
case ROW_TYPE_REDUNDANT:
innodb_row_format = REC_FORMAT_REDUNDANT;
break;
-
case ROW_TYPE_COMPRESSED:
case ROW_TYPE_DYNAMIC:
if (!use_tablespace) {
@@ -9616,10 +9661,18 @@ index_bad:
" innodb_file_format > Antelope.",
get_row_format_name(row_format));
} else {
- innodb_row_format = (row_format == ROW_TYPE_DYNAMIC
- ? REC_FORMAT_DYNAMIC
- : REC_FORMAT_COMPRESSED);
- break;
+ switch(row_format) {
+ case ROW_TYPE_COMPRESSED:
+ innodb_row_format = REC_FORMAT_COMPRESSED;
+ break;
+ case ROW_TYPE_DYNAMIC:
+ innodb_row_format = REC_FORMAT_DYNAMIC;
+ break;
+ default:
+ /* Not possible, avoid compiler warning */
+ break;
+ }
+ break; /* Correct row_format */
}
zip_allowed = FALSE;
/* fall through to set row_format = COMPACT */
@@ -9646,7 +9699,15 @@ index_bad:
&& ((create_info->data_file_name != NULL)
&& !(create_info->options & HA_LEX_CREATE_TMP_TABLE));
- dict_tf_set(flags, innodb_row_format, zip_ssize, use_data_dir);
+ /* Set up table dictionary flags */
+ dict_tf_set(flags,
+ innodb_row_format,
+ zip_ssize,
+ use_data_dir,
+ options->page_compressed,
+ (ulint)options->page_compression_level == ULINT_UNDEFINED ?
+ default_compression_level : options->page_compression_level,
+ options->atomic_writes);
if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
*flags2 |= DICT_TF2_TEMPORARY;
@@ -9659,6 +9720,111 @@ index_bad:
DBUG_RETURN(true);
}
+
+/*****************************************************************//**
+Check engine specific table options not handled by SQL-parser.
+The checks stop at the first invalid option; a warning describing the
+problem is pushed for it before returning.
+@return NULL if valid, name of the first invalid option if not */
+UNIV_INTERN
+const char*
+ha_innobase::check_table_options(
+	THD		*thd,		/*!< in: thread handle */
+	TABLE*		table,		/*!< in: information on table
+					columns and indexes */
+	HA_CREATE_INFO*	create_info,	/*!< in: more information of the
+					created table, contains also the
+					create statement string */
+	const bool	use_tablespace,	/*!< in: use file per table */
+	const ulint	file_format)	/*!< in: file format; page
+					compression is refused if this is
+					below UNIV_FORMAT_B */
+{
+	const enum row_type		row_format = table->s->row_type;
+	const ha_table_option_struct*	options = table->s->option_struct;
+
+	/* Check page compression requirements */
+	if (options->page_compressed) {
+		if (!srv_compress_pages) {
+			push_warning(
+				thd, Sql_condition::WARN_LEVEL_WARN,
+				HA_WRONG_CREATE_OPTION,
+				"InnoDB: PAGE_COMPRESSED requires"
+				" innodb_compress_pages to be enabled.");
+			return "PAGE_COMPRESSED";
+		}
+
+		if (row_format == ROW_TYPE_COMPRESSED) {
+			push_warning(
+				thd, Sql_condition::WARN_LEVEL_WARN,
+				HA_WRONG_CREATE_OPTION,
+				"InnoDB: PAGE_COMPRESSED table can't have"
+				" ROW_TYPE=COMPRESSED");
+			return "PAGE_COMPRESSED";
+		}
+
+		if (!use_tablespace) {
+			push_warning(
+				thd, Sql_condition::WARN_LEVEL_WARN,
+				HA_WRONG_CREATE_OPTION,
+				"InnoDB: PAGE_COMPRESSED requires"
+				" innodb_file_per_table.");
+			return "PAGE_COMPRESSED";
+		}
+
+		if (file_format < UNIV_FORMAT_B) {
+			push_warning(
+				thd, Sql_condition::WARN_LEVEL_WARN,
+				HA_WRONG_CREATE_OPTION,
+				"InnoDB: PAGE_COMPRESSED requires"
+				" innodb_file_format > Antelope.");
+			return "PAGE_COMPRESSED";
+		}
+
+		if (create_info->key_block_size) {
+			push_warning(
+				thd, Sql_condition::WARN_LEVEL_WARN,
+				HA_WRONG_CREATE_OPTION,
+				"InnoDB: PAGE_COMPRESSED table can't have"
+				" key_block_size");
+			return "PAGE_COMPRESSED";
+		}
+	}
+
+	/* Check page compression level requirements, some of them are
+	already checked above. A level that equals ULINT_UNDEFINED after
+	the cast means that no level was given. */
+	if ((ulint)options->page_compression_level != ULINT_UNDEFINED) {
+		if (!options->page_compressed) {
+			push_warning(
+				thd, Sql_condition::WARN_LEVEL_WARN,
+				HA_WRONG_CREATE_OPTION,
+				"InnoDB: PAGE_COMPRESSION_LEVEL requires"
+				" PAGE_COMPRESSED");
+			return "PAGE_COMPRESSION_LEVEL";
+		}
+
+		if (options->page_compression_level < 0
+		    || options->page_compression_level > 9) {
+			/* Report the offending level itself (an int), not
+			the unrelated key_block_size. */
+			push_warning_printf(
+				thd, Sql_condition::WARN_LEVEL_WARN,
+				HA_WRONG_CREATE_OPTION,
+				"InnoDB: invalid PAGE_COMPRESSION_LEVEL = %d."
+				" Valid values are [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]",
+				options->page_compression_level);
+			return "PAGE_COMPRESSION_LEVEL";
+		}
+	}
+
+	/* Check atomic writes requirements.
+	NOTE(review): the option is refused only if both
+	srv_use_atomic_writes and use_tablespace are unset, while the
+	warning mentions innodb_file_per_table only - confirm that this
+	is the intended condition. */
+	if (options->atomic_writes) {
+		if (!srv_use_atomic_writes && !use_tablespace) {
+			push_warning(
+				thd, Sql_condition::WARN_LEVEL_WARN,
+				HA_WRONG_CREATE_OPTION,
+				"InnoDB: ATOMIC_WRITES requires"
+				" innodb_file_per_table.");
+			return "ATOMIC_WRITES";
+		}
+	}
+
+	return NULL;
+}
+
/*****************************************************************//**
Creates a new table to an InnoDB database.
@return error number */
@@ -9690,6 +9856,7 @@ ha_innobase::create(
while creating the table. So we read the current value here
and make all further decisions based on this. */
bool use_tablespace = srv_file_per_table;
+ const ulint file_format = srv_file_format;
/* Zip Shift Size - log2 - 9 of compressed page size,
zero for uncompressed */
@@ -9713,6 +9880,12 @@ ha_innobase::create(
/* Create the table definition in InnoDB */
+ /* Validate table options not handled by the SQL-parser */
+ if(check_table_options(thd, form, create_info, use_tablespace,
+ file_format)) {
+ DBUG_RETURN(HA_WRONG_CREATE_OPTION);
+ }
+
/* Validate create options if innodb_strict_mode is set. */
if (create_options_are_invalid(
thd, form, create_info, use_tablespace)) {
@@ -13952,6 +14125,12 @@ ha_innobase::check_if_incompatible_data(
HA_CREATE_INFO* info,
uint table_changes)
{
+ ha_table_option_struct *param_old, *param_new;
+
+ /* Cache engine specific options */
+ param_new = info->option_struct;
+ param_old = table->s->option_struct;
+
innobase_copy_frm_flags_from_create_info(prebuilt->table, info);
if (table_changes != IS_EQUAL_YES) {
@@ -13978,6 +14157,13 @@ ha_innobase::check_if_incompatible_data(
return(COMPATIBLE_DATA_NO);
}
+ /* Changes on engine specific table options requests a rebuild of the table. */
+ if (param_new->page_compressed != param_old->page_compressed ||
+ param_new->page_compression_level != param_old->page_compression_level ||
+ param_new->atomic_writes != param_old->atomic_writes) {
+ return(COMPATIBLE_DATA_NO);
+ }
+
return(COMPATIBLE_DATA_YES);
}
@@ -16447,6 +16633,31 @@ static MYSQL_SYSVAR_BOOL(trx_purge_view_update_only_debug,
NULL, NULL, FALSE);
#endif /* UNIV_DEBUG */
+/* System variables for page compression and trim.
+compress_pages is read only and FALSE by default; check_table_options()
+refuses PAGE_COMPRESSED while srv_compress_pages is not set.
+compress_zlib_level is the level fil_compress_page() falls back to when
+the level stored for the tablespace is 0.
+NOTE(review): the trailing numeric arguments of the LONG variables are
+presumably default, minimum, maximum and block size - confirm against
+the MYSQL_SYSVAR_LONG definition. */
+static MYSQL_SYSVAR_BOOL(compress_pages, srv_compress_pages,
+ PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY,
+ "Use page compression.",
+ NULL, NULL, FALSE);
+
+static MYSQL_SYSVAR_LONG(trim_pct, srv_trim_pct,
+ PLUGIN_VAR_OPCMDARG ,
+ "How many percent of compressed pages should be trimmed",
+ NULL, NULL, 100, 0, 100, 0);
+
+static MYSQL_SYSVAR_LONG(compress_zlib_level, srv_compress_zlib_level,
+ PLUGIN_VAR_OPCMDARG ,
+ "Default zlib compression level",
+ NULL, NULL, 6, 0, 9, 0);
+
+static MYSQL_SYSVAR_BOOL(compress_index_pages, srv_page_compress_index_pages,
+ PLUGIN_VAR_OPCMDARG,
+ "Use page compression for only index pages.",
+ NULL, NULL, FALSE);
+
+static MYSQL_SYSVAR_BOOL(use_trim, srv_use_trim,
+ PLUGIN_VAR_OPCMDARG,
+ "Use trim.",
+ NULL, NULL, TRUE);
+
static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(additional_mem_pool_size),
MYSQL_SYSVAR(api_trx_level),
@@ -16592,6 +16803,11 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(limit_optimistic_insert_debug),
MYSQL_SYSVAR(trx_purge_view_update_only_debug),
#endif /* UNIV_DEBUG */
+ MYSQL_SYSVAR(compress_pages),
+ MYSQL_SYSVAR(trim_pct),
+ MYSQL_SYSVAR(compress_zlib_level),
+ MYSQL_SYSVAR(compress_index_pages),
+ MYSQL_SYSVAR(use_trim),
NULL
};
diff --git a/storage/innobase/handler/ha_innodb.h b/storage/innobase/handler/ha_innodb.h
index ece9f7cf58a..5eb460072bb 100644
--- a/storage/innobase/handler/ha_innodb.h
+++ b/storage/innobase/handler/ha_innodb.h
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 2000, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -56,6 +57,18 @@ typedef struct st_innobase_share {
/** Prebuilt structures in an InnoDB table handle used within MySQL */
struct row_prebuilt_t;
+/** Engine specific table options are defined using this struct */
+struct ha_table_option_struct
+{
+ bool page_compressed; /*!< Table is using page compression
+ if this option is true. */
+ int page_compression_level; /*!< Table page compression level.
+ A value that equals ULINT_UNDEFINED
+ after a cast to ulint is treated as
+ "no level given" by the code that
+ reads this field. */
+ bool atomic_writes; /*!< Use atomic writes for this
+ table if this option is true. */
+};
+
+
/** The class defining a handle to an Innodb table */
class ha_innobase: public handler
{
@@ -182,6 +195,8 @@ class ha_innobase: public handler
char* norm_name,
char* temp_path,
char* remote_path);
+ const char* check_table_options(THD *thd, TABLE* table,
+ HA_CREATE_INFO* create_info, const bool use_tablespace, const ulint file_format);
int create(const char *name, register TABLE *form,
HA_CREATE_INFO *create_info);
int truncate();
diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc
index a120534b36d..49f8a05d11a 100644
--- a/storage/innobase/handler/handler0alter.cc
+++ b/storage/innobase/handler/handler0alter.cc
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 2005, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -248,6 +249,22 @@ ha_innobase::check_if_supported_inplace_alter(
update_thd();
trx_search_latch_release_if_reserved(prebuilt->trx);
+	/* A change of the engine specific table options requires a rebuild
+	of the table, so it can not be done in place. The flag is tested
+	with a bitwise and (like the CHANGE_CREATE_OPTION test in
+	prepare_inplace_alter_table()) so that the check is not skipped
+	when the statement sets other alter flags as well. */
+	if (ha_alter_info->handler_flags
+	    & Alter_inplace_info::CHANGE_CREATE_OPTION) {
+		const ha_table_option_struct*	new_options =
+			ha_alter_info->create_info->option_struct;
+		const ha_table_option_struct*	old_options =
+			table->s->option_struct;
+
+		/* Every option is compared with its own old value */
+		if (new_options->page_compressed
+		    != old_options->page_compressed
+		    || new_options->page_compression_level
+		    != old_options->page_compression_level
+		    || new_options->atomic_writes
+		    != old_options->atomic_writes) {
+			ha_alter_info->unsupported_reason = innobase_get_err_msg(
+				ER_ALTER_OPERATION_NOT_SUPPORTED_REASON);
+			DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
+		}
+	}
+
+
if (ha_alter_info->handler_flags
& ~(INNOBASE_ONLINE_OPERATIONS | INNOBASE_INPLACE_REBUILD)) {
if (ha_alter_info->handler_flags
@@ -3331,6 +3348,17 @@ ha_innobase::prepare_inplace_alter_table(
if (ha_alter_info->handler_flags
& Alter_inplace_info::CHANGE_CREATE_OPTION) {
+ /* Check engine specific table options */
+ if (const char* invalid_tbopt = check_table_options(
+ user_thd, altered_table,
+ ha_alter_info->create_info,
+ prebuilt->table->space != 0,
+ srv_file_format)) {
+ my_error(ER_ILLEGAL_HA_CREATE_OPTION, MYF(0),
+ table_type(), invalid_tbopt);
+ goto err_exit_no_heap;
+ }
+
if (const char* invalid_opt = create_options_are_invalid(
user_thd, altered_table,
ha_alter_info->create_info,
diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h
index 74a6e203808..5e301a27e32 100644
--- a/storage/innobase/include/buf0buf.h
+++ b/storage/innobase/include/buf0buf.h
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -1470,6 +1471,11 @@ struct buf_page_t{
state == BUF_BLOCK_ZIP_PAGE and
zip.data == NULL means an active
buf_pool->watch */
+
+ ulint write_size; /* Write size is set when this
+ page is written for the first time;
+ if it is written again we check
+ whether a TRIM operation is needed. */
#ifndef UNIV_HOTBACKUP
buf_page_t* hash; /*!< node used in chaining to
buf_pool->page_hash or
diff --git a/storage/innobase/include/dict0dict.h b/storage/innobase/include/dict0dict.h
index af0a5b31cc4..0ca64956a2e 100644
--- a/storage/innobase/include/dict0dict.h
+++ b/storage/innobase/include/dict0dict.h
@@ -2,6 +2,7 @@
Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -42,6 +43,8 @@ Created 1/8/1996 Heikki Tuuri
#include "ut0byte.h"
#include "trx0types.h"
#include "row0types.h"
+#include "fsp0fsp.h"
+#include "dict0pagecompress.h"
#ifndef UNIV_HOTBACKUP
# include "sync0sync.h"
@@ -878,7 +881,14 @@ dict_tf_set(
ulint* flags, /*!< in/out: table */
rec_format_t format, /*!< in: file format */
ulint zip_ssize, /*!< in: zip shift size */
- bool remote_path) /*!< in: table uses DATA DIRECTORY */
+ bool remote_path, /*!< in: table uses DATA DIRECTORY
+ */
+ bool page_compressed,/*!< in: table uses page compressed
+ pages */
+ ulint page_compression_level, /*!< in: table page compression
+ level */
+ bool atomic_writes) /*!< in: table uses atomic
+ writes */
__attribute__((nonnull));
/********************************************************************//**
Convert a 32 bit integer table flags to the 32 bit integer that is
@@ -906,6 +916,7 @@ dict_tf_get_zip_size(
/*=================*/
ulint flags) /*!< in: flags */
__attribute__((const));
+
/********************************************************************//**
Check whether the table uses the compressed compact page format.
@return compressed page size, or 0 if not compressed */
@@ -1779,6 +1790,7 @@ dict_tf_to_row_format_string(
#endif /* !UNIV_HOTBACKUP */
+
#ifndef UNIV_NONINL
#include "dict0dict.ic"
#endif
diff --git a/storage/innobase/include/dict0dict.ic b/storage/innobase/include/dict0dict.ic
index 83953c9325a..65967552b87 100644
--- a/storage/innobase/include/dict0dict.ic
+++ b/storage/innobase/include/dict0dict.ic
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -537,9 +538,25 @@ dict_tf_is_valid(
ulint zip_ssize = DICT_TF_GET_ZIP_SSIZE(flags);
ulint atomic_blobs = DICT_TF_HAS_ATOMIC_BLOBS(flags);
ulint unused = DICT_TF_GET_UNUSED(flags);
+ ulint page_compression = DICT_TF_GET_PAGE_COMPRESSION(flags);
+ ulint page_compression_level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(flags);
+ ulint data_dir = DICT_TF_HAS_DATA_DIR(flags);
+ ulint atomic_writes = DICT_TF_GET_ATOMIC_WRITES(flags);
/* Make sure there are no bits that we do not know about. */
if (unused != 0) {
+ fprintf(stderr,
+ "InnoDB: Error: table unused flags are %ld"
+ " in the data dictionary and are corrupted\n"
+ "InnoDB: Error: data dictionary flags are\n"
+ "InnoDB: compact %ld atomic_blobs %ld\n"
+ "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n"
+ "InnoDB: page_compression %ld page_compression_level %ld\n"
+ "InnoDB: atomic_writes %ld\n",
+ unused,
+ compact, atomic_blobs, unused, data_dir, zip_ssize,
+ page_compression, page_compression_level, atomic_writes
+ );
return(false);
@@ -550,12 +567,34 @@ dict_tf_is_valid(
data stored off-page in the clustered index. */
if (!compact) {
+ fprintf(stderr,
+ "InnoDB: Error: table compact flags are %ld"
+ " in the data dictionary and are corrupted\n"
+ "InnoDB: Error: data dictionary flags are\n"
+ "InnoDB: compact %ld atomic_blobs %ld\n"
+ "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n"
+ "InnoDB: page_compression %ld page_compression_level %ld\n"
+ "InnoDB: atomic_writes %ld\n",
+ compact, compact, atomic_blobs, unused, data_dir, zip_ssize,
+ page_compression, page_compression_level, atomic_writes
+ );
return(false);
}
} else if (zip_ssize) {
/* Antelope does not support COMPRESSED row format. */
+ fprintf(stderr,
+ "InnoDB: Error: table flags are %ld"
+ " in the data dictionary and are corrupted\n"
+ "InnoDB: Error: data dictionary flags are\n"
+ "InnoDB: compact %ld atomic_blobs %ld\n"
+ "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n"
+ "InnoDB: page_compression %ld page_compression_level %ld\n"
+ "InnoDB: atomic_writes %ld\n",
+ flags, compact, atomic_blobs, unused, data_dir, zip_ssize,
+ page_compression, page_compression_level, atomic_writes
+ );
return(false);
}
@@ -568,6 +607,40 @@ dict_tf_is_valid(
|| !atomic_blobs
|| zip_ssize > PAGE_ZIP_SSIZE_MAX) {
+ fprintf(stderr,
+ "InnoDB: Error: table compact flags are %ld in the data dictionary and are corrupted\n"
+ "InnoDB: Error: data dictionary flags are\n"
+ "InnoDB: compact %ld atomic_blobs %ld\n"
+ "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n"
+ "InnoDB: page_compression %ld page_compression_level %ld\n"
+ "InnoDB: atomic_writes %ld\n",
+ flags,
+ compact, atomic_blobs, unused, data_dir, zip_ssize,
+ page_compression, page_compression_level, atomic_writes
+
+ );
+ return(false);
+ }
+ }
+
+ if (page_compression || page_compression_level) {
+ /* Page compression format must have compact and
+ atomic_blobs and page_compression_level requires
+ page_compression */
+ if (!compact
+ || !page_compression
+ || !atomic_blobs) {
+
+ fprintf(stderr,
+ "InnoDB: Error: table flags are %ld in the data dictionary and are corrupted\n"
+ "InnoDB: Error: data dictionary flags are\n"
+ "InnoDB: compact %ld atomic_blobs %ld\n"
+ "InnoDB: unused %ld data_dir %ld zip_ssize %ld\n"
+ "InnoDB: page_compression %ld page_compression_level %ld\n"
+ "InnoDB: atomic_writes %ld\n",
+ flags, compact, atomic_blobs, unused, data_dir, zip_ssize,
+ page_compression, page_compression_level, atomic_writes
+ );
return(false);
}
}
@@ -594,6 +667,9 @@ dict_sys_tables_type_validate(
ulint zip_ssize = DICT_TF_GET_ZIP_SSIZE(type);
ulint atomic_blobs = DICT_TF_HAS_ATOMIC_BLOBS(type);
ulint unused = DICT_TF_GET_UNUSED(type);
+ ulint page_compression = DICT_TF_GET_PAGE_COMPRESSION(type);
+ ulint page_compression_level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(type);
+ ulint atomic_writes = DICT_TF_GET_ATOMIC_WRITES(type);
/* The low order bit of SYS_TABLES.TYPE is always set to 1.
If the format is UNIV_FORMAT_B or higher, this field is the same
@@ -647,6 +723,23 @@ dict_sys_tables_type_validate(
format, so the DATA_DIR flag is compatible with any other
table flags. However, it is not used with TEMPORARY tables.*/
+ if (page_compression || page_compression_level) {
+ /* page compressed row format must have low_order_bit and
+ atomic_blobs bits set and the DICT_N_COLS_COMPACT flag
+ should be in N_COLS, but we already know about the
+ low_order_bit and DICT_N_COLS_COMPACT flags. */
+
+ if (!atomic_blobs || !page_compression) {
+ return(ULINT_UNDEFINED);
+ }
+ }
+
+ if (atomic_writes) {
+ if (!atomic_blobs) {
+ return(ULINT_UNDEFINED);
+ }
+ }
+
/* Return the validated SYS_TABLES.TYPE. */
return(type);
}
@@ -719,7 +812,14 @@ dict_tf_set(
ulint* flags, /*!< in/out: table flags */
rec_format_t format, /*!< in: file format */
ulint zip_ssize, /*!< in: zip shift size */
- bool use_data_dir) /*!< in: table uses DATA DIRECTORY */
+ bool use_data_dir, /*!< in: table uses DATA DIRECTORY
+ */
+ bool page_compressed,/*!< in: table uses page compressed
+ pages */
+ ulint page_compression_level, /*!< in: table page compression
+ level */
+ bool atomic_writes) /*!< in: table uses atomic
+ writes */
{
switch (format) {
case REC_FORMAT_REDUNDANT:
@@ -742,6 +842,22 @@ dict_tf_set(
break;
}
+ if (page_compressed) {
+ *flags = DICT_TF_COMPACT
+ | (1 << DICT_TF_POS_ATOMIC_BLOBS)
+ | (1 << DICT_TF_POS_PAGE_COMPRESSION)
+ | (page_compression_level << DICT_TF_POS_PAGE_COMPRESSION_LEVEL);
+
+ ut_ad(zip_ssize == 0);
+ ut_ad(dict_tf_get_page_compression(*flags) == TRUE);
+ ut_ad(dict_tf_get_page_compression_level(*flags) == page_compression_level);
+ }
+
+ if (atomic_writes) {
+ *flags |= (1 << DICT_TF_POS_ATOMIC_WRITES);
+ ut_ad(dict_tf_get_atomic_writes(*flags) == TRUE);
+ }
+
if (use_data_dir) {
*flags |= (1 << DICT_TF_POS_DATA_DIR);
}
@@ -765,6 +881,9 @@ dict_tf_to_fsp_flags(
ulint table_flags) /*!< in: dict_table_t::flags */
{
ulint fsp_flags;
+ ulint page_compression = DICT_TF_GET_PAGE_COMPRESSION(table_flags);
+ ulint page_compression_level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(table_flags);
+ ulint atomic_writes = DICT_TF_GET_ATOMIC_WRITES(table_flags);
DBUG_EXECUTE_IF("dict_tf_to_fsp_flags_failure",
return(ULINT_UNDEFINED););
@@ -783,7 +902,20 @@ dict_tf_to_fsp_flags(
fsp_flags |= DICT_TF_HAS_DATA_DIR(table_flags)
? FSP_FLAGS_MASK_DATA_DIR : 0;
+ /* In addition, tablespace flags also contain if the page
+ compression is used for this table. */
+ fsp_flags |= FSP_FLAGS_SET_PAGE_COMPRESSION(fsp_flags, page_compression);
+
+ /* In addition, tablespace flags also contain page compression level
+ if page compression is used for this table. */
+ fsp_flags |= FSP_FLAGS_SET_PAGE_COMPRESSION_LEVEL(fsp_flags, page_compression_level);
+
+ /* In addition, tablespace flags also contain flag if atomic writes
+ is used for this table */
+ fsp_flags |= FSP_FLAGS_SET_ATOMIC_WRITES(fsp_flags, atomic_writes);
+
ut_a(fsp_flags_is_valid(fsp_flags));
+ ut_a(dict_tf_verify_flags(table_flags, fsp_flags));
return(fsp_flags);
}
@@ -811,10 +943,15 @@ dict_sys_tables_type_to_tf(
/* Adjust bit zero. */
flags = redundant ? 0 : 1;
- /* ZIP_SSIZE, ATOMIC_BLOBS & DATA_DIR are the same. */
+ /* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION,
+ PAGE_COMPRESSION_LEVEL, ATOMIC_WRITES are the same. */
flags |= type & (DICT_TF_MASK_ZIP_SSIZE
| DICT_TF_MASK_ATOMIC_BLOBS
- | DICT_TF_MASK_DATA_DIR);
+ | DICT_TF_MASK_DATA_DIR
+ | DICT_TF_MASK_PAGE_COMPRESSION
+ | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL
+ | DICT_TF_MASK_ATOMIC_WRITES
+ );
return(flags);
}
@@ -842,10 +979,14 @@ dict_tf_to_sys_tables_type(
/* Adjust bit zero. It is always 1 in SYS_TABLES.TYPE */
type = 1;
- /* ZIP_SSIZE, ATOMIC_BLOBS & DATA_DIR are the same. */
+ /* ZIP_SSIZE, ATOMIC_BLOBS, DATA_DIR, PAGE_COMPRESSION,
+ PAGE_COMPRESSION_LEVEL, ATOMIC_WRITES are the same. */
type |= flags & (DICT_TF_MASK_ZIP_SSIZE
| DICT_TF_MASK_ATOMIC_BLOBS
- | DICT_TF_MASK_DATA_DIR);
+ | DICT_TF_MASK_DATA_DIR
+ | DICT_TF_MASK_PAGE_COMPRESSION
+ | DICT_TF_MASK_PAGE_COMPRESSION_LEVEL
+ | DICT_TF_MASK_ATOMIC_WRITES);
return(type);
}
diff --git a/storage/innobase/include/dict0mem.h b/storage/innobase/include/dict0mem.h
index 671f67eb1f8..6cfcb81bcd5 100644
--- a/storage/innobase/include/dict0mem.h
+++ b/storage/innobase/include/dict0mem.h
@@ -2,6 +2,7 @@
Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -120,11 +121,25 @@ This flag prevents older engines from attempting to open the table and
allows InnoDB to update_create_info() accordingly. */
#define DICT_TF_WIDTH_DATA_DIR 1
+/**
+Width of the page compression flag: a single bit telling whether the
+table uses page compression.
+*/
+#define DICT_TF_WIDTH_PAGE_COMPRESSION 1
+/**
+Width of the page compression level field: 4 bits, i.e. raw values 0..15.
+*/
+#define DICT_TF_WIDTH_PAGE_COMPRESSION_LEVEL 4
+
+/**
+Width of atomic writes flag: a single bit.
+*/
+#define DICT_TF_WIDTH_ATOMIC_WRITES 1
+
/** Width of all the currently known table flags */
#define DICT_TF_BITS (DICT_TF_WIDTH_COMPACT \
+ DICT_TF_WIDTH_ZIP_SSIZE \
+ DICT_TF_WIDTH_ATOMIC_BLOBS \
- + DICT_TF_WIDTH_DATA_DIR)
+ + DICT_TF_WIDTH_DATA_DIR \
+ + DICT_TF_WIDTH_PAGE_COMPRESSION \
+ + DICT_TF_WIDTH_PAGE_COMPRESSION_LEVEL \
+ + DICT_TF_WIDTH_ATOMIC_WRITES)
/** A mask of all the known/used bits in table flags */
#define DICT_TF_BIT_MASK (~(~0 << DICT_TF_BITS))
@@ -140,9 +155,19 @@ allows InnoDB to update_create_info() accordingly. */
/** Zero relative shift position of the DATA_DIR field */
#define DICT_TF_POS_DATA_DIR (DICT_TF_POS_ATOMIC_BLOBS \
+ DICT_TF_WIDTH_ATOMIC_BLOBS)
+/* The three fields below are packed directly after DATA_DIR: each shift
+position is the previous field's position plus the previous field's width. */
+/** Zero relative shift position of the PAGE_COMPRESSION field */
+#define DICT_TF_POS_PAGE_COMPRESSION (DICT_TF_POS_DATA_DIR \
+ + DICT_TF_WIDTH_DATA_DIR)
+/** Zero relative shift position of the PAGE_COMPRESSION_LEVEL field */
+#define DICT_TF_POS_PAGE_COMPRESSION_LEVEL (DICT_TF_POS_PAGE_COMPRESSION \
+ + DICT_TF_WIDTH_PAGE_COMPRESSION)
+/** Zero relative shift position of the ATOMIC_WRITES field */
+#define DICT_TF_POS_ATOMIC_WRITES (DICT_TF_POS_PAGE_COMPRESSION_LEVEL \
+ + DICT_TF_WIDTH_PAGE_COMPRESSION_LEVEL)
+
/** Zero relative shift position of the start of the UNUSED bits */
-#define DICT_TF_POS_UNUSED (DICT_TF_POS_DATA_DIR \
- + DICT_TF_WIDTH_DATA_DIR)
+#define DICT_TF_POS_UNUSED (DICT_TF_POS_ATOMIC_WRITES \
+ + DICT_TF_WIDTH_ATOMIC_WRITES)
/** Bit mask of the COMPACT field */
#define DICT_TF_MASK_COMPACT \
@@ -160,6 +185,18 @@ allows InnoDB to update_create_info() accordingly. */
#define DICT_TF_MASK_DATA_DIR \
((~(~0 << DICT_TF_WIDTH_DATA_DIR)) \
<< DICT_TF_POS_DATA_DIR)
+/* Each mask is built as WIDTH low-order one bits, shifted left to the
+field's POS.
+NOTE(review): ~0 is a signed int, and left-shifting a negative value is
+undefined in ISO C and in C++ before C++20; ~0U would avoid that. Confirm
+before changing, since the sibling masks in this file use the same form. */
+/** Bit mask of the PAGE_COMPRESSION field */
+#define DICT_TF_MASK_PAGE_COMPRESSION \
+ ((~(~0 << DICT_TF_WIDTH_PAGE_COMPRESSION)) \
+ << DICT_TF_POS_PAGE_COMPRESSION)
+/** Bit mask of the PAGE_COMPRESSION_LEVEL field */
+#define DICT_TF_MASK_PAGE_COMPRESSION_LEVEL \
+ ((~(~0 << DICT_TF_WIDTH_PAGE_COMPRESSION_LEVEL)) \
+ << DICT_TF_POS_PAGE_COMPRESSION_LEVEL)
+/** Bit mask of the ATOMIC_WRITES field */
+#define DICT_TF_MASK_ATOMIC_WRITES \
+ ((~(~0 << DICT_TF_WIDTH_ATOMIC_WRITES)) \
+ << DICT_TF_POS_ATOMIC_WRITES)
/** Return the value of the COMPACT field */
#define DICT_TF_GET_COMPACT(flags) \
@@ -177,6 +214,19 @@ allows InnoDB to update_create_info() accordingly. */
#define DICT_TF_HAS_DATA_DIR(flags) \
((flags & DICT_TF_MASK_DATA_DIR) \
>> DICT_TF_POS_DATA_DIR)
+/* The macro argument is parenthesized in each accessor below so that a
+compound argument such as (a | b) is masked as a whole; without the
+parentheses "&" would bind only to the last operand. */
+/** Return the value of the PAGE_COMPRESSION field (0 or 1) */
+#define DICT_TF_GET_PAGE_COMPRESSION(flags)			\
+		(((flags) & DICT_TF_MASK_PAGE_COMPRESSION)	\
+		>> DICT_TF_POS_PAGE_COMPRESSION)
+/** Return the value of the PAGE_COMPRESSION_LEVEL field */
+#define DICT_TF_GET_PAGE_COMPRESSION_LEVEL(flags)		\
+		(((flags) & DICT_TF_MASK_PAGE_COMPRESSION_LEVEL)	\
+		>> DICT_TF_POS_PAGE_COMPRESSION_LEVEL)
+/** Return the value of the ATOMIC_WRITES field (0 or 1) */
+#define DICT_TF_GET_ATOMIC_WRITES(flags)			\
+		(((flags) & DICT_TF_MASK_ATOMIC_WRITES)		\
+		>> DICT_TF_POS_ATOMIC_WRITES)
+
/** Return the contents of the UNUSED bits */
#define DICT_TF_GET_UNUSED(flags) \
(flags >> DICT_TF_POS_UNUSED)
diff --git a/storage/innobase/include/dict0pagecompress.h b/storage/innobase/include/dict0pagecompress.h
new file mode 100644
index 00000000000..236924758f1
--- /dev/null
+++ b/storage/innobase/include/dict0pagecompress.h
@@ -0,0 +1,94 @@
+/*****************************************************************************
+
+Copyright (C) 2013 SkySQL Ab. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file include/dict0pagecompress.h
+Helper functions for extracting/storing page compression information
+to dictionary.
+
+Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
+***********************************************************************/
+
+#ifndef dict0pagecompress_h
+#define dict0pagecompress_h
+
+/********************************************************************//**
+Extract the page compression level from table flags.
+@return page compression level, or 0 if not compressed */
+UNIV_INLINE
+ulint
+dict_tf_get_page_compression_level(
+/*===============================*/
+	ulint	flags)	/*!< in: flags */
+	__attribute__((const));
+/********************************************************************//**
+Extract the page compression flag from table flags
+@return page compression flag, or false if not compressed */
+UNIV_INLINE
+ibool
+dict_tf_get_page_compression(
+/*==========================*/
+	ulint	flags)	/*!< in: flags */
+	__attribute__((const));
+
+/********************************************************************//**
+Get the page compression level of a page compressed table.
+The function reads table->flags through the pointer, so it is declared
+"pure" rather than "const": a "const" function must not examine memory
+reachable through its pointer arguments.
+@return page compression level */
+UNIV_INLINE
+ulint
+dict_table_page_compression_level(
+/*==============================*/
+	const dict_table_t*	table)	/*!< in: table */
+	__attribute__((nonnull, pure));
+
+/********************************************************************//**
+Check whether the table uses the page compressed page format.
+@return true if page compressed, false if not */
+UNIV_INLINE
+ibool
+dict_table_is_page_compressed(
+/*==========================*/
+	const dict_table_t*	table)	/*!< in: table */
+	__attribute__((nonnull, pure));
+
+/********************************************************************//**
+Verify that dictionary flags match tablespace flags.
+A mismatch is reported on stderr, i.e. the function has a side effect and
+therefore carries neither the "const" nor the "pure" attribute.
+@return true if flags match, false if not */
+UNIV_INLINE
+ibool
+dict_tf_verify_flags(
+/*=================*/
+	ulint	table_flags,	/*!< in: dict_table_t::flags */
+	ulint	fsp_flags);	/*!< in: fil_space_t::flags */
+
+/********************************************************************//**
+Extract the atomic writes flag from table flags.
+@return true if atomic writes are used, false if not used */
+UNIV_INLINE
+ibool
+dict_tf_get_atomic_writes(
+/*======================*/
+	ulint	flags)	/*!< in: flags */
+	__attribute__((const));
+
+/********************************************************************//**
+Check whether the table uses the atomic writes.
+@return true if atomic writes is used, false if not */
+UNIV_INLINE
+ibool
+dict_table_get_atomic_writes(
+/*=========================*/
+	const dict_table_t*	table);	/*!< in: table */
+
+
+#ifndef UNIV_NONINL
+#include "dict0pagecompress.ic"
+#endif
+
+#endif
diff --git a/storage/innobase/include/dict0pagecompress.ic b/storage/innobase/include/dict0pagecompress.ic
new file mode 100644
index 00000000000..98b64723542
--- /dev/null
+++ b/storage/innobase/include/dict0pagecompress.ic
@@ -0,0 +1,191 @@
+/*****************************************************************************
+
+Copyright (C) 2013 SkySQL Ab. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file include/dict0pagecompress.ic
+Inline implementation for helper functions for extracting/storing
+page compression and atomic writes information to dictionary.
+
+Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
+***********************************************************************/
+
+/********************************************************************//**
+Verify that dictionary flags match tablespace flags.
+The ZIP_SSIZE, ATOMIC_BLOBS, PAGE_COMPRESSION, PAGE_COMPRESSION_LEVEL and
+ATOMIC_WRITES fields of the two flag words are compared in that order.
+The first field that differs is reported on stderr and ends the check.
+@return TRUE if flags match, FALSE if not */
+UNIV_INLINE
+ibool
+dict_tf_verify_flags(
+/*=================*/
+	ulint	table_flags,	/*!< in: dict_table_t::flags */
+	ulint	fsp_flags)	/*!< in: fil_space_t::flags */
+{
+	/* Fields as stored in the data dictionary flags. */
+	ulint	ssize = DICT_TF_GET_ZIP_SSIZE(table_flags);
+	ulint	atomic_blobs = DICT_TF_HAS_ATOMIC_BLOBS(table_flags);
+	ulint	page_compression = DICT_TF_GET_PAGE_COMPRESSION(table_flags);
+	ulint	page_compression_level =
+		DICT_TF_GET_PAGE_COMPRESSION_LEVEL(table_flags);
+	ulint	atomic_writes = DICT_TF_GET_ATOMIC_WRITES(table_flags);
+
+	/* The same fields as stored in the tablespace flags. */
+	ulint	zip_ssize = FSP_FLAGS_GET_ZIP_SSIZE(fsp_flags);
+	ulint	fsp_atomic_blobs = FSP_FLAGS_HAS_ATOMIC_BLOBS(fsp_flags);
+	ulint	fsp_page_compression =
+		FSP_FLAGS_GET_PAGE_COMPRESSION(fsp_flags);
+	ulint	fsp_page_compression_level =
+		FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(fsp_flags);
+	ulint	fsp_atomic_writes = FSP_FLAGS_GET_ATOMIC_WRITES(fsp_flags);
+
+	/* Fault injection. The return type is ibool, so an injected
+	failure has to be FALSE: ULINT_UNDEFINED is non-zero and callers
+	would read it as "flags match". */
+	DBUG_EXECUTE_IF("dict_tf_verify_flags_failure",
+			return(FALSE););
+
+	/* Neither flag word may carry bits outside the known fields. */
+	ut_ad(!DICT_TF_GET_UNUSED(table_flags));
+	ut_ad(!FSP_FLAGS_GET_UNUSED(fsp_flags));
+
+	if (ssize != zip_ssize) {
+		fprintf(stderr,
+			"InnoDB: Error: table flags has zip_ssize " ULINTPF
+			" in the data dictionary\n"
+			"InnoDB: but the flags in file has zip_ssize "
+			ULINTPF "\n",
+			ssize, zip_ssize);
+		return(FALSE);
+	}
+
+	if (atomic_blobs != fsp_atomic_blobs) {
+		fprintf(stderr,
+			"InnoDB: Error: table flags has atomic_blobs " ULINTPF
+			" in the data dictionary\n"
+			"InnoDB: but the flags in file has atomic_blobs "
+			ULINTPF "\n",
+			atomic_blobs, fsp_atomic_blobs);
+		return(FALSE);
+	}
+
+	if (page_compression != fsp_page_compression) {
+		fprintf(stderr,
+			"InnoDB: Error: table flags has page_compression "
+			ULINTPF " in the data dictionary\n"
+			"InnoDB: but the flags in file has page_compression "
+			ULINTPF "\n",
+			page_compression, fsp_page_compression);
+		return(FALSE);
+	}
+
+	if (page_compression_level != fsp_page_compression_level) {
+		fprintf(stderr,
+			"InnoDB: Error: table flags has page_compression_level "
+			ULINTPF " in the data dictionary\n"
+			"InnoDB: but the flags in file has page_compression_level "
+			ULINTPF "\n",
+			page_compression_level, fsp_page_compression_level);
+		return(FALSE);
+	}
+
+	if (atomic_writes != fsp_atomic_writes) {
+		fprintf(stderr,
+			"InnoDB: Error: table flags has atomic writes " ULINTPF
+			" in the data dictionary\n"
+			"InnoDB: but the flags in file has atomic_writes "
+			ULINTPF "\n",
+			atomic_writes, fsp_atomic_writes);
+		return(FALSE);
+	}
+
+	return(TRUE);
+}
+
+/********************************************************************//**
+Read the PAGE_COMPRESSION_LEVEL field out of dict_table_t::flags.
+The flags live in memory, so a debug assertion checks the value range.
+@return page compression level, or 0 if not compressed */
+UNIV_INLINE
+ulint
+dict_tf_get_page_compression_level(
+/*===============================*/
+	ulint	flags)	/*!< in: flags */
+{
+	const ulint	level = DICT_TF_GET_PAGE_COMPRESSION_LEVEL(flags);
+
+	/* Debug builds only: the level must lie within 0..9. */
+	ut_ad(!(level < 0 || level > 9));
+
+	return(level);
+}
+
+/********************************************************************//**
+Get the page compression level stored in the flags of a table.
+Debug builds assert that the table is non-NULL and page compressed.
+@return page compression level */
+UNIV_INLINE
+ulint
+dict_table_page_compression_level(
+/*==============================*/
+	const dict_table_t*	table)	/*!< in: table */
+{
+	ut_ad(table);
+
+	const ulint	tf = table->flags;
+
+	ut_ad(dict_tf_get_page_compression(tf));
+
+	return(dict_tf_get_page_compression_level(tf));
+}
+
+/********************************************************************//**
+Extract the PAGE_COMPRESSION bit from table flags.
+@return true if page compressed, false if not */
+UNIV_INLINE
+ibool
+dict_tf_get_page_compression(
+/*=========================*/
+	ulint	flags)	/*!< in: flags */
+{
+	const ibool	is_page_compressed
+		= DICT_TF_GET_PAGE_COMPRESSION(flags);
+
+	return(is_page_compressed);
+}
+
+/********************************************************************//**
+Tell whether a table is stored in the page compressed page format,
+as recorded in the PAGE_COMPRESSION bit of table->flags.
+@return true if page compressed, false if not */
+UNIV_INLINE
+ibool
+dict_table_is_page_compressed(
+/*==========================*/
+	const dict_table_t*	table)	/*!< in: table */
+{
+	const ibool	is_page_compressed
+		= dict_tf_get_page_compression(table->flags);
+
+	return(is_page_compressed);
+}
+
+/********************************************************************//**
+Extract the ATOMIC_WRITES bit from table flags.
+@return true if atomic writes are used, false if not used */
+UNIV_INLINE
+ibool
+dict_tf_get_atomic_writes(
+/*======================*/
+	ulint	flags)	/*!< in: flags */
+{
+	const ibool	uses_atomic_writes
+		= DICT_TF_GET_ATOMIC_WRITES(flags);
+
+	return(uses_atomic_writes);
+}
+
+/********************************************************************//**
+Tell whether a table uses atomic writes, as recorded in the
+ATOMIC_WRITES bit of table->flags.
+@return true if atomic writes is used, false if not */
+UNIV_INLINE
+ibool
+dict_table_get_atomic_writes(
+/*=========================*/
+	const dict_table_t*	table)	/*!< in: table */
+{
+	const ibool	uses_atomic_writes
+		= dict_tf_get_atomic_writes(table->flags);
+
+	return(uses_atomic_writes);
+}
diff --git a/storage/innobase/include/fil0fil.h b/storage/innobase/include/fil0fil.h
index 56fda8b39b1..c5edd33f46b 100644
--- a/storage/innobase/include/fil0fil.h
+++ b/storage/innobase/include/fil0fil.h
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -128,6 +129,12 @@ extern fil_addr_t fil_addr_null;
#define FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID 34 /*!< starting from 4.1.x this
contains the space id of the page */
#define FIL_PAGE_DATA 38 /*!< start of the data on the page */
+/* The following are used when page compression is used */
+#define FIL_PAGE_COMPRESSED_SIZE 2 /*!< Number of bytes used to store
+ actual payload data size on
+ compressed pages. */
+#define FIL_PAGE_COMPRESSION_ZLIB 1 /*!< Compression algorithm ZLIB. */
+
/* @} */
/** File page trailer @{ */
#define FIL_PAGE_END_LSN_OLD_CHKSUM 8 /*!< the low 4 bytes of this are used
@@ -140,6 +147,7 @@ extern fil_addr_t fil_addr_null;
#ifndef UNIV_INNOCHECKSUM
/** File page types (values of FIL_PAGE_TYPE) @{ */
+#define FIL_PAGE_PAGE_COMPRESSED 34354 /*!< page compressed page */
#define FIL_PAGE_INDEX 17855 /*!< B-tree node */
#define FIL_PAGE_UNDO_LOG 2 /*!< Undo log page */
#define FIL_PAGE_INODE 3 /*!< Index node */
@@ -202,6 +210,7 @@ ulint
fil_space_get_type(
/*===============*/
ulint id); /*!< in: space id */
+
#endif /* !UNIV_HOTBACKUP */
/*******************************************************************//**
Appends a new file to the chain of files of a space. File must be closed.
@@ -742,8 +751,13 @@ fil_io(
void* buf, /*!< in/out: buffer where to store read data
or from where to write; in aio this must be
appropriately aligned */
- void* message) /*!< in: message for aio handler if non-sync
+ void* message, /*!< in: message for aio handler if non-sync
aio used, else ignored */
+ ulint write_size) /*!< in/out: Actual write size initialized
+ after fist successfull trim
+ operation for this page and if
+ initialized we do not trim again if
+ actual page size does not decrease. */
__attribute__((nonnull(8)));
/**********************************************************************//**
Waits for an aio operation to complete. This function is used to write the
@@ -977,8 +991,33 @@ fil_mtr_rename_log(
ulint new_space_id, /*!< in: tablespace id of the new
table */
const char* new_name, /*!< in: new table name */
- const char* tmp_name); /*!< in: temp table name used while
+ const char* tmp_name) /*!< in: temp table name used while
swapping */
+ __attribute__((nonnull));
#endif /* !UNIV_INNOCHECKSUM */
+
+/****************************************************************//**
+Acquire the fil_system mutex.
+NOTE(review): presumably paired with fil_system_exit(); confirm against
+the definition in fil0fil.cc. */
+void
+fil_system_enter(void);
+/*==================*/
+/****************************************************************//**
+Release the fil_system mutex.
+NOTE(review): presumably the caller must have acquired the mutex with
+fil_system_enter(); confirm against the definition in fil0fil.cc. */
+void
+fil_system_exit(void);
+/*==================*/
+/*******************************************************************//**
+Returns the table space by a given id, NULL if not found.
+NOTE(review): the locking requirement is not visible in this header;
+presumably the fil_system mutex must be held by the caller. Confirm.
+@return tablespace object, or NULL if no space has the given id */
+fil_space_t*
+fil_space_get_by_id(
+/*================*/
+ ulint id); /*!< in: space id */
+/*******************************************************************//**
+Return space name.
+NOTE(review): ownership and lifetime of the returned string are not
+visible in this header; verify against the definition before freeing or
+retaining the pointer.
+@return name of the given tablespace */
+char*
+fil_space_name(
+/*===========*/
+ fil_space_t* space); /*!< in: space */
+
#endif /* fil0fil_h */
diff --git a/storage/innobase/include/fil0pagecompress.h b/storage/innobase/include/fil0pagecompress.h
new file mode 100644
index 00000000000..e21eae7a5ee
--- /dev/null
+++ b/storage/innobase/include/fil0pagecompress.h
@@ -0,0 +1,117 @@
+/*****************************************************************************
+
+Copyright (C) 2013 SkySQL Ab. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+*****************************************************************************/
+
+#ifndef fil0pagecompress_h
+#define fil0pagecompress_h
+
+#include "fsp0fsp.h"
+#include "fsp0pagecompress.h"
+
+/******************************************************************//**
+@file include/fil0pagecompress.h
+Helper functions for extracting/storing page compression and
+atomic writes information to table space.
+
+Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
+***********************************************************************/
+
+/*******************************************************************//**
+Returns the page compression level flag of the space, or 0 if the space
+is not compressed. The tablespace must be cached in the memory cache.
+@return page compression level if page compressed, ULINT_UNDEFINED if space not found */
+ulint
+fil_space_get_page_compression_level(
+/*=================================*/
+ ulint id); /*!< in: space id */
+/*******************************************************************//**
+Returns the page compression flag of the space, or false if the space
+is not compressed. The tablespace must be cached in the memory cache.
+@return true if page compressed, false if not or space not found */
+ibool
+fil_space_is_page_compressed(
+/*=========================*/
+ ulint id); /*!< in: space id */
+/*******************************************************************//**
+Returns the atomic writes flag of the space, or false if the space
+is not using atomic writes. The tablespace must be cached in the memory cache.
+@return true if space using atomic writes, false if not */
+ibool
+fil_space_get_atomic_writes(
+/*=========================*/
+ ulint id); /*!< in: space id */
+/*******************************************************************//**
+Find out whether the page is an index page or not.
+@return true if page type index page, false if not */
+ibool
+fil_page_is_index_page(
+/*===================*/
+ byte *buf); /*!< in: page */
+
+/****************************************************************//**
+Get the name of the compression algorithm used for page
+compression.
+@return compression algorithm name or "UNKNOWN" if not known */
+const char*
+fil_get_compression_alg_name(
+/*=========================*/
+ ulint comp_alg); /*!< in: compression algorithm number */
+
+/****************************************************************//**
+For page compressed pages compress the page before actual write
+operation.
+NOTE(review): whether the returned pointer is out_buf or buf (e.g. when
+compression is not applied) is not visible in this header; confirm
+against the definition.
+@return compressed page to be written */
+byte*
+fil_compress_page(
+/*==============*/
+ ulint space_id, /*!< in: tablespace id of the
+ table. */
+ byte* buf, /*!< in: buffer from which to write; in aio
+ this must be appropriately aligned */
+ byte* out_buf, /*!< out: compressed buffer */
+ ulint len, /*!< in: length of input buffer.*/
+ ulint* out_len); /*!< out: actual length of compressed page */
+
+/****************************************************************//**
+For page compressed pages decompress the page after actual read
+operation.
+The function returns nothing.
+NOTE(review): the parameter comments below mark buf as "out" yet describe
+it as the buffer to read from; presumably the uncompressed page ends up
+in buf. Confirm against the definition. */
+void
+fil_decompress_page(
+/*================*/
+ byte* page_buf, /*!< in: preallocated buffer or NULL */
+ byte* buf, /*!< out: buffer from which to read; in aio
+ this must be appropriately aligned */
+ ulint len); /*!< in: length of output buffer.*/
+
+/****************************************************************//**
+Get space id from fil node
+@return space id */
+ulint
+fil_node_get_space_id(
+/*==================*/
+ fil_node_t* node); /*!< in: Node where to get space id*/
+
+/*******************************************************************//**
+Find out whether the page is page compressed.
+@return true if page is page compressed */
+ibool
+fil_page_is_compressed(
+/*===================*/
+ byte *buf); /*!< in: page */
+
+#endif
diff --git a/storage/innobase/include/fsp0fsp.h b/storage/innobase/include/fsp0fsp.h
index a587ccc9f20..31c34cdafca 100644
--- a/storage/innobase/include/fsp0fsp.h
+++ b/storage/innobase/include/fsp0fsp.h
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -53,12 +54,21 @@ to the two Barracuda row formats COMPRESSED and DYNAMIC. */
/** Width of the DATA_DIR flag. This flag indicates that the tablespace
is found in a remote location, not the default data directory. */
#define FSP_FLAGS_WIDTH_DATA_DIR 1
+/** Number of flag bits used to indicate the page compression and compression level */
+#define FSP_FLAGS_WIDTH_PAGE_COMPRESSION 1
+#define FSP_FLAGS_WIDTH_PAGE_COMPRESSION_LEVEL 4
+/** Number of flag bits used to indicate atomic writes for this tablespace */
+#define FSP_FLAGS_WIDTH_ATOMIC_WRITES 1
+
/** Width of all the currently known tablespace flags */
#define FSP_FLAGS_WIDTH (FSP_FLAGS_WIDTH_POST_ANTELOPE \
+ FSP_FLAGS_WIDTH_ZIP_SSIZE \
+ FSP_FLAGS_WIDTH_ATOMIC_BLOBS \
+ FSP_FLAGS_WIDTH_PAGE_SSIZE \
- + FSP_FLAGS_WIDTH_DATA_DIR)
+ + FSP_FLAGS_WIDTH_DATA_DIR \
+ + FSP_FLAGS_WIDTH_PAGE_COMPRESSION \
+ + FSP_FLAGS_WIDTH_PAGE_COMPRESSION_LEVEL \
+ + FSP_FLAGS_WIDTH_ATOMIC_WRITES)
/** A mask of all the known/used bits in tablespace flags */
#define FSP_FLAGS_MASK (~(~0 << FSP_FLAGS_WIDTH))
@@ -71,9 +81,20 @@ is found in a remote location, not the default data directory. */
/** Zero relative shift position of the ATOMIC_BLOBS field */
#define FSP_FLAGS_POS_ATOMIC_BLOBS (FSP_FLAGS_POS_ZIP_SSIZE \
+ FSP_FLAGS_WIDTH_ZIP_SSIZE)
-/** Zero relative shift position of the PAGE_SSIZE field */
-#define FSP_FLAGS_POS_PAGE_SSIZE (FSP_FLAGS_POS_ATOMIC_BLOBS \
+/** Note that these need to be before the page size to be compatible with
+dictionary */
+/** Zero relative shift position of the PAGE_COMPRESSION field */
+#define FSP_FLAGS_POS_PAGE_COMPRESSION (FSP_FLAGS_POS_ATOMIC_BLOBS \
+ FSP_FLAGS_WIDTH_ATOMIC_BLOBS)
+/** Zero relative shift position of the PAGE_COMPRESSION_LEVEL field */
+#define FSP_FLAGS_POS_PAGE_COMPRESSION_LEVEL (FSP_FLAGS_POS_PAGE_COMPRESSION \
+ + FSP_FLAGS_WIDTH_PAGE_COMPRESSION)
+/** Zero relative shift position of the ATOMIC_WRITES field */
+#define FSP_FLAGS_POS_ATOMIC_WRITES (FSP_FLAGS_POS_PAGE_COMPRESSION_LEVEL \
+ + FSP_FLAGS_WIDTH_PAGE_COMPRESSION_LEVEL)
+ /** Zero relative shift position of the PAGE_SSIZE field */
+#define FSP_FLAGS_POS_PAGE_SSIZE (FSP_FLAGS_POS_ATOMIC_WRITES \
+ + FSP_FLAGS_WIDTH_ATOMIC_WRITES)
/** Zero relative shift position of the start of the UNUSED bits */
#define FSP_FLAGS_POS_DATA_DIR (FSP_FLAGS_POS_PAGE_SSIZE \
+ FSP_FLAGS_WIDTH_PAGE_SSIZE)
@@ -101,6 +122,18 @@ is found in a remote location, not the default data directory. */
#define FSP_FLAGS_MASK_DATA_DIR \
((~(~0 << FSP_FLAGS_WIDTH_DATA_DIR)) \
<< FSP_FLAGS_POS_DATA_DIR)
+/** Bit mask of the PAGE_COMPRESSION field */
+#define FSP_FLAGS_MASK_PAGE_COMPRESSION \
+ ((~(~0 << FSP_FLAGS_WIDTH_PAGE_COMPRESSION)) \
+ << FSP_FLAGS_POS_PAGE_COMPRESSION)
+/** Bit mask of the PAGE_COMPRESSION_LEVEL field */
+#define FSP_FLAGS_MASK_PAGE_COMPRESSION_LEVEL \
+ ((~(~0 << FSP_FLAGS_WIDTH_PAGE_COMPRESSION_LEVEL)) \
+ << FSP_FLAGS_POS_PAGE_COMPRESSION_LEVEL)
+/** Bit mask of the ATOMIC_WRITES field */
+#define FSP_FLAGS_MASK_ATOMIC_WRITES \
+ ((~(~0 << FSP_FLAGS_WIDTH_ATOMIC_WRITES)) \
+ << FSP_FLAGS_POS_ATOMIC_WRITES)
/** Return the value of the POST_ANTELOPE field */
#define FSP_FLAGS_GET_POST_ANTELOPE(flags) \
@@ -126,11 +159,38 @@ is found in a remote location, not the default data directory. */
#define FSP_FLAGS_GET_UNUSED(flags) \
(flags >> FSP_FLAGS_POS_UNUSED)
+/** Return the value of the PAGE_COMPRESSION field; flags is parenthesized so any expression may be passed */
+#define FSP_FLAGS_GET_PAGE_COMPRESSION(flags) \
+ (((flags) & FSP_FLAGS_MASK_PAGE_COMPRESSION) \
+ >> FSP_FLAGS_POS_PAGE_COMPRESSION)
+/** Return the value of the PAGE_COMPRESSION_LEVEL field; flags is parenthesized so any expression may be passed */
+#define FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags) \
+ (((flags) & FSP_FLAGS_MASK_PAGE_COMPRESSION_LEVEL) \
+ >> FSP_FLAGS_POS_PAGE_COMPRESSION_LEVEL)
+/** Return the value of the ATOMIC_WRITES field; flags is parenthesized so any expression may be passed */
+#define FSP_FLAGS_GET_ATOMIC_WRITES(flags) \
+ (((flags) & FSP_FLAGS_MASK_ATOMIC_WRITES) \
+ >> FSP_FLAGS_POS_ATOMIC_WRITES)
+
/** Set a PAGE_SSIZE into the correct bits in a given
tablespace flags. */
#define FSP_FLAGS_SET_PAGE_SSIZE(flags, ssize) \
(flags | (ssize << FSP_FLAGS_POS_PAGE_SSIZE))
+/** OR a PAGE_COMPRESSION value into its bit position in the given
+tablespace flags. The value is shifted and OR-ed in without masking. */
+#define FSP_FLAGS_SET_PAGE_COMPRESSION(flags, compression) \
+ ((flags) | ((compression) << FSP_FLAGS_POS_PAGE_COMPRESSION))
+
+/** OR a PAGE_COMPRESSION_LEVEL value into its bit position in the given
+tablespace flags. The value is shifted and OR-ed in without masking. */
+#define FSP_FLAGS_SET_PAGE_COMPRESSION_LEVEL(flags, level) \
+ ((flags) | ((level) << FSP_FLAGS_POS_PAGE_COMPRESSION_LEVEL))
+/** OR an ATOMIC_WRITES value into its bit position in the given
+tablespace flags. The value is shifted and OR-ed in without masking. */
+#define FSP_FLAGS_SET_ATOMIC_WRITES(flags, atomics) \
+ ((flags) | ((atomics) << FSP_FLAGS_POS_ATOMIC_WRITES))
+
/* @} */
/* @defgroup Tablespace Header Constants (moved from fsp0fsp.c) @{ */
diff --git a/storage/innobase/include/fsp0fsp.ic b/storage/innobase/include/fsp0fsp.ic
index 0d81e817cc9..0ca02a5652d 100644
--- a/storage/innobase/include/fsp0fsp.ic
+++ b/storage/innobase/include/fsp0fsp.ic
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -63,6 +64,9 @@ fsp_flags_is_valid(
ulint atomic_blobs = FSP_FLAGS_HAS_ATOMIC_BLOBS(flags);
ulint page_ssize = FSP_FLAGS_GET_PAGE_SSIZE(flags);
ulint unused = FSP_FLAGS_GET_UNUSED(flags);
+ ulint page_compression = FSP_FLAGS_GET_PAGE_COMPRESSION(flags);
+ ulint page_compression_level = FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags);
+ ulint atomic_writes = FSP_FLAGS_GET_ATOMIC_WRITES(flags);
DBUG_EXECUTE_IF("fsp_flags_is_valid_failure", return(false););
@@ -104,6 +108,18 @@ fsp_flags_is_valid(
return(false);
}
+ /* Page compression level requires page compression and atomic blobs
+ to be set */
+ if (page_compression_level || page_compression) {
+ if (!page_compression || !atomic_blobs) {
+ return(false);
+ }
+ }
+
+ if (atomic_writes && !atomic_blobs) {
+ return (false);
+ }
+
#if UNIV_FORMAT_MAX != UNIV_FORMAT_B
# error "UNIV_FORMAT_MAX != UNIV_FORMAT_B, Add more validations."
#endif
@@ -312,3 +328,4 @@ xdes_calc_descriptor_page(
}
#endif /* !UNIV_INNOCHECKSUM */
+
diff --git a/storage/innobase/include/fsp0pagecompress.h b/storage/innobase/include/fsp0pagecompress.h
new file mode 100644
index 00000000000..417d4a6879e
--- /dev/null
+++ b/storage/innobase/include/fsp0pagecompress.h
@@ -0,0 +1,64 @@
+/*****************************************************************************
+
+Copyright (C) 2013 SkySQL Ab. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file include/fsp0pagecompress.h
+Helper functions for extracting/storing page compression and
+atomic writes information to file space.
+
+Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
+***********************************************************************/
+
+#ifndef fsp0pagecompress_h
+#define fsp0pagecompress_h
+
+/**********************************************************************//**
+Reads the page compression level from the first page of a tablespace.
+@return page compression level, or 0 if uncompressed */
+UNIV_INTERN
+ulint
+fsp_header_get_compression_level(
+/*=============================*/
+ const page_t* page); /*!< in: first page of a tablespace */
+
+/********************************************************************//**
+Determine from the tablespace flags whether the tablespace is page compressed.
+@return TRUE if page compressed, FALSE if not compressed */
+UNIV_INLINE
+ibool
+fsp_flags_is_page_compressed(
+/*=========================*/
+ ulint flags); /*!< in: tablespace flags */
+
+/********************************************************************//**
+Extract the page compression level from tablespace flags.
+A tablespace has only one physical page compression level
+whether that page is compressed or not.
+@return page compression level of the file-per-table tablespace,
+or zero if the table is not compressed. */
+UNIV_INLINE
+ulint
+fsp_flags_get_page_compression_level(
+/*=================================*/
+ ulint flags); /*!< in: tablespace flags */
+
+#ifndef UNIV_NONINL
+#include "fsp0pagecompress.ic"
+#endif
+
+#endif
diff --git a/storage/innobase/include/fsp0pagecompress.ic b/storage/innobase/include/fsp0pagecompress.ic
new file mode 100644
index 00000000000..1dffd1bedf1
--- /dev/null
+++ b/storage/innobase/include/fsp0pagecompress.ic
@@ -0,0 +1,61 @@
+/*****************************************************************************
+
+Copyright (C) 2013 SkySQL Ab. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+*****************************************************************************/
+
+/******************************************************************//**
+@file include/fsp0pagecompress.ic
+Implementation for helper functions for extracting/storing page
+compression and atomic writes information to file space.
+
+Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
+***********************************************************************/
+
+/********************************************************************//**
+Determine from the tablespace flags whether the tablespace is page compressed.
+@return value of the PAGE_COMPRESSION flag field: nonzero if page compressed, 0 if not */
+UNIV_INLINE
+ibool
+fsp_flags_is_page_compressed(
+/*=========================*/
+ ulint flags) /*!< in: tablespace flags */
+{
+ return(FSP_FLAGS_GET_PAGE_COMPRESSION(flags));
+}
+
+/********************************************************************//**
+Extract the page compression level from the tablespace flags.
+@return page compression level or 0 if not compressed*/
+UNIV_INLINE
+ulint
+fsp_flags_get_page_compression_level(
+/*=================================*/
+ ulint flags) /*!< in: tablespace flags */
+{
+ return(FSP_FLAGS_GET_PAGE_COMPRESSION_LEVEL(flags));
+}
+
+/********************************************************************//**
+Determine from the tablespace flags whether the tablespace uses atomic writes.
+@return value of the ATOMIC_WRITES flag field: nonzero if atomic writes are used, 0 if not */
+UNIV_INLINE
+ibool
+fsp_flags_get_atomic_writes(
+/*========================*/
+ ulint flags) /*!< in: tablespace flags */
+{
+ return(FSP_FLAGS_GET_ATOMIC_WRITES(flags));
+}
diff --git a/storage/innobase/include/fsp0types.h b/storage/innobase/include/fsp0types.h
index 94fd908ab0c..e5c1734b842 100644
--- a/storage/innobase/include/fsp0types.h
+++ b/storage/innobase/include/fsp0types.h
@@ -29,6 +29,7 @@ Created May 26, 2009 Vasil Dimov
#include "univ.i"
#include "fil0fil.h" /* for FIL_PAGE_DATA */
+#include "ut0byte.h"
/** @name Flags for inserting records in order
If records are inserted in order, there are the following
diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h
index 4a744c1b268..3c70f9925fe 100644
--- a/storage/innobase/include/os0file.h
+++ b/storage/innobase/include/os0file.h
@@ -2,6 +2,7 @@
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2009, Percona Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted
by Percona Inc.. Those modifications are
@@ -150,6 +151,7 @@ enum os_file_create_t {
#define OS_FILE_INSUFFICIENT_RESOURCE 78
#define OS_FILE_AIO_INTERRUPTED 79
#define OS_FILE_OPERATION_ABORTED 80
+#define OS_FILE_OPERATION_NOT_SUPPORTED 125
/* @} */
/** Types for aio operations @{ */
@@ -269,26 +271,26 @@ os_file_write
The wrapper functions have the prefix of "innodb_". */
#ifdef UNIV_PFS_IO
-# define os_file_create(key, name, create, purpose, type, success) \
+# define os_file_create(key, name, create, purpose, type, success, atomic_writes) \
pfs_os_file_create_func(key, name, create, purpose, type, \
- success, __FILE__, __LINE__)
+ success, atomic_writes, __FILE__, __LINE__)
-# define os_file_create_simple(key, name, create, access, success) \
+# define os_file_create_simple(key, name, create, access, success, atomic_writes) \
pfs_os_file_create_simple_func(key, name, create, access, \
- success, __FILE__, __LINE__)
+ success, atomic_writes, __FILE__, __LINE__)
# define os_file_create_simple_no_error_handling( \
- key, name, create_mode, access, success) \
+ key, name, create_mode, access, success, atomic_writes) \
pfs_os_file_create_simple_no_error_handling_func( \
- key, name, create_mode, access, success, __FILE__, __LINE__)
+ key, name, create_mode, access, success, atomic_writes, __FILE__, __LINE__)
# define os_file_close(file) \
pfs_os_file_close_func(file, __FILE__, __LINE__)
# define os_aio(type, mode, name, file, buf, offset, \
- n, message1, message2) \
+ n, message1, message2, write_size) \
pfs_os_aio_func(type, mode, name, file, buf, offset, \
- n, message1, message2, __FILE__, __LINE__)
+ n, message1, message2, write_size, __FILE__, __LINE__)
# define os_file_read(file, buf, offset, n) \
pfs_os_file_read_func(file, buf, offset, n, __FILE__, __LINE__)
@@ -310,22 +312,22 @@ The wrapper functions have the prefix of "innodb_". */
/* If UNIV_PFS_IO is not defined, these I/O APIs point
to original un-instrumented file I/O APIs */
-# define os_file_create(key, name, create, purpose, type, success) \
- os_file_create_func(name, create, purpose, type, success)
+# define os_file_create(key, name, create, purpose, type, success, atomic_writes) \
+ os_file_create_func(name, create, purpose, type, success, atomic_writes)
-# define os_file_create_simple(key, name, create_mode, access, success) \
- os_file_create_simple_func(name, create_mode, access, success)
+# define os_file_create_simple(key, name, create_mode, access, success, atomic_writes) \
+ os_file_create_simple_func(name, create_mode, access, success, atomic_writes)
# define os_file_create_simple_no_error_handling( \
- key, name, create_mode, access, success) \
- os_file_create_simple_no_error_handling_func( \
- name, create_mode, access, success)
+ key, name, create_mode, access, success, atomic_writes) \
+ os_file_create_simple_no_error_handling_func( \
+ name, create_mode, access, success, atomic_writes)
# define os_file_close(file) os_file_close_func(file)
-# define os_aio(type, mode, name, file, buf, offset, n, message1, message2) \
+# define os_aio(type, mode, name, file, buf, offset, n, message1, message2, write_size) \
os_aio_func(type, mode, name, file, buf, offset, n, \
- message1, message2)
+ message1, message2, write_size)
# define os_file_read(file, buf, offset, n) \
os_file_read_func(file, buf, offset, n)
@@ -468,7 +470,8 @@ os_file_create_simple_func(
ulint create_mode,/*!< in: create mode */
ulint access_type,/*!< in: OS_FILE_READ_ONLY or
OS_FILE_READ_WRITE */
- ibool* success);/*!< out: TRUE if succeed, FALSE if error */
+ ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes); /*!<in TRUE if atomic writes are used */
/****************************************************************//**
NOTE! Use the corresponding macro
os_file_create_simple_no_error_handling(), not directly this function!
@@ -486,7 +489,8 @@ os_file_create_simple_no_error_handling_func(
OS_FILE_READ_WRITE, or
OS_FILE_READ_ALLOW_DELETE; the last option is
used by a backup program reading the file */
- ibool* success)/*!< out: TRUE if succeed, FALSE if error */
+ ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes)/*!<in TRUE if atomic writes are used */
__attribute__((nonnull, warn_unused_result));
/****************************************************************//**
Tries to disable OS caching on an opened file descriptor. */
@@ -520,7 +524,8 @@ os_file_create_func(
async i/o or unbuffered i/o: look in the
function source code for the exact rules */
ulint type, /*!< in: OS_DATA_FILE or OS_LOG_FILE */
- ibool* success)/*!< out: TRUE if succeed, FALSE if error */
+ ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes)/*!<in TRUE if atomic writes are used */
__attribute__((nonnull, warn_unused_result));
/***********************************************************************//**
Deletes a file. The file has to be closed before calling this.
@@ -585,6 +590,7 @@ pfs_os_file_create_simple_func(
ulint access_type,/*!< in: OS_FILE_READ_ONLY or
OS_FILE_READ_WRITE */
ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes,/*!<in TRUE if atomic writes are used */
const char* src_file,/*!< in: file name where func invoked */
ulint src_line)/*!< in: line where the func invoked */
__attribute__((nonnull, warn_unused_result));
@@ -610,6 +616,7 @@ pfs_os_file_create_simple_no_error_handling_func(
OS_FILE_READ_ALLOW_DELETE; the last option is
used by a backup program reading the file */
ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes, /*!<in TRUE if atomic writes are used */
const char* src_file,/*!< in: file name where func invoked */
ulint src_line)/*!< in: line where the func invoked */
__attribute__((nonnull, warn_unused_result));
@@ -638,6 +645,7 @@ pfs_os_file_create_func(
function source code for the exact rules */
ulint type, /*!< in: OS_DATA_FILE or OS_LOG_FILE */
ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes, /*!<in TRUE if atomic writes are used */
const char* src_file,/*!< in: file name where func invoked */
ulint src_line)/*!< in: line where the func invoked */
__attribute__((nonnull, warn_unused_result));
@@ -716,6 +724,7 @@ pfs_os_aio_func(
(can be used to identify a completed
aio operation); ignored if mode is
OS_AIO_SYNC */
+ ibool atomic_writes, /*!< in: NOTE(review): the os_aio() macro passes its write_size argument here, not an atomic-writes flag */
const char* src_file,/*!< in: file name where func invoked */
ulint src_line);/*!< in: line where the func invoked */
/*******************************************************************//**
@@ -1044,10 +1053,16 @@ os_aio_func(
(can be used to identify a completed
aio operation); ignored if mode is
OS_AIO_SYNC */
- void* message2);/*!< in: message for the aio handler
+ void* message2,/*!< in: message for the aio handler
(can be used to identify a completed
aio operation); ignored if mode is
OS_AIO_SYNC */
+ ulint write_size);/*!< in: actual write size, initialized
+ after the first successful trim
+ operation for this page; once
+ initialized we do not trim again unless
+ the actual page size decreases. NOTE(review): passed by value, so it cannot act as an out parameter. */
+
/************************************************************************//**
Wakes up all async i/o threads so that they know to exit themselves in
shutdown. */
diff --git a/storage/innobase/include/os0file.ic b/storage/innobase/include/os0file.ic
index bdd7eb5f8f4..2be0f6a8d97 100644
--- a/storage/innobase/include/os0file.ic
+++ b/storage/innobase/include/os0file.ic
@@ -1,6 +1,7 @@
/*****************************************************************************
Copyright (c) 2010, 2011, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -44,6 +45,7 @@ pfs_os_file_create_simple_func(
ulint access_type,/*!< in: OS_FILE_READ_ONLY or
OS_FILE_READ_WRITE */
ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes, /*!<in TRUE if atomic writes are used */
const char* src_file,/*!< in: file name where func invoked */
ulint src_line)/*!< in: line where the func invoked */
{
@@ -59,7 +61,7 @@ pfs_os_file_create_simple_func(
name, src_file, src_line);
file = os_file_create_simple_func(name, create_mode,
- access_type, success);
+ access_type, success, atomic_writes);
/* Regsiter the returning "file" value with the system */
register_pfs_file_open_end(locker, file);
@@ -88,6 +90,7 @@ pfs_os_file_create_simple_no_error_handling_func(
OS_FILE_READ_ALLOW_DELETE; the last option is
used by a backup program reading the file */
ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes, /*!<in TRUE if atomic writes are used */
const char* src_file,/*!< in: file name where func invoked */
ulint src_line)/*!< in: line where the func invoked */
{
@@ -103,7 +106,7 @@ pfs_os_file_create_simple_no_error_handling_func(
name, src_file, src_line);
file = os_file_create_simple_no_error_handling_func(
- name, create_mode, access_type, success);
+ name, create_mode, access_type, success, atomic_writes);
register_pfs_file_open_end(locker, file);
@@ -134,6 +137,7 @@ pfs_os_file_create_func(
function source code for the exact rules */
ulint type, /*!< in: OS_DATA_FILE or OS_LOG_FILE */
ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes, /*!<in TRUE if atomic writes are used */
const char* src_file,/*!< in: file name where func invoked */
ulint src_line)/*!< in: line where the func invoked */
{
@@ -148,7 +152,7 @@ pfs_os_file_create_func(
: PSI_FILE_OPEN),
name, src_file, src_line);
- file = os_file_create_func(name, create_mode, purpose, type, success);
+ file = os_file_create_func(name, create_mode, purpose, type, success, atomic_writes);
register_pfs_file_open_end(locker, file);
@@ -210,6 +214,7 @@ pfs_os_aio_func(
(can be used to identify a completed
aio operation); ignored if mode is
OS_AIO_SYNC */
+ ibool atomic_writes, /*!< in: NOTE(review): forwarded to os_aio_func() as its last (write_size) argument, not an atomic-writes flag */
const char* src_file,/*!< in: file name where func invoked */
ulint src_line)/*!< in: line where the func invoked */
{
@@ -225,7 +230,7 @@ pfs_os_aio_func(
src_file, src_line);
result = os_aio_func(type, mode, name, file, buf, offset,
- n, message1, message2);
+ n, message1, message2, atomic_writes);
register_pfs_file_io_end(locker, n);
diff --git a/storage/innobase/include/srv0mon.h b/storage/innobase/include/srv0mon.h
index 48d4b94dcae..c0869c2434f 100644
--- a/storage/innobase/include/srv0mon.h
+++ b/storage/innobase/include/srv0mon.h
@@ -2,6 +2,7 @@
Copyright (c) 2010, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
@@ -163,6 +164,7 @@ enum monitor_id_t {
MONITOR_OVLD_BUF_POOL_PAGES_FREE,
MONITOR_OVLD_PAGE_CREATED,
MONITOR_OVLD_PAGES_WRITTEN,
+ MONITOR_OVLD_INDEX_PAGES_WRITTEN,
MONITOR_OVLD_PAGES_READ,
MONITOR_OVLD_BYTE_READ,
MONITOR_OVLD_BYTE_WRITTEN,
@@ -303,6 +305,14 @@ enum monitor_id_t {
MONITOR_PAGE_DECOMPRESS,
MONITOR_PAD_INCREMENTS,
MONITOR_PAD_DECREMENTS,
+ /* New monitor variables for page compression */
+ MONITOR_OVLD_PAGE_COMPRESS_SAVED,
+ MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT512,
+ MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT4096,
+ MONITOR_OVLD_PAGES_PAGE_COMPRESSED,
+ MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP,
+ MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP_SAVED,
+ MONITOR_OVLD_PAGES_PAGE_DECOMPRESSED,
/* Index related counters */
MONITOR_MODULE_INDEX,
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index 1e98cf690d8..f4fa8b434fe 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -3,6 +3,7 @@
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, 2009, Google Inc.
Copyright (c) 2009, Percona Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -102,6 +103,23 @@ struct srv_stats_t {
a disk page */
ulint_ctr_1_t buf_pool_reads;
+ /** Number of bytes saved by page compression */
+ ulint_ctr_64_t page_compression_saved;
+ /** Number of 512Byte TRIM by page compression */
+ ulint_ctr_64_t page_compression_trim_sect512;
+ /** Number of 4K TRIM by page compression */
+ ulint_ctr_64_t page_compression_trim_sect4096;
+ /* Number of index pages written */
+ ulint_ctr_64_t index_pages_written;
+ /* Number of pages compressed with page compression */
+ ulint_ctr_64_t pages_page_compressed;
+ /* Number of TRIM operations induced by page compression */
+ ulint_ctr_64_t page_compressed_trim_op;
+ /* Number of TRIM operations saved by using actual write size knowledge */
+ ulint_ctr_64_t page_compressed_trim_op_saved;
+ /* Number of pages decompressed with page compression */
+ ulint_ctr_64_t pages_page_decompressed;
+
/** Number of data read in total (in bytes) */
ulint_ctr_1_t data_read;
@@ -217,6 +235,29 @@ OS (provided we compiled Innobase with it in), otherwise we will
use simulated aio we build below with threads.
Currently we support native aio on windows and linux */
extern my_bool srv_use_native_aio;
+
+/* Is page compression used */
+extern my_bool srv_compress_pages;
+
+/* Is page compression used only for index pages */
+extern my_bool srv_page_compress_index_pages;
+
+/* Frequency of trim operations */
+extern long srv_trim_pct;
+
+/* Use trim operation */
+extern my_bool srv_use_trim;
+
+/* Use posix fallocate */
+extern my_bool srv_use_posix_fallocate;
+
+/* Use atomic writes i.e disable doublewrite buffer */
+extern my_bool srv_use_atomic_writes;
+
+/* Default zlib compression level */
+extern long srv_compress_zlib_level;
+
+
#ifdef __WIN__
extern ibool srv_use_native_conditions;
#endif /* __WIN__ */
@@ -348,11 +389,6 @@ extern ibool srv_use_doublewrite_buf;
extern ulong srv_doublewrite_batch_size;
extern ulong srv_checksum_algorithm;
-extern ibool srv_use_atomic_writes;
-#ifdef HAVE_POSIX_FALLOCATE
-extern ibool srv_use_posix_fallocate;
-#endif
-
extern ulong srv_max_buf_pool_modified_pct;
extern ulong srv_max_purge_lag;
extern ulong srv_max_purge_lag_delay;
@@ -850,6 +886,24 @@ struct export_var_t{
ulint innodb_purge_view_trx_id_age; /*!< rw_max_trx_id
- purged view's min trx_id */
#endif /* UNIV_DEBUG */
+
+ ib_int64_t innodb_page_compression_saved;/*!< Number of bytes saved
+ by page compression */
+ ib_int64_t innodb_page_compression_trim_sect512;/*!< Number of 512b TRIM
+ by page compression */
+ ib_int64_t innodb_page_compression_trim_sect4096;/*!< Number of 4K byte TRIM
+ by page compression */
+ ib_int64_t innodb_index_pages_written; /*!< Number of index pages
+ written */
+ ib_int64_t innodb_pages_page_compressed;/*!< Number of pages
+ compressed by page compression */
+ ib_int64_t innodb_page_compressed_trim_op;/*!< Number of TRIM operations
+ induced by page compression */
+ ib_int64_t innodb_page_compressed_trim_op_saved;/*!< Number of TRIM operations
+ saved by page compression */
+ ib_int64_t innodb_pages_page_decompressed;/*!< Number of pages
+ decompressed by page
+ compression */
};
/** Thread slot in the thread table. */
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index b6909f4771a..00b4c02465a 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -2,6 +2,7 @@
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2009, Google Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -1165,7 +1166,7 @@ log_group_file_header_flush(
(ulint) (dest_offset / UNIV_PAGE_SIZE),
(ulint) (dest_offset % UNIV_PAGE_SIZE),
OS_FILE_LOG_BLOCK_SIZE,
- buf, group);
+ buf, group, 0);
srv_stats.os_log_pending_writes.dec();
}
@@ -1293,7 +1294,7 @@ loop:
fil_io(OS_FILE_WRITE | OS_FILE_LOG, TRUE, group->space_id, 0,
(ulint) (next_offset / UNIV_PAGE_SIZE),
(ulint) (next_offset % UNIV_PAGE_SIZE), write_len, buf,
- group);
+ group, 0);
srv_stats.os_log_pending_writes.dec();
@@ -1859,7 +1860,7 @@ log_group_checkpoint(
write_offset / UNIV_PAGE_SIZE,
write_offset % UNIV_PAGE_SIZE,
OS_FILE_LOG_BLOCK_SIZE,
- buf, ((byte*) group + 1));
+ buf, ((byte*) group + 1), 0);
ut_ad(((ulint) group & 0x1UL) == 0);
}
@@ -1939,7 +1940,7 @@ log_group_read_checkpoint_info(
fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->space_id, 0,
field / UNIV_PAGE_SIZE, field % UNIV_PAGE_SIZE,
- OS_FILE_LOG_BLOCK_SIZE, log_sys->checkpoint_buf, NULL);
+ OS_FILE_LOG_BLOCK_SIZE, log_sys->checkpoint_buf, NULL, 0);
}
/******************************************************//**
@@ -2233,7 +2234,7 @@ loop:
fil_io(OS_FILE_READ | OS_FILE_LOG, sync, group->space_id, 0,
(ulint) (source_offset / UNIV_PAGE_SIZE),
(ulint) (source_offset % UNIV_PAGE_SIZE),
- len, buf, NULL);
+ len, buf, NULL, 0);
start_lsn += len;
buf += len;
@@ -2298,7 +2299,7 @@ log_group_archive_file_header_write(
dest_offset / UNIV_PAGE_SIZE,
dest_offset % UNIV_PAGE_SIZE,
2 * OS_FILE_LOG_BLOCK_SIZE,
- buf, &log_archive_io);
+ buf, &log_archive_io, 0);
}
/******************************************************//**
@@ -2334,7 +2335,7 @@ log_group_archive_completed_header_write(
dest_offset % UNIV_PAGE_SIZE,
OS_FILE_LOG_BLOCK_SIZE,
buf + LOG_FILE_ARCH_COMPLETED,
- &log_archive_io);
+ &log_archive_io, 0);
}
/******************************************************//**
@@ -2462,7 +2463,7 @@ loop:
(ulint) (next_offset / UNIV_PAGE_SIZE),
(ulint) (next_offset % UNIV_PAGE_SIZE),
ut_calc_align(len, OS_FILE_LOG_BLOCK_SIZE), buf,
- &log_archive_io);
+ &log_archive_io, 0);
start_lsn += len;
next_offset += len;
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index 8cefa9e4b70..a3df6a8d5bd 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -2,6 +2,7 @@
Copyright (c) 1997, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -2063,7 +2064,7 @@ recv_apply_log_recs_for_backup(void)
error = fil_io(OS_FILE_READ, TRUE,
recv_addr->space, zip_size,
recv_addr->page_no, 0, zip_size,
- block->page.zip.data, NULL);
+ block->page.zip.data, NULL, 0);
if (error == DB_SUCCESS
&& !buf_zip_decompress(block, TRUE)) {
exit(1);
@@ -2073,7 +2074,7 @@ recv_apply_log_recs_for_backup(void)
recv_addr->space, 0,
recv_addr->page_no, 0,
UNIV_PAGE_SIZE,
- block->frame, NULL);
+ block->frame, NULL, 0);
}
if (error != DB_SUCCESS) {
@@ -2102,13 +2103,13 @@ recv_apply_log_recs_for_backup(void)
recv_addr->space, zip_size,
recv_addr->page_no, 0,
zip_size,
- block->page.zip.data, NULL);
+ block->page.zip.data, NULL, 0);
} else {
error = fil_io(OS_FILE_WRITE, TRUE,
recv_addr->space, 0,
recv_addr->page_no, 0,
UNIV_PAGE_SIZE,
- block->frame, NULL);
+ block->frame, NULL, 0);
}
skip_this_recv_addr:
recv_addr = HASH_GET_NEXT(addr_hash, recv_addr);
@@ -3074,7 +3075,7 @@ recv_recovery_from_checkpoint_start_func(
fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, max_cp_group->space_id, 0,
0, 0, LOG_FILE_HDR_SIZE,
- log_hdr_buf, max_cp_group);
+ log_hdr_buf, max_cp_group, 0);
if (0 == ut_memcmp(log_hdr_buf + LOG_FILE_WAS_CREATED_BY_HOT_BACKUP,
(byte*)"ibbackup", (sizeof "ibbackup") - 1)) {
@@ -3105,7 +3106,7 @@ recv_recovery_from_checkpoint_start_func(
fil_io(OS_FILE_WRITE | OS_FILE_LOG, TRUE,
max_cp_group->space_id, 0,
0, 0, OS_FILE_LOG_BLOCK_SIZE,
- log_hdr_buf, max_cp_group);
+ log_hdr_buf, max_cp_group, 0);
}
#ifdef UNIV_LOG_ARCHIVE
@@ -3753,8 +3754,8 @@ ask_again:
#endif
/* Read the archive file header */
- fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE, group->archive_space_id, 0, 0,
- LOG_FILE_HDR_SIZE, buf, NULL);
+ fil_io(OS_FILE_READ | OS_FILE_LOG, true, group->archive_space_id, 0, 0,
+ LOG_FILE_HDR_SIZE, buf, NULL, 0);
/* Check if the archive file header is consistent */
@@ -3827,7 +3828,7 @@ ask_again:
fil_io(OS_FILE_READ | OS_FILE_LOG, TRUE,
group->archive_space_id, read_offset / UNIV_PAGE_SIZE,
- read_offset % UNIV_PAGE_SIZE, len, buf, NULL);
+ read_offset % UNIV_PAGE_SIZE, len, buf, NULL, 0);
ret = recv_scan_log_recs(
(buf_pool_get_n_pages()
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index d1b2b12bf59..60331f9c483 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -2,6 +2,7 @@
Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2009, Percona Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted
by Percona Inc.. Those modifications are
@@ -42,8 +43,14 @@ Created 10/21/1995 Heikki Tuuri
#include "srv0srv.h"
#include "srv0start.h"
#include "fil0fil.h"
+#include "fil0pagecompress.h"
#include "buf0buf.h"
#include "srv0mon.h"
+#include "srv0srv.h"
+#ifdef HAVE_POSIX_FALLOCATE
+#include "fcntl.h"
+#include "linux/falloc.h"
+#endif
#ifndef UNIV_HOTBACKUP
# include "os0sync.h"
# include "os0thread.h"
@@ -60,6 +67,13 @@ Created 10/21/1995 Heikki Tuuri
#include <libaio.h>
#endif
+#if defined(UNIV_LINUX) && defined(HAVE_SYS_IOCTL_H)
+# include <sys/ioctl.h>
+# ifndef DFS_IOCTL_ATOMIC_WRITE_SET
+# define DFS_IOCTL_ATOMIC_WRITE_SET _IOW(0x95, 2, uint)
+# endif
+#endif
+
/** Insert buffer segment id */
static const ulint IO_IBUF_SEGMENT = 0;
@@ -175,6 +189,25 @@ struct os_aio_slot_t{
and which can be used to identify
which pending aio operation was
completed */
+ ulint bitmap;
+
+ byte* page_compression_page; /*!< Memory allocated for
+ page compressed page and
+ freed after the write
+ has been completed */
+
+ ulint write_size; /*!< Actual write size, initialized
+ after the first successful trim
+ operation for this page; once
+ initialized we do not trim again unless
+ the actual write size decreases. */
+
+ byte* page_buf; /*!< Actual page buffer for
+ page compressed pages, do not
+ free this */
+
+ ibool page_compress_success;
+
#ifdef WIN_ASYNC_IO
HANDLE handle; /*!< handle object we need in the
OVERLAPPED struct */
@@ -294,6 +327,79 @@ UNIV_INTERN ulint os_n_pending_writes = 0;
/** Number of pending read operations */
UNIV_INTERN ulint os_n_pending_reads = 0;
+/** After first fallocate failure we will disable os_file_trim */
+UNIV_INTERN ibool os_fallocate_failed = FALSE;
+
+/**********************************************************************//**
+Forward declaration; the function is defined later in this file.
+Deallocates (trims) the part of a page that a page compressed write left
+unused: the byte range that starts at slot->offset + len and has the length
+UNIV_PAGE_SIZE - len. The result is recorded in slot->write_size, which lets
+a later write skip the trim if its length has not decreased.
+@return true if success, false if error */
+UNIV_INTERN
+ibool
+os_file_trim(
+/*=========*/
+ os_file_t file, /*!< in: file to be trimmed */
+ os_aio_slot_t* slot, /*!< in/out: slot structure */
+ ulint len); /*!< in: number of bytes written at slot->offset */
+
+/**********************************************************************//**
+Forward declaration. Allocates the temporary buffer that the slot uses for
+page compression; the buffer is freed in os_aio_array_free(). */
+UNIV_INTERN
+void
+os_slot_alloc_page_buf(
+/*===================*/
+ os_aio_slot_t* slot); /*!< in/out: slot structure */
+
+/****************************************************************//**
+Does error handling when a file operation fails. Forward declaration.
+@return TRUE if we should retry the operation */
+static
+ibool
+os_file_handle_error_no_exit(
+/*=========================*/
+ const char* name, /*!< in: name of a file or NULL */
+ const char* operation, /*!< in: operation */
+ ibool on_error_silent,/*!< in: if TRUE then don't print
+ any message to the log. */
+ const char* file, /*!< in: source file of the caller (__FILE__) */
+ const ulint line); /*!< in: source line of the caller (__LINE__) */
+
+/****************************************************************//**
+Asks the file system to enable atomic writes for an already open file.
+Platforms without DFS_IOCTL_ATOMIC_WRITE_SET only get an error on stderr.
+@return TRUE if atomic writes were enabled, FALSE otherwise */
+static __attribute__((warn_unused_result))
+ibool
+os_file_set_atomic_writes(
+/*======================*/
+	const char*	name	/*!< in: name of the file */
+	__attribute__((unused)),
+	os_file_t	file	/*!< in: handle to the file */
+	__attribute__((unused)))
+{
+#ifdef DFS_IOCTL_ATOMIC_WRITE_SET
+	int		enable = 1;
+	const int	rc = ioctl(file, DFS_IOCTL_ATOMIC_WRITE_SET, &enable);
+
+	if (rc != 0) {
+		os_file_handle_error_no_exit(name, "ioctl", FALSE, __FILE__, __LINE__);
+		return(FALSE);
+	}
+
+	return(TRUE);
+#else
+	fputs("InnoDB: Error: trying to enable atomic writes on non-supported "
+	      "platform! Please restart with innodb_use_atomic_writes "
+	      "disabled.\n", stderr);
+	return(FALSE);
+#endif /* DFS_IOCTL_ATOMIC_WRITE_SET */
+}
+
+
#ifdef UNIV_DEBUG
# ifndef UNIV_HOTBACKUP
/**********************************************************************//**
@@ -498,7 +604,17 @@ os_file_get_last_error_low(
fprintf(stderr,
"InnoDB: The error means mysqld does not have"
" the access rights to\n"
- "InnoDB: the directory.\n");
+ "InnoDB: the directory.\n");
+ } else if (err == ECANCELED) {
+ fprintf(stderr,
+ "InnoDB: Operation canceled (%d):%s\n",
+ err, strerror(err));
+
+ if(srv_use_atomic_writes) {
+ fprintf(stderr,
+ "InnoDB: Error trying to enable atomic writes on "
+ "non-supported destination!\n");
+ }
} else {
if (strerror(err) != NULL) {
fprintf(stderr,
@@ -530,6 +646,8 @@ os_file_get_last_error_low(
case ENOTDIR:
case EISDIR:
return(OS_FILE_PATH_ERROR);
+ case ECANCELED:
+ return(OS_FILE_OPERATION_NOT_SUPPORTED);
case EAGAIN:
if (srv_use_native_aio) {
return(OS_FILE_AIO_RESOURCES_RESERVED);
@@ -574,9 +692,11 @@ os_file_handle_error_cond_exit(
const char* operation, /*!< in: operation */
ibool should_exit, /*!< in: call exit(3) if unknown error
and this parameter is TRUE */
- ibool on_error_silent)/*!< in: if TRUE then don't print
+ ibool on_error_silent,/*!< in: if TRUE then don't print
any message to the log iff it is
an unknown non-fatal error */
+ const char* file, /*!< in: file name */
+ const ulint line) /*!< in: line */
{
ulint err;
@@ -606,6 +726,9 @@ os_file_handle_error_cond_exit(
" InnoDB: Disk is full. Try to clean the disk"
" to free space.\n");
+ fprintf(stderr,
+ " InnoDB: at file %s and at line %ld\n", file, line);
+
os_has_said_disk_full = TRUE;
fflush(stderr);
@@ -652,6 +775,9 @@ os_file_handle_error_cond_exit(
operation, err);
}
+ fprintf(stderr,
+ " InnoDB: at file %s and at line %ld\n", file, line);
+
if (should_exit) {
ut_print_timestamp(stderr);
fprintf(stderr, " InnoDB: Cannot continue "
@@ -675,10 +801,12 @@ ibool
os_file_handle_error(
/*=================*/
const char* name, /*!< in: name of a file or NULL */
- const char* operation) /*!< in: operation */
+ const char* operation, /*!< in: operation */
+ const char* file, /*!< in: file name */
+ const ulint line) /*!< in: line */
{
/* exit in case of unknown error */
- return(os_file_handle_error_cond_exit(name, operation, TRUE, FALSE));
+ return(os_file_handle_error_cond_exit(name, operation, TRUE, FALSE, file, line));
}
/****************************************************************//**
@@ -690,12 +818,14 @@ os_file_handle_error_no_exit(
/*=========================*/
const char* name, /*!< in: name of a file or NULL */
const char* operation, /*!< in: operation */
- ibool on_error_silent)/*!< in: if TRUE then don't print
+ ibool on_error_silent,/*!< in: if TRUE then don't print
any message to the log. */
+ const char* file, /*!< in: file name */
+ const ulint line) /*!< in: line */
{
/* don't exit in case of unknown error */
return(os_file_handle_error_cond_exit(
- name, operation, FALSE, on_error_silent));
+ name, operation, FALSE, on_error_silent, file, line));
}
#undef USE_FILE_LOCK
@@ -835,7 +965,7 @@ os_file_opendir(
if (dir == INVALID_HANDLE_VALUE) {
if (error_is_fatal) {
- os_file_handle_error(dirname, "opendir");
+ os_file_handle_error(dirname, "opendir", __FILE__, __LINE__);
}
return(NULL);
@@ -846,7 +976,7 @@ os_file_opendir(
dir = opendir(dirname);
if (dir == NULL && error_is_fatal) {
- os_file_handle_error(dirname, "opendir");
+ os_file_handle_error(dirname, "opendir", __FILE__, __LINE__);
}
return(dir);
@@ -868,7 +998,7 @@ os_file_closedir(
ret = FindClose(dir);
if (!ret) {
- os_file_handle_error_no_exit(NULL, "closedir", FALSE);
+ os_file_handle_error_no_exit(NULL, "closedir", FALSE, __FILE__, __LINE__);
return(-1);
}
@@ -880,7 +1010,7 @@ os_file_closedir(
ret = closedir(dir);
if (ret) {
- os_file_handle_error_no_exit(NULL, "closedir", FALSE);
+ os_file_handle_error_no_exit(NULL, "closedir", FALSE, __FILE__, __LINE__);
}
return(ret);
@@ -952,7 +1082,7 @@ next_file:
return(1);
} else {
- os_file_handle_error_no_exit(NULL, "readdir_next_file", FALSE);
+ os_file_handle_error_no_exit(NULL, "readdir_next_file", FALSE, __FILE__, __LINE__);
return(-1);
}
#else
@@ -1038,7 +1168,7 @@ next_file:
goto next_file;
}
- os_file_handle_error_no_exit(full_path, "stat", FALSE);
+ os_file_handle_error_no_exit(full_path, "stat", FALSE, __FILE__, __LINE__);
ut_free(full_path);
@@ -1089,7 +1219,7 @@ os_file_create_directory(
&& !fail_if_exists))) {
os_file_handle_error_no_exit(
- pathname, "CreateDirectory", FALSE);
+ pathname, "CreateDirectory", FALSE, __FILE__, __LINE__);
return(FALSE);
}
@@ -1102,7 +1232,7 @@ os_file_create_directory(
if (!(rcode == 0 || (errno == EEXIST && !fail_if_exists))) {
/* failure */
- os_file_handle_error_no_exit(pathname, "mkdir", FALSE);
+ os_file_handle_error_no_exit(pathname, "mkdir", FALSE, __FILE__, __LINE__);
return(FALSE);
}
@@ -1126,7 +1256,8 @@ os_file_create_simple_func(
ulint create_mode,/*!< in: create mode */
ulint access_type,/*!< in: OS_FILE_READ_ONLY or
OS_FILE_READ_WRITE */
- ibool* success)/*!< out: TRUE if succeed, FALSE if error */
+ ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes) /*!< in: TRUE if atomic writes are used */
{
os_file_t file;
ibool retry;
@@ -1211,7 +1342,7 @@ os_file_create_simple_func(
retry = os_file_handle_error(
name, create_mode == OS_FILE_OPEN ?
- "open" : "create");
+ "open" : "create", __FILE__, __LINE__);
} else {
*success = TRUE;
@@ -1220,6 +1351,14 @@ os_file_create_simple_func(
} while (retry);
+ if (file != INVALID_HANDLE_VALUE
+ && (srv_use_atomic_writes || atomic_writes)
+ && !os_file_set_atomic_writes(name, file)) {
+ CloseHandle(file);
+ *success = FALSE;
+ file = INVALID_HANDLE_VALUE;
+ }
+
#else /* __WIN__ */
int create_flag;
@@ -1279,7 +1418,7 @@ os_file_create_simple_func(
retry = os_file_handle_error(
name,
create_mode == OS_FILE_OPEN
- ? "open" : "create");
+ ? "open" : "create", __FILE__, __LINE__);
} else {
*success = TRUE;
retry = false;
@@ -1299,6 +1438,14 @@ os_file_create_simple_func(
}
#endif /* USE_FILE_LOCK */
+ if (file != -1
+ && (srv_use_atomic_writes || atomic_writes)
+ && !os_file_set_atomic_writes(name, file)) {
+ *success = FALSE;
+ close(file);
+ file = -1;
+ }
+
#endif /* __WIN__ */
return(file);
@@ -1321,7 +1468,8 @@ os_file_create_simple_no_error_handling_func(
OS_FILE_READ_WRITE, or
OS_FILE_READ_ALLOW_DELETE; the last option is
used by a backup program reading the file */
- ibool* success)/*!< out: TRUE if succeed, FALSE if error */
+ ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes) /*!< in: TRUE if atomic writes are used */
{
os_file_t file;
@@ -1383,6 +1531,14 @@ os_file_create_simple_no_error_handling_func(
attributes,
NULL); // No template file
+ if (file != INVALID_HANDLE_VALUE
+ && (srv_use_atomic_writes || atomic_writes)
+ && !os_file_set_atomic_writes(name, file)) {
+ CloseHandle(file);
+ file = INVALID_HANDLE_VALUE;
+ }
+
+
*success = (file != INVALID_HANDLE_VALUE);
#else /* __WIN__ */
int create_flag;
@@ -1443,6 +1599,14 @@ os_file_create_simple_no_error_handling_func(
}
#endif /* USE_FILE_LOCK */
+ if (file != -1
+ && (srv_use_atomic_writes || atomic_writes)
+ && !os_file_set_atomic_writes(name, file)) {
+ *success = FALSE;
+ close(file);
+ file = -1;
+ }
+
#endif /* __WIN__ */
return(file);
@@ -1513,7 +1677,9 @@ os_file_create_func(
async i/o or unbuffered i/o: look in the
function source code for the exact rules */
ulint type, /*!< in: OS_DATA_FILE or OS_LOG_FILE */
- ibool* success)/*!< out: TRUE if succeed, FALSE if error */
+ ibool* success,/*!< out: TRUE if succeed, FALSE if error */
+ ibool atomic_writes) /*!< in: true if atomic writes for
+ this file should be used */
{
os_file_t file;
ibool retry;
@@ -1662,6 +1828,16 @@ os_file_create_func(
} while (retry);
+ if (file != INVALID_HANDLE_VALUE
+ && type == OS_DATA_FILE
+ && (srv_use_atomic_writes || atomic_writes)
+ && !os_file_set_atomic_writes(name, file)) {
+ CloseHandle(file);
+ *success = FALSE;
+ file = INVALID_HANDLE_VALUE;
+ }
+
+
#else /* __WIN__ */
int create_flag;
const char* mode_str = NULL;
@@ -1736,9 +1912,9 @@ os_file_create_func(
if (on_error_no_exit) {
retry = os_file_handle_error_no_exit(
- name, operation, on_error_silent);
+ name, operation, on_error_silent, __FILE__, __LINE__);
} else {
- retry = os_file_handle_error(name, operation);
+ retry = os_file_handle_error(name, operation, __FILE__, __LINE__);
}
} else {
*success = TRUE;
@@ -1790,6 +1966,16 @@ os_file_create_func(
}
#endif /* USE_FILE_LOCK */
+ if (file != -1
+ && type == OS_DATA_FILE
+ && (srv_use_atomic_writes || atomic_writes)
+ && !os_file_set_atomic_writes(name, file)) {
+ *success = FALSE;
+ close(file);
+ file = -1;
+ }
+
+
#endif /* __WIN__ */
return(file);
@@ -1848,7 +2034,7 @@ loop:
ret = unlink(name);
if (ret != 0 && errno != ENOENT) {
- os_file_handle_error_no_exit(name, "delete", FALSE);
+ os_file_handle_error_no_exit(name, "delete", FALSE, __FILE__, __LINE__);
return(false);
}
@@ -1912,7 +2098,7 @@ loop:
ret = unlink(name);
if (ret != 0) {
- os_file_handle_error_no_exit(name, "delete", FALSE);
+ os_file_handle_error_no_exit(name, "delete", FALSE, __FILE__, __LINE__);
return(false);
}
@@ -1956,7 +2142,7 @@ os_file_rename_func(
return(TRUE);
}
- os_file_handle_error_no_exit(oldpath, "rename", FALSE);
+ os_file_handle_error_no_exit(oldpath, "rename", FALSE, __FILE__, __LINE__);
return(FALSE);
#else
@@ -1965,7 +2151,7 @@ os_file_rename_func(
ret = rename(oldpath, newpath);
if (ret != 0) {
- os_file_handle_error_no_exit(oldpath, "rename", FALSE);
+ os_file_handle_error_no_exit(oldpath, "rename", FALSE, __FILE__, __LINE__);
return(FALSE);
}
@@ -1996,7 +2182,7 @@ os_file_close_func(
return(TRUE);
}
- os_file_handle_error(NULL, "close");
+ os_file_handle_error(NULL, "close", __FILE__, __LINE__);
return(FALSE);
#else
@@ -2005,7 +2191,7 @@ os_file_close_func(
ret = close(file);
if (ret == -1) {
- os_file_handle_error(NULL, "close");
+ os_file_handle_error(NULL, "close", __FILE__, __LINE__);
return(FALSE);
}
@@ -2108,7 +2294,7 @@ os_file_set_size(
"space for file \'%s\' failed. Current size "
"%lu, desired size %lu\n",
name, current_size, size);
- os_file_handle_error_no_exit(name, "posix_fallocate", FALSE);
+ os_file_handle_error_no_exit(name, "posix_fallocate", FALSE, __FILE__, __LINE__);
return(FALSE);
}
return(TRUE);
@@ -2274,7 +2460,7 @@ os_file_flush_func(
return(TRUE);
}
- os_file_handle_error(NULL, "flush");
+ os_file_handle_error(NULL, "flush", __FILE__, __LINE__);
/* It is a fatal error if a file flush does not succeed, because then
the database can get corrupt on disk */
@@ -2328,7 +2514,7 @@ os_file_flush_func(
ib_logf(IB_LOG_LEVEL_ERROR, "The OS said file flush did not succeed");
- os_file_handle_error(NULL, "flush");
+ os_file_handle_error(NULL, "flush", __FILE__, __LINE__);
/* It is a fatal error if a file flush does not succeed, because then
the database can get corrupt on disk */
@@ -2635,6 +2821,9 @@ try_again:
os_mutex_exit(os_file_count_mutex);
if (ret && len == n) {
+ if (fil_page_is_compressed((byte *)buf)) {
+ fil_decompress_page(NULL, (byte *)buf, len);
+ }
return(TRUE);
}
#else /* __WIN__ */
@@ -2647,6 +2836,9 @@ try_again:
ret = os_file_pread(file, buf, n, offset);
if ((ulint) ret == n) {
+ if (fil_page_is_compressed((byte *)buf)) {
+ fil_decompress_page(NULL, (byte *)buf, n);
+ }
return(TRUE);
}
@@ -2658,7 +2850,7 @@ try_again:
#ifdef __WIN__
error_handling:
#endif
- retry = os_file_handle_error(NULL, "read");
+ retry = os_file_handle_error(NULL, "read", __FILE__, __LINE__);
if (retry) {
goto try_again;
@@ -2781,7 +2973,7 @@ try_again:
#ifdef __WIN__
error_handling:
#endif
- retry = os_file_handle_error_no_exit(NULL, "read", FALSE);
+ retry = os_file_handle_error_no_exit(NULL, "read", FALSE, __FILE__, __LINE__);
if (retry) {
goto try_again;
@@ -3030,7 +3222,7 @@ os_file_status(
} else if (ret) {
/* file exists, but stat call failed */
- os_file_handle_error_no_exit(path, "stat", FALSE);
+ os_file_handle_error_no_exit(path, "stat", FALSE, __FILE__, __LINE__);
return(FALSE);
}
@@ -3058,7 +3250,7 @@ os_file_status(
} else if (ret) {
/* file exists, but stat call failed */
- os_file_handle_error_no_exit(path, "stat", FALSE);
+ os_file_handle_error_no_exit(path, "stat", FALSE, __FILE__, __LINE__);
return(FALSE);
}
@@ -3107,7 +3299,7 @@ os_file_get_status(
} else if (ret) {
/* file exists, but stat call failed */
- os_file_handle_error_no_exit(path, "stat", FALSE);
+ os_file_handle_error_no_exit(path, "stat", FALSE, __FILE__, __LINE__);
return(DB_FAIL);
@@ -3160,7 +3352,7 @@ os_file_get_status(
} else if (ret) {
/* file exists, but stat call failed */
- os_file_handle_error_no_exit(path, "stat", FALSE);
+ os_file_handle_error_no_exit(path, "stat", FALSE, __FILE__, __LINE__);
return(DB_FAIL);
@@ -3715,7 +3907,8 @@ os_aio_array_create(
array->slots = static_cast<os_aio_slot_t*>(
ut_malloc(n * sizeof(*array->slots)));
- memset(array->slots, 0x0, sizeof(n * sizeof(*array->slots)));
+ memset(array->slots, 0x0, n * sizeof(*array->slots));
+
#ifdef __WIN__
array->handles = static_cast<HANDLE*>(ut_malloc(n * sizeof(HANDLE)));
#endif /* __WIN__ */
@@ -3803,8 +3996,8 @@ os_aio_array_free(
/*==============*/
os_aio_array_t*& array) /*!< in, own: array to free */
{
-#ifdef WIN_ASYNC_IO
ulint i;
+#ifdef WIN_ASYNC_IO
for (i = 0; i < array->n_slots; i++) {
os_aio_slot_t* slot = os_aio_array_get_nth_slot(array, i);
@@ -3826,6 +4019,14 @@ os_aio_array_free(
}
#endif /* LINUX_NATIVE_AIO */
+ for (i = 0; i < array->n_slots; i++) {
+ os_aio_slot_t* slot = os_aio_array_get_nth_slot(array, i);
+ if (slot->page_compression_page) {
+ ut_free(slot->page_compression_page);
+ slot->page_compression_page = NULL;
+ }
+ }
+
ut_free(array->slots);
ut_free(array);
@@ -4159,7 +4360,12 @@ os_aio_array_reserve_slot(
void* buf, /*!< in: buffer where to read or from which
to write */
os_offset_t offset, /*!< in: file offset */
- ulint len) /*!< in: length of the block to read or write */
+ ulint len, /*!< in: length of the block to read or write */
+ ulint write_size) /*!< in: Actual write size, initialized
+ after the first successful trim
+ operation for this page; once
+ initialized we do not trim again unless
+ the actual write size decreases. */
{
os_aio_slot_t* slot = NULL;
#ifdef WIN_ASYNC_IO
@@ -4249,6 +4455,54 @@ found:
slot->buf = static_cast<byte*>(buf);
slot->offset = offset;
slot->io_already_done = FALSE;
+ slot->page_compress_success = FALSE;
+ slot->write_size = write_size;
+
+ /* If the space is page compressed and this is write operation
+ and if either only index pages compression is disabled or
+ page is index page and only index pages compression is enabled then
+ we compress the page */
+ if (message1 &&
+ type == OS_FILE_WRITE &&
+ fil_space_is_page_compressed(fil_node_get_space_id(slot->message1)) &&
+ (srv_page_compress_index_pages == false ||
+ (srv_page_compress_index_pages == true && fil_page_is_index_page(slot->buf)))) {
+ ulint real_len = len;
+ byte* tmp = NULL;
+
+ /* Release the array mutex while compressing */
+ os_mutex_exit(array->mutex);
+
+ // We allocate memory for page compressed buffer if and only
+ // if it is not yet allocated.
+ if (slot->page_buf == NULL) {
+ os_slot_alloc_page_buf(slot);
+ }
+
+ ut_ad(slot->page_buf);
+
+ /* Write buffer full of zeros, this is needed for trim,
+ can't really avoid this now. */
+ memset(slot->page_buf, 0, len);
+
+ tmp = fil_compress_page(fil_node_get_space_id(slot->message1), (byte *)buf, slot->page_buf, len, &real_len);
+
+ /* If compression succeeded, set up the length and buffer */
+ if (tmp != buf) {
+ len = real_len;
+ buf = slot->page_buf;
+ slot->len = real_len;
+ slot->page_compress_success = TRUE;
+ } else {
+ slot->page_compress_success = FALSE;
+ }
+
+ /* Take array mutex back, not sure if this is really needed
+ below */
+ os_mutex_enter(array->mutex);
+
+ }
+
#ifdef WIN_ASYNC_IO
control = &slot->control;
@@ -4523,10 +4777,15 @@ os_aio_func(
(can be used to identify a completed
aio operation); ignored if mode is
OS_AIO_SYNC */
- void* message2)/*!< in: message for the aio handler
+ void* message2,/*!< in: message for the aio handler
(can be used to identify a completed
aio operation); ignored if mode is
OS_AIO_SYNC */
+ ulint write_size)/*!< in: Actual write size, initialized
+ after the first successful trim
+ operation for this page; once
+ initialized we do not trim again unless
+ the actual write size decreases. */
{
os_aio_array_t* array;
os_aio_slot_t* slot;
@@ -4624,7 +4883,8 @@ try_again:
}
slot = os_aio_array_reserve_slot(type, array, message1, message2, file,
- name, buf, offset, n);
+ name, buf, offset, n, write_size);
+
if (type == OS_FILE_READ) {
if (srv_use_native_aio) {
os_n_file_reads++;
@@ -4704,7 +4964,7 @@ err_exit:
os_aio_array_free_slot(array, slot);
if (os_file_handle_error(
- name,type == OS_FILE_READ ? "aio read" : "aio write")) {
+ name,type == OS_FILE_READ ? "aio read" : "aio write", __FILE__, __LINE__)) {
goto try_again;
}
@@ -4817,7 +5077,7 @@ os_aio_windows_handle(
if (ret && len == slot->len) {
ret_val = TRUE;
- } else if (os_file_handle_error(slot->name, "Windows aio")) {
+ } else if (os_file_handle_error(slot->name, "Windows aio", __FILE__, __LINE__)) {
retry = TRUE;
} else {
@@ -4847,9 +5107,17 @@ os_aio_windows_handle(
switch (slot->type) {
case OS_FILE_WRITE:
- ret = WriteFile(slot->file, slot->buf,
+ if (slot->message1 &&
+ fil_space_is_page_compressed(fil_node_get_space_id(slot->message1)) &&
+ slot->page_buf) {
+ ret = WriteFile(slot->file, slot->page_buf,
+ (DWORD) slot->len, &len,
+ &(slot->control));
+ } else {
+ ret = WriteFile(slot->file, slot->buf,
(DWORD) slot->len, &len,
&(slot->control));
+ }
break;
case OS_FILE_READ:
@@ -4881,6 +5149,29 @@ os_aio_windows_handle(
ret_val = ret && len == slot->len;
}
+ if (slot->message1 &&
+ fil_space_is_page_compressed(fil_node_get_space_id(slot->message1))) {
+ // We allocate memory for page compressed buffer if and only
+ // if it is not yet allocated.
+ if (slot->page_buf == NULL) {
+ os_slot_alloc_page_buf(slot);
+ }
+ ut_ad(slot->page_buf);
+
+ if (slot->type == OS_FILE_READ) {
+ if (fil_page_is_compressed(slot->buf)) {
+ fil_decompress_page(slot->page_buf, slot->buf, slot->len);
+ }
+ } else {
+ if (slot->page_compress_success && fil_page_is_compressed(slot->page_buf)) {
+ if (srv_use_trim && os_fallocate_failed == FALSE) {
+ // Deallocate unused blocks from file system
+ os_file_trim(slot->file, slot, slot->len);
+ }
+ }
+ }
+ }
+
os_aio_array_free_slot(array, slot);
return(ret_val);
@@ -4970,6 +5261,34 @@ retry:
/* We have not overstepped to next segment. */
ut_a(slot->pos < end_pos);
+ /* If the table is page compressed and this is a read, we
+ decompress before we announce the read is complete. For
+ writes, we trim the unused part of the page if enabled. */
+ if (slot->message1 &&
+ fil_space_is_page_compressed(fil_node_get_space_id(slot->message1))) {
+ // We allocate memory for page compressed buffer if and only
+ // if it is not yet allocated.
+ if (slot->page_buf == NULL) {
+ os_slot_alloc_page_buf(slot);
+ }
+ ut_ad(slot->page_buf);
+
+ if (slot->type == OS_FILE_READ) {
+ if (fil_page_is_compressed(slot->buf)) {
+ fil_decompress_page(slot->page_buf, slot->buf, slot->len);
+ }
+ } else {
+ if (slot->page_compress_success &&
+ fil_page_is_compressed(slot->page_buf)) {
+ ut_ad(slot->page_compression_page);
+ if (srv_use_trim && os_fallocate_failed == FALSE) {
+ // Deallocate unused blocks from file system
+ os_file_trim(slot->file, slot, slot->len);
+ }
+ }
+ }
+ }
+
/* Mark this request as completed. The error handling
will be done in the calling function. */
os_mutex_enter(array->mutex);
@@ -5113,6 +5432,13 @@ found:
} else {
errno = -slot->ret;
+ if (slot->ret == 0) {
+ fprintf(stderr,
+ "InnoDB: Number of bytes after aio %d requested %lu\n"
+ "InnoDB: from file %s\n",
+ slot->n_bytes, slot->len, slot->name);
+ }
+
/* os_file_handle_error does tell us if we should retry
this IO. As it stands now, we don't do this retry when
reaping requests from a different context than
@@ -5120,7 +5446,7 @@ found:
windows and linux native AIO.
We should probably look into this to transparently
re-submit the IO. */
- os_file_handle_error(slot->name, "Linux aio");
+ os_file_handle_error(slot->name, "Linux aio", __FILE__, __LINE__);
ret = FALSE;
}
@@ -5323,7 +5649,7 @@ consecutive_loop:
if (slot->reserved
&& slot != aio_slot
- && slot->offset == slot->offset + aio_slot->len
+ && slot->offset == aio_slot->offset + aio_slot->len
&& slot->type == aio_slot->type
&& slot->file == aio_slot->file) {
@@ -5791,4 +6117,147 @@ os_aio_all_slots_free(void)
}
#endif /* UNIV_DEBUG */
+#ifdef _WIN32
+#include <winioctl.h>
+#ifndef FSCTL_FILE_LEVEL_TRIM /* fallback definitions, used only when <winioctl.h> does not provide them */
+#define FSCTL_FILE_LEVEL_TRIM CTL_CODE(FILE_DEVICE_FILE_SYSTEM, 130, METHOD_BUFFERED, FILE_WRITE_DATA)
+typedef struct _FILE_LEVEL_TRIM_RANGE {
+ DWORDLONG Offset; /*!< start of the range; os_file_trim() passes slot->offset + len */
+ DWORDLONG Length; /*!< length of the range; os_file_trim() passes UNIV_PAGE_SIZE - len */
+} FILE_LEVEL_TRIM_RANGE, *PFILE_LEVEL_TRIM_RANGE;
+
+typedef struct _FILE_LEVEL_TRIM {
+ DWORD Key; /*!< set to 0 by os_file_trim() */
+ DWORD NumRanges; /*!< set to 1 by os_file_trim(), which fills only Ranges[0] */
+ FILE_LEVEL_TRIM_RANGE Ranges[1];
+} FILE_LEVEL_TRIM, *PFILE_LEVEL_TRIM;
+#endif /* !FSCTL_FILE_LEVEL_TRIM */
+#endif /* _WIN32 */
+
+/**********************************************************************//**
+Deallocates (trims) the unused tail of a page after a page compressed write:
+the byte range [slot->offset + len, slot->offset + UNIV_PAGE_SIZE). Linux uses
+fallocate(FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE), Windows uses
+FSCTL_FILE_LEVEL_TRIM. slot->write_size remembers the size of the last trimmed
+write, so that a later write which is not smaller can skip the trim. If the
+trim fails or is unsupported, os_fallocate_failed is set and slot->write_size
+is reset to 0.
+@return TRUE if trimmed or no trim was needed, FALSE if failed or unsupported */
+UNIV_INTERN
+ibool
+os_file_trim(
+/*=========*/
+	os_file_t	file,	/*!< in: file to be trimmed */
+	os_aio_slot_t*	slot,	/*!< in/out: slot structure; offset and name
+				are read, write_size is updated */
+	ulint		len)	/*!< in: number of payload bytes written at
+				slot->offset */
+{
+	ibool		trimmed = TRUE;
+
+	/* No trim is needed if write_size is initialized and the payload
+	did not shrink: the rest of the page was trimmed by an earlier
+	write. If the payload shrinks we need to trim again. */
+	const bool	trim_saved = slot->write_size > 0 && len >= slot->write_size;
+
+	/* Nothing to trim either when the payload fills the whole page.
+	Testing len first also keeps the subtraction below from wrapping
+	around, which would punch a hole into the pages that follow. */
+	if (len >= UNIV_PAGE_SIZE || trim_saved) {
+		if (trim_saved) {
+			srv_stats.page_compressed_trim_op_saved.inc();
+		}
+		slot->write_size = len;
+		return(TRUE);
+	}
+
+	const size_t	trim_len = UNIV_PAGE_SIZE - len;
+
+#if defined(__linux__) && defined(FALLOC_FL_PUNCH_HOLE) && defined(FALLOC_FL_KEEP_SIZE)
+	const os_offset_t	off = slot->offset + len;
+
+	if (fallocate(file, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off, trim_len) != 0) {
+		/* fallocate() returns -1 and reports the reason in errno.
+		Save it, because the printing below may overwrite it. */
+		const int	err = errno;
+
+		ut_print_timestamp(stderr);
+		fprintf(stderr,
+			" InnoDB: [Warning] fallocate call failed with error code %d.\n"
+			" InnoDB: start: %llx len: %lu payload: %lu\n"
+			" InnoDB: Disabling fallocate for now.\n",
+			err, (unsigned long long) off,
+			(unsigned long) trim_len, (unsigned long) len);
+
+		errno = err;
+		os_file_handle_error_no_exit(slot->name,
+			" fallocate(FALLOC_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE) ",
+			FALSE, __FILE__, __LINE__);
+		trimmed = FALSE;
+	}
+#elif defined(_WIN32)
+	FILE_LEVEL_TRIM	flt;
+	DWORD		n_returned = 0;
+
+	flt.Key = 0;
+	flt.NumRanges = 1;
+	flt.Ranges[0].Offset = slot->offset + len;
+	flt.Ranges[0].Length = trim_len;
+
+	/* The output buffer size is a DWORD, not a pointer, and
+	lpBytesReturned must not be NULL when lpOverlapped is NULL. */
+	if (!DeviceIoControl(file, FSCTL_FILE_LEVEL_TRIM, &flt, sizeof(flt), NULL, 0, &n_returned, NULL)) {
+		ut_print_timestamp(stderr);
+		fprintf(stderr,
+			" InnoDB: [Warning] fallocate call failed with error.\n"
+			" InnoDB: start: %llx len: %lu payload: %lu\n"
+			" InnoDB: Disabling fallocate for now.\n",
+			(unsigned long long) flt.Ranges[0].Offset,
+			(unsigned long) trim_len, (unsigned long) len);
+
+		os_file_handle_error_no_exit(slot->name,
+			" DeviceIOControl(FSCTL_FILE_LEVEL_TRIM) ",
+			FALSE, __FILE__, __LINE__);
+		trimmed = FALSE;
+	}
+#else
+	/* No way to deallocate file space is known for this platform. */
+	ut_print_timestamp(stderr);
+	fprintf(stderr,
+		" InnoDB: [Warning] fallocate not supported on this installation.\n"
+		" InnoDB: Disabling fallocate for now.\n");
+	trimmed = FALSE;
+#endif
+
+	if (!trimmed) {
+		/* After first failure do not try to trim again */
+		os_fallocate_failed = TRUE;
+		slot->write_size = 0;
+		return(FALSE);
+	}
+
+	slot->write_size = len;
+	srv_stats.page_compression_trim_sect512.add(trim_len / 512);
+	srv_stats.page_compression_trim_sect4096.add(trim_len / 4096);
+	srv_stats.page_compressed_trim_op.inc();
+	return(TRUE);
+}
#endif /* !UNIV_HOTBACKUP */
+
+/**********************************************************************//**
+Reserves the scratch buffer a slot needs for page compression. The raw
+allocation is remembered in the slot so that it can be freed later. */
+UNIV_INTERN
+void
+os_slot_alloc_page_buf(
+/*===================*/
+	os_aio_slot_t*	slot)	/*!< in: slot structure */
+{
+	/* Twice the page size is allocated to leave room for aligning the
+	buffer to UNIV_PAGE_SIZE. */
+	void*	raw = ut_malloc(UNIV_PAGE_SIZE*2);
+
+	/* page_compression_page keeps the pointer to free, page_buf is the aligned view. */
+	slot->page_compression_page = static_cast<byte *>(raw);
+	slot->page_buf = static_cast<byte *>(ut_align(raw, UNIV_PAGE_SIZE));
+}
diff --git a/storage/innobase/srv/srv0mon.cc b/storage/innobase/srv/srv0mon.cc
index 3b3da2f070f..44a60961110 100644
--- a/storage/innobase/srv/srv0mon.cc
+++ b/storage/innobase/srv/srv0mon.cc
@@ -290,6 +290,12 @@ static monitor_info_t innodb_counter_info[] =
MONITOR_EXISTING | MONITOR_DEFAULT_ON),
MONITOR_DEFAULT_START, MONITOR_OVLD_PAGES_WRITTEN},
+ {"buffer_index_pages_written", "buffer",
+ "Number of index pages written (innodb_index_pages_written)",
+ static_cast<monitor_type_t>(
+ MONITOR_EXISTING | MONITOR_DEFAULT_ON),
+ MONITOR_DEFAULT_START, MONITOR_OVLD_INDEX_PAGES_WRITTEN},
+
{"buffer_pages_read", "buffer",
"Number of pages read (innodb_pages_read)",
static_cast<monitor_type_t>(
@@ -875,6 +881,41 @@ static monitor_info_t innodb_counter_info[] =
MONITOR_NONE,
MONITOR_DEFAULT_START, MONITOR_PAD_DECREMENTS},
+ {"compress_saved", "compression",
+ "Number of bytes saved by page compression",
+ MONITOR_NONE,
+ MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESS_SAVED},
+
+ {"compress_trim_sect512", "compression",
+ "Number of sect-512 TRIMed by page compression",
+ MONITOR_NONE,
+ MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT512},
+
+ {"compress_trim_sect4096", "compression",
+ "Number of sect-4K TRIMed by page compression",
+ MONITOR_NONE,
+ MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT4096},
+
+ {"compress_pages_page_compressed", "compression",
+ "Number of pages compressed by page compression",
+ MONITOR_NONE,
+ MONITOR_DEFAULT_START, MONITOR_OVLD_PAGES_PAGE_COMPRESSED},
+
+ {"compress_page_compressed_trim_op", "compression",
+ "Number of TRIM operation performed by page compression",
+ MONITOR_NONE,
+ MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP},
+
+ {"compress_page_compressed_trim_op_saved", "compression",
+ "Number of TRIM operation saved by page compression",
+ MONITOR_NONE,
+ MONITOR_DEFAULT_START, MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP_SAVED},
+
+ {"compress_pages_page_decompressed", "compression",
+ "Number of pages decompressed by page compression",
+ MONITOR_NONE,
+ MONITOR_DEFAULT_START, MONITOR_OVLD_PAGES_PAGE_DECOMPRESSED},
+
/* ========== Counters for Index ========== */
{"module_index", "index", "Index Manager",
MONITOR_MODULE,
@@ -1528,6 +1569,11 @@ srv_mon_process_existing_counter(
value = stat.n_pages_written;
break;
+ /* innodb_index_pages_written, the number of index pages written */
+ case MONITOR_OVLD_INDEX_PAGES_WRITTEN:
+ value = srv_stats.index_pages_written;
+ break;
+
/* innodb_pages_read */
case MONITOR_OVLD_PAGES_READ:
buf_get_total_stat(&stat);
@@ -1769,6 +1815,28 @@ srv_mon_process_existing_counter(
value = btr_cur_n_non_sea;
break;
+ case MONITOR_OVLD_PAGE_COMPRESS_SAVED:
+ value = srv_stats.page_compression_saved;
+ break;
+ case MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT512:
+ value = srv_stats.page_compression_trim_sect512;
+ break;
+ case MONITOR_OVLD_PAGE_COMPRESS_TRIM_SECT4096:
+ value = srv_stats.page_compression_trim_sect4096;
+ break;
+ case MONITOR_OVLD_PAGES_PAGE_COMPRESSED:
+ value = srv_stats.pages_page_compressed;
+ break;
+ case MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP:
+ value = srv_stats.page_compressed_trim_op;
+ break;
+ case MONITOR_OVLD_PAGE_COMPRESSED_TRIM_OP_SAVED:
+ value = srv_stats.page_compressed_trim_op_saved;
+ break;
+ case MONITOR_OVLD_PAGES_PAGE_DECOMPRESSED:
+ value = srv_stats.pages_page_decompressed;
+ break;
+
default:
ut_error;
}
diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc
index 4c5753ac40e..90864cee9ef 100644
--- a/storage/innobase/srv/srv0srv.cc
+++ b/storage/innobase/srv/srv0srv.cc
@@ -145,6 +145,24 @@ use simulated aio we build below with threads.
Currently we support native aio on windows and linux */
UNIV_INTERN my_bool srv_use_native_aio = TRUE;
+/* If this flag is TRUE, then page compression is applied
+to the pages */
+UNIV_INTERN my_bool srv_compress_pages = FALSE;
+/* If this flag is TRUE, then we will use page compression
+only for index pages */
+UNIV_INTERN my_bool srv_page_compress_index_pages = FALSE;
+UNIV_INTERN long srv_trim_pct = 100;
+/* Default compression level if page compression is used and no compression
+level is set for the table*/
+UNIV_INTERN long srv_compress_zlib_level = 6;
+/* If this flag is TRUE, then we will use fallocate(PUNCH_HOLE)
+on the pages */
+UNIV_INTERN my_bool srv_use_trim = TRUE;
+/* If this flag is TRUE, then we will use posix fallocate for file extension */
+UNIV_INTERN my_bool srv_use_posix_fallocate = FALSE;
+/* If this flag is TRUE, then we disable doublewrite buffer */
+UNIV_INTERN my_bool srv_use_atomic_writes = FALSE;
+
#ifdef __WIN__
/* Windows native condition variables. We use runtime loading / function
pointers, because they are not available on Windows Server 2003 and
@@ -347,11 +365,6 @@ batch flushing i.e.: LRU flushing and flush_list flushing. The rest
of the pages are used for single page flushing. */
UNIV_INTERN ulong srv_doublewrite_batch_size = 120;
-UNIV_INTERN ibool srv_use_atomic_writes = FALSE;
-#ifdef HAVE_POSIX_FALLOCATE
-UNIV_INTERN ibool srv_use_posix_fallocate = TRUE;
-#endif
-
UNIV_INTERN ulong srv_replication_delay = 0;
/*-------------------------------------------*/
@@ -375,6 +388,16 @@ static ulint srv_n_rows_read_old = 0;
UNIV_INTERN ulint srv_truncated_status_writes = 0;
UNIV_INTERN ulint srv_available_undo_logs = 0;
+UNIV_INTERN ib_uint64_t srv_page_compression_saved = 0;
+UNIV_INTERN ib_uint64_t srv_page_compression_trim_sect512 = 0;
+UNIV_INTERN ib_uint64_t srv_page_compression_trim_sect4096 = 0;
+UNIV_INTERN ib_uint64_t srv_index_pages_written = 0;
+UNIV_INTERN ib_uint64_t srv_pages_page_compressed = 0;
+UNIV_INTERN ib_uint64_t srv_page_compressed_trim_op = 0;
+UNIV_INTERN ib_uint64_t srv_page_compressed_trim_op_saved = 0;
+UNIV_INTERN ib_uint64_t srv_index_page_decompressed = 0;
+
+
/* Set the following to 0 if you want InnoDB to write messages on
stderr on startup/shutdown. */
UNIV_INTERN ibool srv_print_verbose_log = TRUE;
@@ -1457,6 +1480,14 @@ srv_export_innodb_status(void)
srv_truncated_status_writes;
export_vars.innodb_available_undo_logs = srv_available_undo_logs;
+ export_vars.innodb_page_compression_saved = srv_stats.page_compression_saved;
+ export_vars.innodb_page_compression_trim_sect512 = srv_stats.page_compression_trim_sect512;
+ export_vars.innodb_page_compression_trim_sect4096 = srv_stats.page_compression_trim_sect4096;
+ export_vars.innodb_index_pages_written = srv_stats.index_pages_written;
+ export_vars.innodb_pages_page_compressed = srv_stats.pages_page_compressed;
+ export_vars.innodb_page_compressed_trim_op = srv_stats.page_compressed_trim_op;
+ export_vars.innodb_page_compressed_trim_op_saved = srv_stats.page_compressed_trim_op_saved;
+ export_vars.innodb_pages_page_decompressed = srv_stats.pages_page_decompressed;
#ifdef UNIV_DEBUG
if (purge_sys->done.trx_no == 0
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index efe9f094c0d..0517f4b1468 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -3,6 +3,7 @@
Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2008, Google Inc.
Copyright (c) 2009, Percona Inc.
+Copyright (c) 2013, SkySQL Ab. All Rights Reserved.
Portions of this file contain modifications contributed and copyrighted by
Google, Inc. Those modifications are gratefully acknowledged and are described
@@ -126,7 +127,10 @@ static os_file_t files[1000];
/** io_handler_thread parameters for thread identification */
static ulint n[SRV_MAX_N_IO_THREADS + 6];
/** io_handler_thread identifiers, 32 is the maximum number of purge threads */
-static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32];
+/** pgcomp_thread are 16 total */
+#define START_PGCOMP_CNT (SRV_MAX_N_IO_THREADS + 6 + 32)
+#define PGCOMP_MAX_WORKER 16
+static os_thread_id_t thread_ids[SRV_MAX_N_IO_THREADS + 6 + 32 + PGCOMP_MAX_WORKER];
/** We use this mutex to test the return value of pthread_mutex_trylock
on successful locking. HP-UX does NOT return 0, though Linux et al do. */
@@ -522,7 +526,7 @@ create_log_file(
*file = os_file_create(
innodb_file_log_key, name,
- OS_FILE_CREATE, OS_FILE_NORMAL, OS_LOG_FILE, &ret);
+ OS_FILE_CREATE, OS_FILE_NORMAL, OS_LOG_FILE, &ret, FALSE);
ib_logf(IB_LOG_LEVEL_INFO,
"Setting log file %s size to %lu MB",
@@ -715,7 +719,7 @@ open_log_file(
*file = os_file_create(innodb_file_log_key, name,
OS_FILE_OPEN, OS_FILE_AIO,
- OS_LOG_FILE, &ret);
+ OS_LOG_FILE, &ret, FALSE);
if (!ret) {
ib_logf(IB_LOG_LEVEL_ERROR, "Unable to open '%s'", name);
return(DB_ERROR);
@@ -806,7 +810,7 @@ open_or_create_data_files(
files[i] = os_file_create(
innodb_file_data_key, name, OS_FILE_CREATE,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
if (srv_read_only_mode) {
@@ -849,7 +853,7 @@ open_or_create_data_files(
files[i] = os_file_create(
innodb_file_data_key, name, OS_FILE_OPEN_RAW,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
if (!ret) {
ib_logf(IB_LOG_LEVEL_ERROR,
@@ -881,17 +885,17 @@ open_or_create_data_files(
files[i] = os_file_create(
innodb_file_data_key,
name, OS_FILE_OPEN_RAW,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
} else if (i == 0) {
files[i] = os_file_create(
innodb_file_data_key,
name, OS_FILE_OPEN_RETRY,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
} else {
files[i] = os_file_create(
innodb_file_data_key,
name, OS_FILE_OPEN, OS_FILE_NORMAL,
- OS_DATA_FILE, &ret);
+ OS_DATA_FILE, &ret, FALSE);
}
if (!ret) {
@@ -1078,7 +1082,7 @@ srv_undo_tablespace_create(
innodb_file_data_key,
name,
srv_read_only_mode ? OS_FILE_OPEN : OS_FILE_CREATE,
- OS_FILE_NORMAL, OS_DATA_FILE, &ret);
+ OS_FILE_NORMAL, OS_DATA_FILE, &ret, FALSE);
if (srv_read_only_mode && ret) {
ib_logf(IB_LOG_LEVEL_INFO,
@@ -1159,7 +1163,8 @@ srv_undo_tablespace_open(
| OS_FILE_ON_ERROR_SILENT,
OS_FILE_NORMAL,
OS_DATA_FILE,
- &ret);
+ &ret,
+ FALSE);
/* If the file open was successful then load the tablespace. */
@@ -1430,6 +1435,691 @@ srv_start_wait_for_purge_to_start()
}
}
+/* JAN: TODO: */
+/**********************************************************************************/
+extern int timediff(struct timeval *g_time, struct timeval *s_time, struct timeval *d_time);
+extern ibool buf_flush_start(buf_pool_t* buf_pool, enum buf_flush flush_type);
+extern void buf_flush_end(buf_pool_t* buf_pool, enum buf_flush flush_type);
+extern void buf_flush_common(enum buf_flush flush_type, ulint page_count);
+extern ulint buf_flush_batch(buf_pool_t* buf_pool, enum buf_flush flush_type, ulint min_n, lsn_t lsn_limit);
+
+typedef enum wrk_status { /* state stored in wrk_t::wi_status */
+ WRK_ITEM_SET=0, /* assigned by pgcomp_flush_work_items() when an item is prepared */
+ WRK_ITEM_START=1, /* assigned by the worker right before it processes the item */
+ WRK_ITEM_DONE=2,
+ WRK_ITEM_SUCCESS=2, /* same numeric value as WRK_ITEM_DONE */
+ WRK_ITEM_FAILED=3, /* assigned when flush_pool_instance() returns non-zero */
+ WRK_ITEM_STATUS_UNDEFINED /* implicit value 4; initial state given by setup_wrk_itm() */
+} wrk_status_t;
+
+typedef enum wthr_status { /* state stored in thread_sync_t::wt_status */
+ WTHR_NOT_INIT=0,
+ WTHR_INITIALIZED=1, /* assigned by pgcomp_handler_init() for each created worker */
+ WTHR_SIG_WAITING=2, /* assigned before the worker waits on the work-queue condition variable */
+ WTHR_RUNNING=3, /* assigned after that condition wait returns */
+ WTHR_NO_WORK=4, /* assigned when no item could be removed from the work queue */
+ WTHR_KILL_IT=5, /* not assigned anywhere in the code visible in this patch section */
+ WTHR_STATUS_UNDEFINED
+} wthr_status_t;
+
+typedef struct wrk_itm /* one buffer-pool flush request handed to a worker thread */
+{
+ /****************************/
+ /* Need to group into struct*/
+ buf_pool_t* buf_pool; //buffer-pool instance
+ int flush_type; //flush-type for buffer-pool flush operation (cast to enum buf_flush by flush_pool_instance())
+ int min; //minimum number of pages requested to be flushed (recomputed for BUF_FLUSH_LRU)
+ unsigned long long lsn_limit; //lsn limit for the buffer-pool flush operation
+ /****************************/
+
+ unsigned long result; //flush pages count (value returned by buf_flush_batch())
+ unsigned long t_usec; //time-taken in usec (only measured when UNIV_DEBUG is defined)
+ long id_usr; //thread-id currently working, -1 when the item is not in use
+ wrk_status_t wi_status; //flag, see wrk_status_t
+ struct wrk_itm *next; //next item of the singly linked list, NULL at the tail
+} wrk_t;
+
+typedef enum op_q_status { /* state stored in opq_t::flag */
+ Q_NOT_INIT=0,
+ Q_EMPTY=1, /* assigned to the work queue by pgcomp_handler() before items are inserted */
+ Q_INITIALIZED=2, /* assigned by init_queue() and again at the end of each flush cycle */
+ Q_PROCESS=3, /* a flush cycle is running; workers only take items in this state */
+ Q_DONE=4, /* assigned once the expected number of items has been processed */
+ Q_ERROR=5,
+ Q_STATUS_UNDEFINED
+} q_status_t;
+
+typedef struct op_queue /* singly linked list of work items plus its synchronization objects */
+{
+ pthread_mutex_t mtx; /* taken by q_insert_wrk_list()/q_remove_wrk() around head and tail updates */
+ pthread_cond_t cv; /* used to announce Q_PROCESS on the work queue and Q_DONE on the completion queue */
+ q_status_t flag; /* current queue state */
+ wrk_t *head; /* first item (removal end), NULL when the queue is empty */
+ wrk_t *tail; /* last item (insertion end), NULL when the queue is empty */
+} opq_t;
+
+opq_t wq, cq;
+
+typedef struct thread_sync /* per-worker data passed as argument to page_comp_io_thread() */
+{
+ int wthread_id; /* index of this entry in pc_sync[] */
+ pthread_t wthread; /* NOTE(review): pgcomp_handler_init() stores START_PGCOMP_CNT + i here, not a thread handle */
+ opq_t *wq; /* queue the worker removes work items from */
+ opq_t *cq; /* queue the worker inserts processed items into */
+ wthr_status_t wt_status; /* current worker state, see wthr_status_t */
+ unsigned long stat_universal_num_processed; /* incremented per processed item, never reset in the visible code */
+ unsigned long stat_cycle_num_processed; /* incremented per processed item, reset by page_comp_io_thread() */
+} thread_sync_t;
+
+/* Global XXX:DD needs to be cleaned */
+int exit_flag;
+ulint check_wrk_done_count;
+static ulint done_cnt_flag;
+static int pgc_n_threads = 8;
+
+thread_sync_t pc_sync[PGCOMP_MAX_WORKER];
+static wrk_t work_items[PGCOMP_MAX_WORKER];
+static int pgcomp_wrk_initialized = -1;
+
+/** Stores the number of completed work items that ends a flush cycle.
+@param cnt	number of work items expected per cycle
+@return the stored counter converted to int */
+int set_check_done_flag_count(int cnt)
+{
+	check_wrk_done_count = cnt;
+
+	return(check_wrk_done_count);
+}
+
+/** Records that the page-compression worker setup has been performed.
+@return always 0 */
+int set_pgcomp_wrk_init_done(void)
+{
+	const int	initialized = 1;
+
+	pgcomp_wrk_initialized = initialized;
+
+	return(0);
+}
+
+/** Tells whether set_pgcomp_wrk_init_done() has been called.
+@return 1 if the worker setup is marked done, 0 otherwise */
+int is_pgcomp_wrk_init_done(void)
+{
+	if(pgcomp_wrk_initialized != 1) {
+		return(0);
+	}
+
+	return(1);
+}
+
+/** Overwrites the counter of work items completed in the current cycle.
+The caller is expected to hold cq.mtx; without it the update is not safe.
+@param val	new counter value
+@return the value that was stored */
+ulint set_done_cnt_flag(ulint val)
+{
+	return(done_cnt_flag = val);
+}
+
+
+/** Accounts for one completed work item and, when that item was the last
+one of the cycle, marks both queues Q_DONE and signals the thread waiting
+on the completion queue.
+
+The shared counter is incremented and sampled inside one critical section
+of cq->mtx and only the sampled value is used afterwards. Re-reading
+done_cnt_flag after the mutex was released allowed a worker to observe an
+increment made by another worker, so more than one worker could decide it
+was the last one. wq->flag is now written under wq->mtx, which is never
+acquired while cq->mtx is held here.
+@param ppc	per-worker sync data; ppc->wq and ppc->cq must be set
+@return value of the completion counter right after this increment */
+ulint cv_done_inc_flag_sig(thread_sync_t * ppc)
+{
+	ulint	done;
+	ulint	expected;
+
+	pthread_mutex_lock(&ppc->cq->mtx);
+	ppc->stat_universal_num_processed++;
+	ppc->stat_cycle_num_processed++;
+	done = ++done_cnt_flag;
+	expected = check_wrk_done_count;
+	if(done > expected) {
+		fprintf(stderr, "ERROR: done_cnt:%lu check_wrk_done_count:%lu\n",
+			(unsigned long) done, (unsigned long) expected);
+	}
+	assert(done <= expected);
+	pthread_mutex_unlock(&ppc->cq->mtx);
+
+	if(done == expected) {
+		/* Exactly one worker per cycle sees its own increment hit
+		the target, so the completion signal is sent once. */
+		pthread_mutex_lock(&ppc->wq->mtx);
+		ppc->wq->flag = Q_DONE;
+		pthread_mutex_unlock(&ppc->wq->mtx);
+
+		pthread_mutex_lock(&ppc->cq->mtx);
+		ppc->cq->flag = Q_DONE;
+		pthread_cond_signal(&ppc->cq->cv);
+		pthread_mutex_unlock(&ppc->cq->mtx);
+	}
+	return(done);
+}
+
+/** Detaches the first work item from a queue.
+@param q	queue to take the item from
+@param wi	out: the removed item, or NULL if the queue was empty
+@return 0 if an item was removed, 1 if the queue was empty,
+-1 if q or wi is NULL */
+int q_remove_wrk(opq_t *q, wrk_t **wi)
+{
+	int	rc;
+	wrk_t*	first;
+
+	if(q == NULL || wi == NULL) {
+		return -1;
+	}
+
+	pthread_mutex_lock(&q->mtx);
+	/* head and tail must be either both set or both NULL */
+	assert(!((q->tail == NULL) && (q->head != NULL)));
+	assert(!((q->tail != NULL) && (q->head == NULL)));
+
+	first = q->head;
+	*wi = first;
+
+	if(first == NULL) {
+		q->tail = NULL;
+		rc = 1; /* indicating remove from queue failed */
+	} else {
+		/* unlink the first node and hand it out detached */
+		q->head = first->next;
+		first->next = NULL;
+		if(q->head == NULL) {
+			q->tail = NULL;
+		}
+		rc = 0;
+	}
+	pthread_mutex_unlock(&q->mtx);
+
+	return (rc);
+}
+
+/** Tells whether a work item is currently claimed by a worker.
+@param wi	work item to inspect
+@return 1 if wi->id_usr is set (not -1), 0 if the item is free,
+-1 if wi is NULL */
+int is_busy_wrk_itm(wrk_t *wi)
+{
+	if(wi == NULL) {
+		return -1;
+	}
+
+	return(wi->id_usr != -1);
+}
+
+/** Resets the first `items` entries of work_items[] and chains them into a
+NULL-terminated list.
+
+work_items[] holds PGCOMP_MAX_WORKER entries, so the count is validated
+before anything is written; without the check a count of 0 wrote to
+work_items[-1] and a count above PGCOMP_MAX_WORKER ran past the array.
+@param items	number of work items to prepare, 1..PGCOMP_MAX_WORKER
+@return 0 on success, -1 if items is out of range */
+int setup_wrk_itm(int items)
+{
+	int i;
+
+	if(items < 1 || items > PGCOMP_MAX_WORKER) {
+		fprintf(stderr, "%s(): invalid work item count %d (max %d)\n",
+			__FUNCTION__, items, PGCOMP_MAX_WORKER);
+		return -1;
+	}
+
+	for(i=0; i<items; i++) {
+		work_items[i].buf_pool = NULL;
+		work_items[i].result = 0;
+		work_items[i].t_usec = 0;
+		work_items[i].id_usr = -1;
+		work_items[i].wi_status = WRK_ITEM_STATUS_UNDEFINED;
+		/* last node should be the tail */
+		work_items[i].next = (i + 1 < items) ? &work_items[i + 1] : NULL;
+	}
+	return 0;
+}
+
+/** Initializes the mutex and condition variable of a queue and marks the
+queue as empty and Q_INITIALIZED.
+
+The pthread initialization results are checked; on failure the queue is
+left without a half-initialized mutex and its flag is not changed.
+@param q	queue to initialize
+@return 0 on success, -1 if q is NULL or a pthread object could not be
+initialized */
+int init_queue(opq_t *q)
+{
+	if(!q) {
+		return -1;
+	}
+	/* Initialize Queue mutex and CV */
+	if(0 != pthread_mutex_init(&q->mtx, NULL)) {
+		return -1;
+	}
+	if(0 != pthread_cond_init(&q->cv, NULL)) {
+		pthread_mutex_destroy(&q->mtx);
+		return -1;
+	}
+	q->flag = Q_INITIALIZED;
+	q->head = q->tail = NULL;
+
+	return 0;
+}
+
+#if 0 /* Compiled out. As written it would clear result, t_usec and id_usr of
+ the first `items` entries of work_items[] and empty the given queue
+ while holding that queue's mutex. */
+int drain_cq(opq_t *cq, int items)
+{
+ int i=0;
+
+ if(!cq) {
+ return -1;
+ }
+ pthread_mutex_lock(&cq->mtx);
+ for(i=0; i<items; i++) {
+ work_items[i].result=0;
+ work_items[i].t_usec = 0;
+ work_items[i].id_usr = -1;
+ }
+ cq->head = cq->tail = NULL;
+ pthread_mutex_unlock(&cq->mtx);
+ return 0;
+}
+#endif
+
+/** Appends a NULL-terminated list of work items to the end of a queue.
+@param q	queue to extend
+@param w_list	first node of the list to append
+@return 0 on success, -1 if q or w_list is NULL */
+int q_insert_wrk_list(opq_t *q, wrk_t *w_list)
+{
+	wrk_t*	last;
+
+	if(q == NULL || w_list == NULL) {
+		fprintf(stderr, "insert failed q:%p w:%p\n", q, w_list);
+		return -1;
+	}
+
+	pthread_mutex_lock(&q->mtx);
+
+	assert(!((q->tail == NULL) && (q->head != NULL)));
+	assert(!((q->tail != NULL) && (q->head == NULL)));
+
+	if(q->tail != NULL) {
+		/* queue already has nodes: hook the list behind the tail */
+		assert(q->head != NULL);
+		q->tail->next = w_list;
+	} else {
+		/* queue is empty: the list becomes the queue */
+		q->head = q->tail = w_list;
+	}
+
+	/* advance the tail to the final node of the appended list */
+	for(last = q->tail; last->next != NULL; last = last->next) {
+	}
+	q->tail = last;
+
+	pthread_mutex_unlock(&q->mtx);
+
+	return 0;
+}
+
+/** Flushes one buffer pool instance as described by a work item and stores
+the number of flushed pages in wi->result.
+
+wi->result is cleared before anything else: the work items are reused
+between cycles, so when buf_flush_start() refused the flush the previous
+cycle's page count used to stay in place and was reported again.
+@param wi	work item; buf_pool, flush_type, min and lsn_limit must be set
+@return 0 on success, -1 if wi is NULL or buf_flush_start() returned
+FALSE */
+int flush_pool_instance(wrk_t *wi)
+{
+#ifdef UNIV_DEBUG
+	struct timeval p_start_time, p_end_time, d_time;
+#endif
+
+	if(!wi) {
+		fprintf(stderr, "work item invalid wi:%p\n", wi);
+		return -1;
+	}
+
+	wi->t_usec = 0;
+	wi->result = 0;
+	if (!buf_flush_start(wi->buf_pool, (buf_flush)wi->flush_type)) {
+		/* We have two choices here. If lsn_limit was
+		specified then skipping an instance of buffer
+		pool means we cannot guarantee that all pages
+		up to lsn_limit has been flushed. We can
+		return right now with failure or we can try
+		to flush remaining buffer pools up to the
+		lsn_limit. We attempt to flush other buffer
+		pools based on the assumption that it will
+		help in the retry which will follow the
+		failure. */
+		fprintf(stderr, "flush_start Failed, flush_type:%d\n",
+			(buf_flush)wi->flush_type);
+		return -1;
+	}
+
+#ifdef UNIV_DEBUG
+	/* Record time taken for the OP in usec */
+	gettimeofday(&p_start_time, 0x0);
+#endif
+
+	if((buf_flush)wi->flush_type == BUF_FLUSH_LRU) {
+		/* srv_LRU_scan_depth can be arbitrarily large value.
+		 * We cap it with current LRU size.
+		 */
+		buf_pool_mutex_enter(wi->buf_pool);
+		wi->min = UT_LIST_GET_LEN(wi->buf_pool->LRU);
+		buf_pool_mutex_exit(wi->buf_pool);
+		wi->min = ut_min(srv_LRU_scan_depth,wi->min);
+	}
+
+	wi->result = buf_flush_batch(wi->buf_pool,
+				     (buf_flush)wi->flush_type,
+				     wi->min, wi->lsn_limit);
+
+	buf_flush_end(wi->buf_pool, (buf_flush)wi->flush_type);
+	buf_flush_common((buf_flush)wi->flush_type, wi->result);
+
+#ifdef UNIV_DEBUG
+	gettimeofday(&p_end_time, 0x0);
+	timediff(&p_end_time, &p_start_time, &d_time);
+
+	wi->t_usec = (unsigned long)(d_time.tv_usec+(d_time.tv_sec*1000000));
+#endif
+
+	return 0;
+}
+
+int service_page_comp_io(thread_sync_t * ppc) /* One service cycle of a flush worker.
+ Waits on the work-queue condition variable; if the work queue is then in
+ state Q_PROCESS, repeatedly removes an item, flushes it with
+ flush_pool_instance(), inserts it into the completion queue and bumps the
+ completion counter. Returns -1 when the wakeup found the work queue in
+ another state or when no item could be removed, 0 otherwise. */
+{
+ wrk_t *wi = NULL;
+ int ret=0;
+
+ pthread_mutex_lock(&ppc->wq->mtx);
+ do{ /* NOTE(review): both branches at the end of the body leave the loop, so the body runs once */
+ ppc->wt_status = WTHR_SIG_WAITING;
+ ret = pthread_cond_wait(&ppc->wq->cv, &ppc->wq->mtx); /* NOTE(review): wq->flag is not tested before waiting; a broadcast sent earlier looks like it would be missed - confirm */
+ ppc->wt_status = WTHR_RUNNING;
+ if(ret == ETIMEDOUT) { /* NOTE(review): this is an untimed wait, ETIMEDOUT is not expected here */
+ fprintf(stderr, "ERROR ETIMEDOUT cnt_flag:[%lu] ret:%d\n",
+ done_cnt_flag, ret);
+ } else if(ret == EINVAL || ret == EPERM) {
+ fprintf(stderr, "ERROR EINVAL/EPERM cnt_flag:[%lu] ret:%d\n",
+ done_cnt_flag, ret);
+ }
+ if(ppc->wq->flag == Q_PROCESS) {
+ break;
+ } else {
+ pthread_mutex_unlock(&ppc->wq->mtx);
+ return -1;
+ }
+ } while (ppc->wq->flag == Q_PROCESS && ret == 0);
+
+ pthread_mutex_unlock(&ppc->wq->mtx);
+
+ while (ppc->cq->flag == Q_PROCESS) { /* cq->flag is read here without holding cq->mtx */
+ wi = NULL;
+ /* Get the work item */
+ if (0 != (ret = q_remove_wrk(ppc->wq, &wi))) {
+ ppc->wt_status = WTHR_NO_WORK;
+ return -1;
+ }
+
+ assert(ret==0);
+ assert(wi != NULL);
+ assert(0 == is_busy_wrk_itm(wi));
+ assert(wi->id_usr == -1);
+
+ wi->id_usr = ppc->wthread; /* marks the item as in use by this worker */
+ wi->wi_status = WRK_ITEM_START;
+
+ /* Process work item */
+ if(0 != (ret = flush_pool_instance(wi))) {
+ fprintf(stderr, "FLUSH op failed ret:%d\n", ret);
+ wi->wi_status = WRK_ITEM_FAILED;
+ }
+
+ ret = q_insert_wrk_list(ppc->cq, wi);
+
+ assert(0==ret);
+ assert(check_wrk_done_count >= done_cnt_flag);
+ wi->wi_status = WRK_ITEM_SUCCESS; /* NOTE(review): also replaces WRK_ITEM_FAILED set above, and is written after the item was put on cq */
+ if(check_wrk_done_count == cv_done_inc_flag_sig(ppc)) { /* this worker completed the last item of the cycle */
+ break;
+ }
+ }
+ return(0);
+}
+
+/******************************************************************//**
+Entry point of a flush worker thread created by pgcomp_handler_init().
+Calls service_page_comp_io() in a loop until srv_shutdown_state equals
+SRV_SHUTDOWN_EXIT_THREADS, clearing the per-cycle statistic after each call.
+NOTE(review): service_page_comp_io() blocks in an untimed condition wait, so
+the shutdown state is only re-checked after the work queue is signalled -
+confirm that shutdown wakes the workers.
+@return a dummy parameter*/
+extern "C" UNIV_INTERN
+os_thread_ret_t
+DECLARE_THREAD(page_comp_io_thread)(
+/*==========================================*/
+ void * arg) /*!< in: thread_sync_t entry of this worker */
+{
+ thread_sync_t *ppc_io = ((thread_sync_t *)arg);
+
+ while (srv_shutdown_state != SRV_SHUTDOWN_EXIT_THREADS) {
+ service_page_comp_io(ppc_io); /* return value is not inspected */
+ ppc_io->stat_cycle_num_processed = 0;
+ }
+ os_thread_exit(NULL);
+ OS_THREAD_DUMMY_RETURN;
+}
+
+/** Debug helper: prints every work item of a queue to stderr. Compiled to
+a no-op returning 0 unless UNIV_DEBUG is defined.
+
+Changes: the guard is #ifdef like the other UNIV_DEBUG sections of this
+file; head and tail are inspected only while q->mtx is held; the invariant
+assertion uses || (with && the two mutually exclusive conditions could
+never both hold, so the assertion could never fire).
+@param q	queue to print
+@return 0, or -1 if q is NULL in a debug build */
+int print_queue_wrk_itm(opq_t *q)
+{
+#ifdef UNIV_DEBUG
+	wrk_t *wi = NULL;
+
+	if(!q) {
+		fprintf(stderr, "queue NULL\n");
+		return -1;
+	}
+
+	pthread_mutex_lock(&q->mtx);
+	if(!q->head || !q->tail) {
+		assert(!(((q->tail==NULL) && (q->head!=NULL)) || ((q->tail != NULL) && (q->head == NULL))));
+		fprintf(stderr, "queue empty (h:%p t:%p)\n", q->head, q->tail);
+		pthread_mutex_unlock(&q->mtx);
+		return 0;
+	}
+
+	for(wi = q->head; (wi != NULL) ; wi = wi->next) {
+		fprintf(stderr, "- [%p] [%s] >%p\n",
+			wi, (wi->id_usr == -1)?"free":"Busy", wi->next);
+	}
+	pthread_mutex_unlock(&q->mtx);
+#endif
+	return(0);
+}
+
+/** Prints every node of a work-item list to stderr, followed by the number
+of nodes. A NULL list is reported as "list NULL" with length 0.
+@param wi_list	first node of a NULL-terminated list, may be NULL
+@return always 0 */
+int print_wrk_list(wrk_t *wi_list)
+{
+	wrk_t*	cur;
+	int	len = 0;
+
+	if(wi_list == NULL) {
+		fprintf(stderr, "list NULL\n");
+	}
+
+	for(cur = wi_list; cur != NULL; cur = cur->next) {
+		const char*	state = (cur->id_usr == -1) ? "free" : "Busy";
+
+		fprintf(stderr, "-\t[%p]\t[%s]\t[%lu]\t[%luus] > %p\n",
+			cur, state, cur->result, cur->t_usec, cur->next);
+		len++;
+	}
+
+	fprintf(stderr, "list len: %d\n", len);
+	return 0;
+}
+
+/** Runs one flush cycle: puts the given work items on the work queue,
+wakes the worker threads and blocks until the completion queue is marked
+Q_DONE.
+
+The wait is now a plain predicate loop on cq.flag. In the previous
+do/while a wakeup that found the flag still at Q_PROCESS executed
+`continue`, failed the loop condition (which tested for Q_INITIALIZED) and
+fell through as if the cycle had completed. Statements that followed a
+`goto` and could never run were dropped.
+
+If the queue reports Q_DONE but the completion counter differs from
+srv_buf_pool_instances, the cycle is re-armed as before.
+NOTE(review): the work queue is not refilled on that retry path, same as
+in the previous code - confirm whether a retry can ever make progress.
+@param w_list	NULL-terminated list of prepared work items
+@return 0 once all items were processed, -1 if the cycle could not be
+set up */
+int pgcomp_handler(wrk_t *w_list)
+{
+	opq_t *wrk_q = &wq;
+	opq_t *comp_q = &cq;
+
+	pthread_mutex_lock(&wrk_q->mtx);
+	/* setup work queue here.. */
+	wrk_q->flag = Q_EMPTY;
+	pthread_mutex_unlock(&wrk_q->mtx);
+
+	if(0 != q_insert_wrk_list(wrk_q, w_list)) {
+		fprintf(stderr, "%s():work-queue setup FAILED wq:%p w_list:%p \n",
+			__FUNCTION__, &wq, w_list);
+		return -1;
+	}
+
+	for(;;) {
+		pthread_mutex_lock(&wrk_q->mtx);
+		wrk_q->flag = Q_INITIALIZED;
+		pthread_mutex_unlock(&wrk_q->mtx);
+
+		/* reset the completion counter and arm the completion queue */
+		pthread_mutex_lock(&comp_q->mtx);
+		if(0 != set_done_cnt_flag(0)) {
+			fprintf(stderr, "FAILED %s:%d\n", __FILE__, __LINE__);
+			pthread_mutex_unlock(&comp_q->mtx);
+			return -1;
+		}
+		comp_q->flag = Q_PROCESS;
+		pthread_mutex_unlock(&comp_q->mtx);
+
+		/* if threads are waiting request them to start */
+		pthread_mutex_lock(&wrk_q->mtx);
+		wrk_q->flag = Q_PROCESS;
+		pthread_cond_broadcast(&wrk_q->cv);
+		pthread_mutex_unlock(&wrk_q->mtx);
+
+		/* Wait on all worker-threads to complete. The flag is
+		re-tested after every wakeup because a condition wait may
+		return without the condition having been signalled. */
+		pthread_mutex_lock(&comp_q->mtx);
+		while(comp_q->flag != Q_DONE) {
+			pthread_cond_wait(&comp_q->cv, &comp_q->mtx);
+		}
+
+		if(done_cnt_flag == srv_buf_pool_instances) {
+			/* leave the loop with comp_q->mtx still held */
+			break;
+		}
+
+		fprintf(stderr, "[3]cv wait on CQ failed flag:%d cnt:%lu\n",
+			comp_q->flag, (unsigned long) done_cnt_flag);
+		fprintf(stderr, "============\n");
+		print_wrk_list(w_list);
+		fprintf(stderr, "============\n");
+		comp_q->flag = Q_INITIALIZED;
+		pthread_mutex_unlock(&comp_q->mtx);
+	}
+
+	pthread_mutex_unlock(&comp_q->mtx);
+
+	pthread_mutex_lock(&wrk_q->mtx);
+	wrk_q->flag = Q_DONE;
+	pthread_mutex_unlock(&wrk_q->mtx);
+
+	return 0;
+}
+
+/******************************************************************//**
+Sets up the work items, both queues and the worker threads used for
+multi-threaded flushing. May only be called once.
+
+num_threads and wrk_cnt are checked against PGCOMP_MAX_WORKER because
+pc_sync[] and work_items[] have exactly that many entries; larger values
+used to write past those arrays. Each pc_sync entry and the expected
+completion count are now filled in before the corresponding thread is
+created, so a worker never runs with fields that are still being written.
+NOTE(review): callers must treat -1 as fatal; no worker exists then.
+@return 0 on success, -1 if already initialized, if a queue pointer is
+NULL, if a count is out of range or if a queue could not be initialized */
+int pgcomp_handler_init(int num_threads, int wrk_cnt, opq_t *wq, opq_t *cq)
+{
+	int i=0;
+
+	if(is_pgcomp_wrk_init_done()) {
+		fprintf(stderr, "pgcomp_handler_init(): ERROR already initialized\n");
+		return -1;
+	}
+
+	if(!wq || !cq) {
+		fprintf(stderr, "%s() FAILED wq:%p cq:%p\n", __FUNCTION__, wq, cq);
+		return -1;
+	}
+
+	if(num_threads < 1 || num_threads > PGCOMP_MAX_WORKER
+	   || wrk_cnt < 1 || wrk_cnt > PGCOMP_MAX_WORKER) {
+		fprintf(stderr, "%s() FAILED num_threads:%d wrk_cnt:%d (max %d)\n",
+			__FUNCTION__, num_threads, wrk_cnt, PGCOMP_MAX_WORKER);
+		return -1;
+	}
+
+	/* work-item setup */
+	setup_wrk_itm(wrk_cnt);
+
+	/* wq & cq setup */
+	if(0 != init_queue(wq) || 0 != init_queue(cq)) {
+		fprintf(stderr, "%s() FAILED queue init wq:%p cq:%p\n",
+			__FUNCTION__, wq, cq);
+		return -1;
+	}
+
+	/* must be in place before any worker can complete an item */
+	set_check_done_flag_count(wrk_cnt);
+
+	/* Mark each of the thread sync entries */
+	for(i=0; i < PGCOMP_MAX_WORKER; i++) {
+		pc_sync[i].wthread_id = i;
+	}
+
+	/* Create threads for page-compression-flush */
+	for(i=0; i < num_threads; i++) {
+		pc_sync[i].wq = wq;
+		pc_sync[i].cq = cq;
+		pc_sync[i].wthread = (START_PGCOMP_CNT + i);
+		pc_sync[i].wt_status = WTHR_INITIALIZED;
+		os_thread_create(page_comp_io_thread, ((void *)(pc_sync + i)),
+				 thread_ids + START_PGCOMP_CNT + i);
+	}
+
+	set_pgcomp_wrk_init_done();
+
+	return 0;
+}
+
+
+/** Prints the number of work items each worker has processed, followed by
+the sum over all workers. The accumulator is unsigned long so that it
+matches both the counters being summed and the %lu conversion used to
+print it.
+@param wthr		array of worker sync entries
+@param num_threads	number of entries of wthr to report
+@return always 0 */
+int wrk_thread_stat(thread_sync_t *wthr, unsigned int num_threads)
+{
+	unsigned long stat_tot=0;
+	unsigned int i=0;
+	for(i=0; i< num_threads;i++) {
+		stat_tot+=wthr[i].stat_universal_num_processed;
+		fprintf(stderr, "[%d] stat [%lu]\n", wthr[i].wthread_id,
+			wthr[i].stat_universal_num_processed);
+	}
+	fprintf(stderr, "Stat-Total:%lu\n", stat_tot);
+	return (0);
+}
+
+/** Empties the work and completion queues and marks the first `items`
+work items as free again, each step under the owning queue's mutex.
+@param items	number of entries of work_items[] to release
+@return always 0 */
+int reset_wrk_itm(int items)
+{
+	int idx;
+
+	/* work queue: just forget the list */
+	pthread_mutex_lock(&wq.mtx);
+	wq.tail = NULL;
+	wq.head = NULL;
+	pthread_mutex_unlock(&wq.mtx);
+
+	/* completion queue: release the items, then forget the list */
+	pthread_mutex_lock(&cq.mtx);
+	for(idx = 0; idx < items; ++idx) {
+		work_items[idx].id_usr = -1;
+	}
+	cq.tail = NULL;
+	cq.head = NULL;
+	pthread_mutex_unlock(&cq.mtx);
+
+	return 0;
+}
+
+/** Flushes buf_pool_inst buffer pool instances with the worker threads:
+prepares one work item per instance, runs a cycle through
+pgcomp_handler(), copies the per-instance page counts to the caller and
+clears the queues for the next cycle.
+
+Changes: buf_pool_inst is validated against the size of work_items[]
+(0 used to write work_items[-1], values above PGCOMP_MAX_WORKER ran past
+the array); result is cleared when an item is prepared so that a count
+from an earlier cycle is never reported; a failing pgcomp_handler() is
+reported instead of being ignored; the debug section uses #ifdef like the
+other UNIV_DEBUG sections of this file.
+@param buf_pool_inst		number of buffer pool instances,
+				1..PGCOMP_MAX_WORKER
+@param per_pool_pages_flushed	out: pages flushed per instance, must have
+				room for buf_pool_inst entries
+@param flush_type		flush type, stored in each work item
+@param min_n			minimum number of pages to flush per instance
+@param lsn_limit		lsn limit, stored in each work item
+@return 0 on success, -1 on invalid arguments, if the cycle could not be
+run or if an item carries the error marker in its result */
+int pgcomp_flush_work_items(int buf_pool_inst, int *per_pool_pages_flushed,
+		int flush_type, int min_n, unsigned long long lsn_limit)
+{
+	int ret=0, i=0;
+
+	if(buf_pool_inst < 1 || buf_pool_inst > PGCOMP_MAX_WORKER
+	   || !per_pool_pages_flushed) {
+		fprintf(stderr, "%s() FAILED buf_pool_inst:%d (max %d) out:%p\n",
+			__FUNCTION__, buf_pool_inst, PGCOMP_MAX_WORKER,
+			(void *) per_pool_pages_flushed);
+		return -1;
+	}
+
+	pthread_mutex_lock(&wq.mtx);
+	pthread_mutex_lock(&cq.mtx);
+
+	assert(wq.head == NULL);
+	assert(wq.tail == NULL);
+	if(cq.head) {
+		print_wrk_list(cq.head);
+	}
+	assert(cq.head == NULL);
+	assert(cq.tail == NULL);
+
+	for(i=0;i<buf_pool_inst; i++) {
+		work_items[i].buf_pool = buf_pool_from_array(i);
+		work_items[i].flush_type = flush_type;
+		work_items[i].min = min_n;
+		work_items[i].lsn_limit = lsn_limit;
+		work_items[i].result = 0;
+		work_items[i].id_usr = -1;
+		/* the last prepared item terminates the list */
+		work_items[i].next = (i + 1 < buf_pool_inst)
+			? &work_items[i + 1] : NULL;
+		work_items[i].wi_status = WRK_ITEM_SET;
+	}
+
+	pthread_mutex_unlock(&cq.mtx);
+	pthread_mutex_unlock(&wq.mtx);
+
+	if(0 != pgcomp_handler(work_items)) {
+		/* nothing was flushed: report zero pages and release the items */
+		for(i=0; i<buf_pool_inst; i++) {
+			per_pool_pages_flushed[i] = 0;
+		}
+		reset_wrk_itm(buf_pool_inst);
+		return -1;
+	}
+
+	pthread_mutex_lock(&wq.mtx);
+	pthread_mutex_lock(&cq.mtx);
+	/* collect data/results total pages flushed */
+	for(i=0; i<buf_pool_inst; i++) {
+		if(work_items[i].result == (unsigned long) -1) {
+			ret = -1;
+			per_pool_pages_flushed[i] = 0;
+		} else {
+			per_pool_pages_flushed[i] = (int) work_items[i].result;
+		}
+		if((work_items[i].id_usr == -1) && (work_items[i].wi_status == WRK_ITEM_SET )) {
+			fprintf(stderr, "**Set/Unused work_item[%d] flush_type=%d\n", i, work_items[i].flush_type);
+			assert(0);
+		}
+	}
+
+	wq.flag = cq.flag = Q_INITIALIZED;
+
+	pthread_mutex_unlock(&cq.mtx);
+	pthread_mutex_unlock(&wq.mtx);
+
+#ifdef UNIV_DEBUG
+	/* Print work-list stats */
+	fprintf(stderr, "==wq== [DONE]\n");
+	print_wrk_list(wq.head);
+	fprintf(stderr, "==cq== [DONE]\n");
+	print_wrk_list(cq.head);
+	fprintf(stderr, "==worker-thread-stats==\n");
+	wrk_thread_stat(pc_sync, pgc_n_threads);
+#endif
+
+	/* clear up work-queue for next flush */
+	reset_wrk_itm(buf_pool_inst);
+	return(ret);
+}
+
+/* JAN: TODO: END: */
+
/********************************************************************
Starts InnoDB and creates a new database if database files
are not found and the user wants.
@@ -2585,6 +3275,16 @@ files_checked:
}
if (!srv_read_only_mode) {
+ /* JAN: TODO: */
+ if (srv_buf_pool_instances <= PGCOMP_MAX_WORKER) {
+ pgc_n_threads = srv_buf_pool_instances;
+ }
+ /* else we default to 8 worker-threads */
+ pgcomp_handler_init(pgc_n_threads, srv_buf_pool_instances, &wq, &cq);
+ /* JAN: TODO: END */
+#if UNIV_DEBUG
+ fprintf(stderr, "%s:%d buf-pool-instances:%lu\n", __FILE__, __LINE__, srv_buf_pool_instances);
+#endif
os_thread_create(buf_flush_page_cleaner_thread, NULL, NULL);
}