summaryrefslogtreecommitdiff
path: root/storage/innobase/row/row0import.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/row/row0import.cc')
-rw-r--r--storage/innobase/row/row0import.cc941
1 files changed, 701 insertions, 240 deletions
diff --git a/storage/innobase/row/row0import.cc b/storage/innobase/row/row0import.cc
index 020a814c4eb..e1554949127 100644
--- a/storage/innobase/row/row0import.cc
+++ b/storage/innobase/row/row0import.cc
@@ -13,7 +13,7 @@ FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
-51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*****************************************************************************/
@@ -31,6 +31,7 @@ Created 2012-02-08 by Sunny Bains.
#endif
#include "btr0pcur.h"
+#include "btr0sea.h"
#include "que0que.h"
#include "dict0boot.h"
#include "ibuf0ibuf.h"
@@ -40,12 +41,17 @@ Created 2012-02-08 by Sunny Bains.
#include "row0mysql.h"
#include "srv0start.h"
#include "row0quiesce.h"
+#include "fil0pagecompress.h"
+#ifdef HAVE_LZO
+#include "lzo/lzo1x.h"
+#endif
+#ifdef HAVE_SNAPPY
+#include "snappy-c.h"
+#endif
#include <vector>
-/** The size of the buffer to use for IO. Note: os_file_read() doesn't expect
-reads to fail. If you set the buffer size to be greater than a multiple of the
-file size then it will assert. TODO: Fix this limitation of the IO functions.
+/** The size of the buffer to use for IO.
@param n - page size of the tablespace.
@retval number of pages */
#define IO_BUFFER_SIZE(n) ((1024 * 1024) / n)
@@ -105,18 +111,18 @@ struct row_index_t {
struct row_import {
row_import() UNIV_NOTHROW
:
- m_table(),
- m_version(),
- m_hostname(),
- m_table_name(),
- m_autoinc(),
- m_page_size(),
- m_flags(),
- m_n_cols(),
- m_cols(),
- m_col_names(),
- m_n_indexes(),
- m_indexes(),
+ m_table(NULL),
+ m_version(0),
+ m_hostname(NULL),
+ m_table_name(NULL),
+ m_autoinc(0),
+ m_page_size(0),
+ m_flags(0),
+ m_n_cols(0),
+ m_cols(NULL),
+ m_col_names(NULL),
+ m_n_indexes(0),
+ m_indexes(NULL),
m_missing(true) { }
~row_import() UNIV_NOTHROW;
@@ -361,7 +367,8 @@ private:
/** Functor that is called for each physical page that is read from the
tablespace file. */
-class AbstractCallback : public PageCallback {
+class AbstractCallback
+{
public:
/** Constructor
@param trx - covering transaction */
@@ -394,32 +401,59 @@ public:
return(get_zip_size() > 0);
}
-protected:
/**
- Get the data page depending on the table type, compressed or not.
- @param block - block read from disk
- @retval the buffer frame */
- buf_frame_t* get_frame(buf_block_t* block) const UNIV_NOTHROW
+ Set the name of the physical file and the file handle that is used
+ to open it for the file that is being iterated over.
+ @param filename - the physical name of the tablespace file.
+ @param file - OS file handle */
+ void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW
{
- if (is_compressed_table()) {
- return(block->page.zip.data);
- }
+ m_file = file;
+ m_filepath = filename;
+ }
- return(buf_block_get_frame(block));
+ /** The compressed page size
+ @return the compressed page size */
+ ulint get_zip_size() const
+ {
+ return(m_zip_size);
}
- /** Check for session interrupt. If required we could
- even flush to disk here every N pages.
- @retval DB_SUCCESS or error code */
- dberr_t periodic_check() UNIV_NOTHROW
+ /** The tablespace page size
+ @return the tablespace page size */
+ ulint get_page_size() const
{
- if (trx_is_interrupted(m_trx)) {
- return(DB_INTERRUPTED);
- }
+ return(m_page_size);
+ }
- return(DB_SUCCESS);
+ const char* filename() const { return m_filepath; }
+
+ /**
+ Called for every page in the tablespace. If the page was not
+ updated then its state must be set to BUF_PAGE_NOT_USED. For
+ compressed tables the page descriptor memory will be at offset:
+ block->frame + UNIV_PAGE_SIZE;
+ @param block block read from file, note it is not from the buffer pool
+ @retval DB_SUCCESS or error code. */
+ virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
+
+ /**
+ @return the space id of the tablespace */
+ virtual ulint get_space_id() const UNIV_NOTHROW = 0;
+
+ bool is_interrupted() const { return trx_is_interrupted(m_trx); }
+
+ /**
+ Get the data page depending on the table type, compressed or not.
+ @param block - block read from disk
+ @retval the buffer frame */
+ static byte* get_frame(const buf_block_t* block)
+ {
+ return block->page.zip.data
+ ? block->page.zip.data : block->frame;
}
+protected:
/**
Get the physical offset of the extent descriptor within the page.
@param page_no - page number of the extent descriptor
@@ -509,6 +543,18 @@ protected:
}
protected:
+ /** Compressed table page size */
+ ulint m_zip_size;
+
+ /** The tablespace page size. */
+ ulint m_page_size;
+
+ /** File handle to the tablespace */
+ pfs_os_file_t m_file;
+
+ /** Physical file path. */
+ const char* m_filepath;
+
/** Covering transaction. */
trx_t* m_trx;
@@ -565,9 +611,9 @@ AbstractCallback::init(
/* Since we don't know whether it is a compressed table
or not, the data is always read into the block->frame. */
- dberr_t err = set_zip_size(block->frame);
+ m_zip_size = fsp_header_get_zip_size(page);
- if (err != DB_SUCCESS) {
+ if (!ut_is_2pow(m_zip_size) || m_zip_size > UNIV_ZIP_SIZE_MAX) {
return(DB_CORRUPTION);
}
@@ -604,11 +650,7 @@ AbstractCallback::init(
m_free_limit = mach_read_from_4(page + FSP_FREE_LIMIT);
m_space = mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_ID);
- if ((err = set_current_xdes(0, page)) != DB_SUCCESS) {
- return(err);
- }
-
- return(DB_SUCCESS);
+ return set_current_xdes(0, page);
}
/**
@@ -650,12 +692,9 @@ struct FetchIndexRootPages : public AbstractCallback {
/**
Called for each block as it is read from the file.
- @param offset - physical offset in the file
- @param block - block to convert, it is not from the buffer pool.
+ @param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
- virtual dberr_t operator() (
- os_offset_t offset,
- buf_block_t* block) UNIV_NOTHROW;
+ dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
/** Update the import configuration that will be used to import
the tablespace. */
@@ -673,34 +712,18 @@ Called for each block as it is read from the file. Check index pages to
determine the exact row format. We can't get that from the tablespace
header flags alone.
-@param offset - physical offset in the file
-@param block - block to convert, it is not from the buffer pool.
+@param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
-dberr_t
-FetchIndexRootPages::operator() (
- os_offset_t offset,
- buf_block_t* block) UNIV_NOTHROW
+dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
{
- dberr_t err;
-
- if ((err = periodic_check()) != DB_SUCCESS) {
- return(err);
- }
+ if (is_interrupted()) return DB_INTERRUPTED;
const page_t* page = get_frame(block);
ulint page_type = fil_page_get_type(page);
- if (block->page.offset * m_page_size != offset) {
- ib_logf(IB_LOG_LEVEL_ERROR,
- "Page offset doesn't match file offset: "
- "page offset: %u, file offset: " ULINTPF,
- block->page.offset,
- (ulint) (offset / m_page_size));
-
- err = DB_CORRUPTION;
- } else if (page_type == FIL_PAGE_TYPE_XDES) {
- err = set_current_xdes(block->page.offset, page);
+ if (page_type == FIL_PAGE_TYPE_XDES) {
+ return set_current_xdes(block->page.offset, page);
} else if (page_type == FIL_PAGE_INDEX
&& !is_free(block->page.offset)
&& is_root_page(page)) {
@@ -725,7 +748,7 @@ FetchIndexRootPages::operator() (
}
}
- return(err);
+ return DB_SUCCESS;
}
/**
@@ -842,21 +865,10 @@ public:
/**
Called for each block as it is read from the file.
- @param offset - physical offset in the file
- @param block - block to convert, it is not from the buffer pool.
+ @param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
- virtual dberr_t operator() (
- os_offset_t offset,
- buf_block_t* block) UNIV_NOTHROW;
+ dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
private:
-
- /** Status returned by PageConverter::validate() */
- enum import_page_status_t {
- IMPORT_PAGE_STATUS_OK, /*!< Page is OK */
- IMPORT_PAGE_STATUS_ALL_ZERO, /*!< Page is all zeros */
- IMPORT_PAGE_STATUS_CORRUPTED /*!< Page is corrupted */
- };
-
/**
Update the page, set the space id, max trx id and index id.
@param block - block read from file
@@ -866,17 +878,6 @@ private:
buf_block_t* block,
ulint& page_type) UNIV_NOTHROW;
-#if defined UNIV_DEBUG
- /**
- @return true error condition is enabled. */
- bool trigger_corruption() UNIV_NOTHROW
- {
- return(false);
- }
- #else
-#define trigger_corruption() (false)
-#endif /* UNIV_DEBUG */
-
/**
Update the space, index id, trx id.
@param block - block to convert
@@ -890,15 +891,6 @@ private:
dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;
/**
- Validate the page, check for corruption.
- @param offset - physical offset within file.
- @param page - page read from file.
- @return 0 on success, 1 if all zero, 2 if corrupted */
- import_page_status_t validate(
- os_offset_t offset,
- buf_block_t* page) UNIV_NOTHROW;
-
- /**
Validate the space flags and update tablespace header page.
@param block - block read from file, not from the buffer pool.
@retval DB_SUCCESS or error code */
@@ -1306,17 +1298,63 @@ row_import::match_schema(
{
/* Do some simple checks. */
- if ((m_table->flags ^ m_flags) & ~DICT_TF_MASK_DATA_DIR) {
+ if (ulint mismatch = (m_table->flags ^ m_flags)
+ & ~DICT_TF_MASK_DATA_DIR) {
+ const char* msg;
+ if (mismatch & DICT_TF_MASK_ZIP_SSIZE) {
+ if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE)
+ && (m_flags & DICT_TF_MASK_ZIP_SSIZE)) {
+ switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
+ case 0U << DICT_TF_POS_ZIP_SSIZE:
+ goto uncompressed;
+ case 1U << DICT_TF_POS_ZIP_SSIZE:
+ msg = "ROW_FORMAT=COMPRESSED"
+ " KEY_BLOCK_SIZE=1";
+ break;
+ case 2U << DICT_TF_POS_ZIP_SSIZE:
+ msg = "ROW_FORMAT=COMPRESSED"
+ " KEY_BLOCK_SIZE=2";
+ break;
+ case 3U << DICT_TF_POS_ZIP_SSIZE:
+ msg = "ROW_FORMAT=COMPRESSED"
+ " KEY_BLOCK_SIZE=4";
+ break;
+ case 4U << DICT_TF_POS_ZIP_SSIZE:
+ msg = "ROW_FORMAT=COMPRESSED"
+ " KEY_BLOCK_SIZE=8";
+ break;
+ case 5U << DICT_TF_POS_ZIP_SSIZE:
+ msg = "ROW_FORMAT=COMPRESSED"
+ " KEY_BLOCK_SIZE=16";
+ break;
+ default:
+ msg = "strange KEY_BLOCK_SIZE";
+ }
+ } else if (m_flags & DICT_TF_MASK_ZIP_SSIZE) {
+ msg = "ROW_FORMAT=COMPRESSED";
+ } else {
+ goto uncompressed;
+ }
+ } else {
+uncompressed:
+ msg = (m_flags & DICT_TF_MASK_ATOMIC_BLOBS)
+ ? "ROW_FORMAT=DYNAMIC"
+ : (m_flags & DICT_TF_MASK_COMPACT)
+ ? "ROW_FORMAT=COMPACT"
+ : "ROW_FORMAT=REDUNDANT";
+ }
+
ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
"Table flags don't match, server table has 0x%x"
- " and the meta-data file has 0x%lx",
- m_table->flags, ulong(m_flags));
+ " and the meta-data file has 0x%lx;"
+ " .cfg file uses %s",
+ m_table->flags, ulong(m_flags), msg);
return(DB_ERROR);
} else if (m_table->n_cols != m_n_cols) {
ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH,
- "Number of columns don't match, table has %u"
- " columns but the tablespace meta-data file has "
+ "Number of columns don't match, table has %u "
+ "columns but the tablespace meta-data file has "
ULINTPF " columns",
m_table->n_cols, m_n_cols);
@@ -1596,6 +1634,7 @@ IndexPurge::purge() UNIV_NOTHROW
Constructor
* @param cfg - config of table being imported.
* @param trx - transaction covering the import */
+inline
PageConverter::PageConverter(
row_import* cfg,
trx_t* trx)
@@ -1620,6 +1659,7 @@ Adjust the BLOB reference for a single column that is externally stored
@param offsets - column offsets for the record
@param i - column ordinal value
@return DB_SUCCESS or error code */
+inline
dberr_t
PageConverter::adjust_cluster_index_blob_column(
rec_t* rec,
@@ -1672,6 +1712,7 @@ stored columns.
@param rec - record to update
@param offsets - column offsets for the record
@return DB_SUCCESS or error code */
+inline
dberr_t
PageConverter::adjust_cluster_index_blob_columns(
rec_t* rec,
@@ -1705,6 +1746,7 @@ BLOB reference, write the new space id.
@param rec - record to update
@param offsets - column offsets for the record
@return DB_SUCCESS or error code */
+inline
dberr_t
PageConverter::adjust_cluster_index_blob_ref(
rec_t* rec,
@@ -1728,6 +1770,7 @@ Purge delete-marked records, only if it is possible to do so without
re-organising the B+tree.
@param offsets - current row offsets.
@return true if purge succeeded */
+inline
bool
PageConverter::purge(const ulint* offsets) UNIV_NOTHROW
{
@@ -1752,6 +1795,7 @@ Adjust the BLOB references and sys fields for the current record.
@param offsets - column offsets for the record
@param deleted - true if row is delete marked
@return DB_SUCCESS or error code. */
+inline
dberr_t
PageConverter::adjust_cluster_record(
const dict_index_t* index,
@@ -1780,6 +1824,7 @@ Update the BLOB refrences and write UNDO log entries for
rows that can't be purged optimistically.
@param block - block to update
@retval DB_SUCCESS or error code */
+inline
dberr_t
PageConverter::update_records(
buf_block_t* block) UNIV_NOTHROW
@@ -1791,10 +1836,6 @@ PageConverter::update_records(
m_rec_iter.open(block);
- if (!page_is_leaf(block->frame)) {
- return DB_SUCCESS;
- }
-
while (!m_rec_iter.end()) {
rec_t* rec = m_rec_iter.current();
ibool deleted = rec_get_deleted_flag(rec, comp);
@@ -1845,6 +1886,7 @@ PageConverter::update_records(
/**
Update the space, index id, trx id.
@return DB_SUCCESS or error code */
+inline
dberr_t
PageConverter::update_index_page(
buf_block_t* block) UNIV_NOTHROW
@@ -1907,13 +1949,14 @@ PageConverter::update_index_page(
return(DB_SUCCESS);
}
- return(update_records(block));
+ return page_is_leaf(block->frame) ? update_records(block) : DB_SUCCESS;
}
/**
Validate the space flags and update tablespace header page.
@param block - block read from file, not from the buffer pool.
@retval DB_SUCCESS or error code */
+inline
dberr_t
PageConverter::update_header(
buf_block_t* block) UNIV_NOTHROW
@@ -1953,6 +1996,7 @@ PageConverter::update_header(
Update the page, set the space id, max trx id and index id.
@param block - block read from file
@retval DB_SUCCESS or error code */
+inline
dberr_t
PageConverter::update_page(
buf_block_t* block,
@@ -1960,6 +2004,14 @@ PageConverter::update_page(
{
dberr_t err = DB_SUCCESS;
+ ut_ad(!block->page.zip.data == !is_compressed_table());
+
+ if (block->page.zip.data) {
+ m_page_zip_ptr = &block->page.zip;
+ } else {
+ ut_ad(!m_page_zip_ptr);
+ }
+
switch (page_type = fil_page_get_type(get_frame(block))) {
case FIL_PAGE_TYPE_FSP_HDR:
/* Work directly on the uncompressed page headers. */
@@ -2015,140 +2067,44 @@ PageConverter::update_page(
}
/**
-Validate the page
-@param offset - physical offset within file.
-@param page - page read from file.
-@return status */
-PageConverter::import_page_status_t
-PageConverter::validate(
- os_offset_t offset,
- buf_block_t* block) UNIV_NOTHROW
-{
- buf_frame_t* page = get_frame(block);
-
- /* Check that the page number corresponds to the offset in
- the file. Flag as corrupt if it doesn't. Disable the check
- for LSN in buf_page_is_corrupted() */
-
- if (buf_page_is_corrupted(false, page, get_zip_size(), NULL)
- || (page_get_page_no(page) != offset / m_page_size
- && page_get_page_no(page) != 0)) {
-
- return(IMPORT_PAGE_STATUS_CORRUPTED);
-
- } else if (offset > 0 && page_get_page_no(page) == 0) {
- ulint checksum;
-
- checksum = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
- if (checksum != 0) {
- /* Checksum check passed in buf_page_is_corrupted(). */
- ib_logf(IB_LOG_LEVEL_WARN,
- "%s: Page %lu checksum " ULINTPF
- " should be zero.",
- m_filepath, (ulong) (offset / m_page_size),
- checksum);
- }
-
- const byte* b = page + FIL_PAGE_OFFSET;
- const byte* e = page + m_page_size
- - FIL_PAGE_END_LSN_OLD_CHKSUM;
-
- /* If the page number is zero and offset > 0 then
- the entire page MUST consist of zeroes. If not then
- we flag it as corrupt. */
-
- while (b != e) {
-
- if (*b++ && !trigger_corruption()) {
- return(IMPORT_PAGE_STATUS_CORRUPTED);
- }
- }
-
- /* The page is all zero: do nothing. */
- return(IMPORT_PAGE_STATUS_ALL_ZERO);
- }
-
- return(IMPORT_PAGE_STATUS_OK);
-}
-
-/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED.
-@param offset - physical offset within the file
-@param block - block read from file, note it is not from the buffer pool
+@param block block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
-dberr_t
-PageConverter::operator() (
- os_offset_t offset,
- buf_block_t* block) UNIV_NOTHROW
+dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
{
- ulint page_type;
- dberr_t err = DB_SUCCESS;
-
- if ((err = periodic_check()) != DB_SUCCESS) {
- return(err);
- }
-
- if (is_compressed_table()) {
- m_page_zip_ptr = &block->page.zip;
- } else {
- ut_ad(m_page_zip_ptr == 0);
- }
-
- switch(validate(offset, block)) {
- case IMPORT_PAGE_STATUS_OK:
-
- /* We have to decompress the compressed pages before
- we can work on them */
-
- if ((err = update_page(block, page_type)) != DB_SUCCESS) {
- break;
- }
-
- /* Note: For compressed pages this function will write to the
- zip descriptor and for uncompressed pages it will write to
- page (ie. the block->frame). Therefore the caller should write
- out the descriptor contents and not block->frame for compressed
- pages. */
-
- if (!is_compressed_table() || page_type == FIL_PAGE_INDEX) {
-
- buf_flush_init_for_writing(
- !is_compressed_table()
- ? block->frame : block->page.zip.data,
- !is_compressed_table() ? 0 : m_page_zip_ptr,
- m_current_lsn);
- } else {
- /* Calculate and update the checksum of non-btree
- pages for compressed tables explicitly here. */
-
- buf_flush_update_zip_checksum(
- get_frame(block), get_zip_size(),
- m_current_lsn);
- }
-
- break;
+ /* If we already had an old page with matching number
+ in the buffer pool, evict it now, because
+ we no longer evict the pages on DISCARD TABLESPACE. */
+ buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset,
+ RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
+ __FILE__, __LINE__, NULL);
- case IMPORT_PAGE_STATUS_ALL_ZERO:
- /* The page is all zero: leave it as is. */
- break;
+ ulint page_type;
- case IMPORT_PAGE_STATUS_CORRUPTED:
+ dberr_t err = update_page(block, page_type);
+ if (err != DB_SUCCESS) return err;
- ib_logf(IB_LOG_LEVEL_WARN,
- "%s: Page %lu at offset " UINT64PF " looks corrupted.",
- m_filepath, (ulong) (offset / m_page_size), offset);
+ /* Note: For compressed pages this function will write to the
+ zip descriptor and for uncompressed pages it will write to
+ page (ie. the block->frame). Therefore the caller should write
+ out the descriptor contents and not block->frame for compressed
+ pages. */
- err = DB_CORRUPTION;
+ if (!is_compressed_table() || page_type == FIL_PAGE_INDEX) {
+ buf_flush_init_for_writing(
+ get_frame(block),
+ block->page.zip.data ? &block->page.zip : NULL,
+ m_current_lsn);
+ } else {
+ /* Calculate and update the checksum of non-btree
+ pages for compressed tables explicitly here. */
+ buf_flush_update_zip_checksum(
+ get_frame(block), get_zip_size(),
+ m_current_lsn);
}
- /* If we already had and old page with matching number
- in the buffer pool, evict it now, because
- we no longer evict the pages on DISCARD TABLESPACE. */
- buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset,
- RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
- __FILE__, __LINE__, NULL);
- return(err);
+ return DB_SUCCESS;
}
/*****************************************************************//**
@@ -2558,8 +2514,6 @@ row_import_cfg_read_index_fields(
dict_field_t* field = index->m_fields;
- memset(field, 0x0, sizeof(*field) * n_fields);
-
for (ulint i = 0; i < n_fields; ++i, ++field) {
byte* ptr = row;
@@ -2577,6 +2531,8 @@ row_import_cfg_read_index_fields(
return(DB_IO_ERROR);
}
+ new (field) dict_field_t();
+
field->prefix_len = mach_read_from_4(ptr);
ptr += sizeof(ib_uint32_t);
@@ -3423,6 +3379,497 @@ row_import_update_discarded_flag(
return(err);
}
+struct fil_iterator_t {
+ pfs_os_file_t file; /*!< File handle */
+ const char* filepath; /*!< File path name */
+ os_offset_t start; /*!< From where to start */
+ os_offset_t end; /*!< Where to stop */
+ os_offset_t file_size; /*!< File size in bytes */
+ ulint page_size; /*!< Page size */
+ ulint n_io_buffers; /*!< Number of pages to use
+ for IO */
+ byte* io_buffer; /*!< Buffer to use for IO */
+ fil_space_crypt_t *crypt_data; /*!< Crypt data (if encrypted) */
+ byte* crypt_io_buffer; /*!< IO buffer when encrypted */
+};
+
+/********************************************************************//**
+TODO: This can be made parallel trivially by chunking up the file and creating
+a callback per thread. Main benefit will be to use multiple CPUs for
+checksums and compressed tables. We have to do compressed tables block by
+block right now. Secondly we need to decompress/compress and copy too much
+of data. These are CPU intensive.
+
+Iterate over all the pages in the tablespace.
+@param iter - Tablespace iterator
+@param block - block to use for IO
+@param callback - Callback to inspect and update page contents
+@retval DB_SUCCESS or error code */
+static
+dberr_t
+fil_iterate(
+/*========*/
+ const fil_iterator_t& iter,
+ buf_block_t* block,
+ AbstractCallback& callback)
+{
+ os_offset_t offset;
+ ulint n_bytes = iter.n_io_buffers * iter.page_size;
+
+ const ulint buf_size = srv_page_size
+#ifdef HAVE_LZO
+ + LZO1X_1_15_MEM_COMPRESS
+#elif defined HAVE_SNAPPY
+ + snappy_max_compressed_length(srv_page_size)
+#endif
+ ;
+ byte* page_compress_buf = static_cast<byte*>(
+ ut_malloc_low(buf_size, false));
+ ut_ad(!srv_read_only_mode);
+
+ if (!page_compress_buf) {
+ return DB_OUT_OF_MEMORY;
+ }
+
+ /* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
+ copying for non-index pages. Unfortunately, it is
+ required by buf_zip_decompress() */
+ dberr_t err = DB_SUCCESS;
+
+ for (offset = iter.start; offset < iter.end; offset += n_bytes) {
+ if (callback.is_interrupted()) {
+ err = DB_INTERRUPTED;
+ goto func_exit;
+ }
+
+ byte* io_buffer = iter.io_buffer;
+ block->frame = io_buffer;
+
+ if (block->page.zip.data) {
+ /* Zip IO is done in the compressed page buffer. */
+ io_buffer = block->page.zip.data;
+ ut_ad(PAGE_ZIP_MATCH(block->frame, &block->page.zip));
+ }
+
+ /* We have to read the exact number of bytes. Otherwise the
+ InnoDB IO functions croak on failed reads. */
+
+ n_bytes = ulint(ut_min(os_offset_t(n_bytes),
+ iter.end - offset));
+
+ ut_ad(n_bytes > 0);
+ ut_ad(!(n_bytes % iter.page_size));
+
+ const bool encrypted = iter.crypt_data != NULL
+ && iter.crypt_data->should_encrypt();
+ /* Use additional crypt io buffer if tablespace is encrypted */
+ byte* const readptr = encrypted
+ ? iter.crypt_io_buffer : io_buffer;
+ byte* const writeptr = readptr;
+
+ if (!os_file_read_no_error_handling(iter.file, readptr,
+ offset, n_bytes)) {
+ ib_logf(IB_LOG_LEVEL_ERROR, "os_file_read() failed");
+ err = DB_IO_ERROR;
+ goto func_exit;
+ }
+
+ bool updated = false;
+ const ulint size = iter.page_size;
+ ulint n_pages_read = ulint(n_bytes) / size;
+ block->page.offset = offset / size;
+
+ for (ulint i = 0; i < n_pages_read;
+ ++i, block->frame += size, block->page.offset++) {
+ byte* src = readptr + (i * size);
+ const ulint page_no = page_get_page_no(src);
+ if (!page_no && block->page.offset) {
+ const ulint* b = reinterpret_cast<const ulint*>
+ (src);
+ const ulint* const e = b + size / sizeof *b;
+ do {
+ if (*b++) {
+ goto page_corrupted;
+ }
+ } while (b != e);
+
+ /* Proceed to the next page,
+ because this one is all zero. */
+ continue;
+ }
+
+ if (page_no != block->page.offset) {
+page_corrupted:
+ ib_logf(IB_LOG_LEVEL_WARN,
+ "%s: Page %lu at offset "
+ UINT64PF " looks corrupted.",
+ callback.filename(),
+ ulong(offset / size), offset);
+ err = DB_CORRUPTION;
+ goto func_exit;
+ }
+
+ bool decrypted = false;
+ byte* dst = io_buffer + (i * size);
+ bool frame_changed = false;
+ ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
+ const bool page_compressed
+ = page_type
+ == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
+ || page_type == FIL_PAGE_PAGE_COMPRESSED;
+
+ if (page_compressed && block->page.zip.data) {
+ goto page_corrupted;
+ }
+
+ if (!encrypted) {
+ } else if (!mach_read_from_4(
+ FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ + src)) {
+not_encrypted:
+ if (!page_compressed
+ && !block->page.zip.data) {
+ block->frame = src;
+ frame_changed = true;
+ } else {
+ ut_ad(dst != src);
+ memcpy(dst, src, size);
+ }
+ } else {
+ if (!fil_space_verify_crypt_checksum(
+ src, callback.get_zip_size())) {
+ goto page_corrupted;
+ }
+
+ decrypted = fil_space_decrypt(
+ iter.crypt_data, dst,
+ iter.page_size, src, &err);
+
+ if (err != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ if (!decrypted) {
+ goto not_encrypted;
+ }
+
+ updated = true;
+ }
+
+ /* If the original page is page_compressed, we need
+ to decompress it before adjusting further. */
+ if (page_compressed) {
+ ulint compress_length = fil_page_decompress(
+ page_compress_buf, dst);
+ ut_ad(compress_length != srv_page_size);
+ if (compress_length == 0) {
+ goto page_corrupted;
+ }
+ updated = true;
+ } else if (buf_page_is_corrupted(
+ false,
+ encrypted && !frame_changed
+ ? dst : src,
+ callback.get_zip_size(), NULL)) {
+ goto page_corrupted;
+ }
+
+ if ((err = callback(block)) != DB_SUCCESS) {
+ goto func_exit;
+ } else if (!updated) {
+ updated = buf_block_get_state(block)
+ == BUF_BLOCK_FILE_PAGE;
+ }
+
+ /* If tablespace is encrypted we use additional
+ temporary scratch area where pages are read
+ for decrypting readptr == crypt_io_buffer != io_buffer.
+
+ Destination for decryption is a buffer pool block
+ block->frame == dst == io_buffer that is updated.
+ Pages that did not require decryption even when
+ tablespace is marked as encrypted are not copied
+ instead block->frame is set to src == readptr.
+
+ For encryption we again use temporary scratch area
+ writeptr != io_buffer == dst
+ that is then written to the tablespace
+
+ (1) For normal tables io_buffer == dst == writeptr
+ (2) For only page compressed tables
+ io_buffer == dst == writeptr
+ (3) For encrypted (and page compressed)
+ readptr != io_buffer == dst != writeptr
+ */
+
+ ut_ad(!encrypted && !page_compressed ?
+ src == dst && dst == writeptr + (i * size):1);
+ ut_ad(page_compressed && !encrypted ?
+ src == dst && dst == writeptr + (i * size):1);
+ ut_ad(encrypted ?
+ src != dst && dst != writeptr + (i * size):1);
+
+ /* When tablespace is encrypted or compressed its
+ first page (i.e. page 0) is not encrypted or
+ compressed and there is no need to copy frame. */
+ if (encrypted && block->page.offset != 0) {
+ byte *local_frame = callback.get_frame(block);
+ ut_ad((writeptr + (i * size)) != local_frame);
+ memcpy((writeptr + (i * size)), local_frame, size);
+ }
+
+ if (frame_changed) {
+ block->frame = dst;
+ }
+
+ src = io_buffer + (i * size);
+
+ if (page_compressed) {
+ updated = true;
+ if (fil_page_compress(
+ src,
+ page_compress_buf,
+ 0,/* FIXME: compression level */
+ 512,/* FIXME: proper block size */
+ encrypted)) {
+ /* FIXME: remove memcpy() */
+ memcpy(src, page_compress_buf,
+ srv_page_size);
+ }
+ }
+
+ /* If tablespace is encrypted, encrypt page before we
+ write it back. Note that we should not encrypt the
+ buffer that is in buffer pool. */
+ /* NOTE: At this stage of IMPORT the
+ buffer pool is not being used at all! */
+ if (decrypted && encrypted) {
+ byte *dest = writeptr + (i * size);
+
+ byte* tmp = fil_encrypt_buf(
+ iter.crypt_data,
+ callback.get_space_id(),
+ block->page.offset,
+ mach_read_from_8(src + FIL_PAGE_LSN),
+ src,
+ callback.get_zip_size(),
+ dest);
+
+ if (tmp == src) {
+ /* TODO: remove unnecessary memcpy's */
+ ut_ad(dest != src);
+ memcpy(dest, src, size);
+ }
+
+ updated = true;
+ }
+ }
+
+ /* A page was updated in the set, write back to disk. */
+ if (updated
+ && !os_file_write(
+ iter.filepath, iter.file, writeptr,
+ offset, (ulint) n_bytes)) {
+
+ ib_logf(IB_LOG_LEVEL_ERROR, "os_file_write() failed");
+ err = DB_IO_ERROR;
+ goto func_exit;
+ }
+ }
+
+func_exit:
+ ut_free(page_compress_buf);
+ return err;
+}
+
+/********************************************************************//**
+Iterate over all the pages in the tablespace.
+@param table - the table definition in the server
+@param n_io_buffers - number of blocks to read and write together
+@param callback - functor that will do the page updates
+@return DB_SUCCESS or error code */
+static
+dberr_t
+fil_tablespace_iterate(
+/*===================*/
+ dict_table_t* table,
+ ulint n_io_buffers,
+ AbstractCallback& callback)
+{
+ dberr_t err;
+ pfs_os_file_t file;
+ char* filepath;
+
+ ut_a(n_io_buffers > 0);
+ ut_ad(!srv_read_only_mode);
+
+ DBUG_EXECUTE_IF("ib_import_trigger_corruption_1",
+ return(DB_CORRUPTION););
+
+ if (DICT_TF_HAS_DATA_DIR(table->flags)) {
+ dict_get_and_save_data_dir_path(table, false);
+ ut_a(table->data_dir_path);
+
+ filepath = os_file_make_remote_pathname(
+ table->data_dir_path, table->name, "ibd");
+ } else {
+ filepath = fil_make_ibd_name(table->name, false);
+ }
+
+ {
+ ibool success;
+
+ file = os_file_create_simple_no_error_handling(
+ innodb_file_data_key, filepath,
+ OS_FILE_OPEN, OS_FILE_READ_WRITE, &success, FALSE);
+
+ DBUG_EXECUTE_IF("fil_tablespace_iterate_failure",
+ {
+ static bool once;
+
+ if (!once || ut_rnd_interval(0, 10) == 5) {
+ once = true;
+ success = FALSE;
+ os_file_close(file);
+ }
+ });
+
+ if (!success) {
+ /* The following call prints an error message */
+ os_file_get_last_error(true);
+
+ ib_logf(IB_LOG_LEVEL_ERROR,
+ "Trying to import a tablespace, but could not "
+ "open the tablespace file %s", filepath);
+
+ mem_free(filepath);
+
+ return(DB_TABLESPACE_NOT_FOUND);
+
+ } else {
+ err = DB_SUCCESS;
+ }
+ }
+
+ callback.set_file(filepath, file);
+
+ os_offset_t file_size = os_file_get_size(file);
+ ut_a(file_size != (os_offset_t) -1);
+
+ /* Allocate a page to read in the tablespace header, so that we
+ can determine the page size and zip_size (if it is compressed).
+ We allocate an extra page in case it is a compressed table. One
+ page is to ensure alignment. */
+
+ void* page_ptr = mem_alloc(3 * UNIV_PAGE_SIZE);
+ byte* page = static_cast<byte*>(ut_align(page_ptr, UNIV_PAGE_SIZE));
+
+ /* The block we will use for every physical page */
+ buf_block_t block;
+
+ memset(&block, 0, sizeof block);
+ block.frame = page;
+ block.page.space = callback.get_space_id();
+ block.page.io_fix = BUF_IO_NONE;
+ block.page.buf_fix_count = 1;
+ block.page.state = BUF_BLOCK_FILE_PAGE;
+
+ /* Read the first page and determine the page and zip size. */
+
+ if (!os_file_read_no_error_handling(file, page, 0, UNIV_PAGE_SIZE)) {
+
+ err = DB_IO_ERROR;
+
+ } else if ((err = callback.init(file_size, &block)) == DB_SUCCESS) {
+ if (const ulint zip_size = callback.get_zip_size()) {
+ page_zip_set_size(&block.page.zip, zip_size);
+ /* ROW_FORMAT=COMPRESSED is not optimised for block IO
+ for now. We do the IMPORT page by page. */
+ n_io_buffers = 1;
+ }
+
+ fil_iterator_t iter;
+
+ iter.file = file;
+ iter.start = 0;
+ iter.end = file_size;
+ iter.filepath = filepath;
+ iter.file_size = file_size;
+ iter.n_io_buffers = n_io_buffers;
+ iter.page_size = callback.get_page_size();
+
+ /* In MariaDB/MySQL 5.6 tablespace does not exist
+ during import, therefore we can't use space directly
+ here. */
+ ulint crypt_data_offset = fsp_header_get_crypt_offset(
+ callback.get_zip_size());
+
+ /* read (optional) crypt data */
+ iter.crypt_data = fil_space_read_crypt_data(
+ 0, page, crypt_data_offset);
+
+ /** If tablespace is encrypted, it needs extra buffers */
+ if (iter.crypt_data != NULL) {
+ /* decrease io buffers so that memory
+ * consumption doesn't double
+ * note: the +1 is to avoid n_io_buffers getting down to 0 */
+ iter.n_io_buffers = (iter.n_io_buffers + 1) / 2;
+ }
+
+ /** Add an extra page for compressed page scratch area. */
+
+ void* io_buffer = mem_alloc(
+ (2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);
+
+ iter.io_buffer = static_cast<byte*>(
+ ut_align(io_buffer, UNIV_PAGE_SIZE));
+
+ void* crypt_io_buffer = NULL;
+ if (iter.crypt_data != NULL) {
+ crypt_io_buffer = mem_alloc(
+ (2 + iter.n_io_buffers) * UNIV_PAGE_SIZE);
+ iter.crypt_io_buffer = static_cast<byte*>(
+ ut_align(crypt_io_buffer, UNIV_PAGE_SIZE));
+ }
+
+ if (block.page.zip.ssize) {
+ ut_ad(iter.n_io_buffers == 1);
+ block.frame = iter.io_buffer;
+ block.page.zip.data = block.frame + UNIV_PAGE_SIZE;
+ ut_d(block.page.zip.m_external = true);
+ }
+
+ err = fil_iterate(iter, &block, callback);
+
+ mem_free(io_buffer);
+
+ if (crypt_io_buffer != NULL) {
+ mem_free(crypt_io_buffer);
+ iter.crypt_io_buffer = NULL;
+ fil_space_destroy_crypt_data(&iter.crypt_data);
+ }
+ }
+
+ if (err == DB_SUCCESS) {
+
+ ib_logf(IB_LOG_LEVEL_INFO, "Sync to disk");
+
+ if (!os_file_flush(file)) {
+ ib_logf(IB_LOG_LEVEL_INFO, "os_file_flush() failed!");
+ err = DB_IO_ERROR;
+ } else {
+ ib_logf(IB_LOG_LEVEL_INFO, "Sync to disk - done!");
+ }
+ }
+
+ os_file_close(file);
+
+ mem_free(page_ptr);
+ mem_free(filepath);
+
+ return(err);
+}
+
/*****************************************************************//**
Imports a tablespace. The space id in the .ibd file must match the space id
of the table in the data dictionary.
@@ -3496,8 +3943,6 @@ row_import_for_mysql(
row_import cfg;
- memset(&cfg, 0x0, sizeof(cfg));
-
err = row_import_read_cfg(table, trx->mysql_thd, cfg);
/* Check if the table column definitions match the contents
@@ -3580,6 +4025,23 @@ row_import_for_mysql(
DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
err = DB_TOO_MANY_CONCURRENT_TRXS;);
+ /* On DISCARD TABLESPACE, we did not drop any adaptive hash
+ index entries. If we replaced the discarded tablespace with a
+ smaller one here, there could still be some adaptive hash
+ index entries that point to cached garbage pages in the buffer
+ pool, because PageConverter::operator() only evicted those
+ pages that were replaced by the imported pages. We must
+ discard all remaining adaptive hash index entries, because the
+ adaptive hash index must be a subset of the table contents;
+ false positives are not tolerated. */
+ while (buf_LRU_drop_page_hash_for_tablespace(table)) {
+ if (trx_is_interrupted(trx)
+ || srv_shutdown_state != SRV_SHUTDOWN_NONE) {
+ err = DB_INTERRUPTED;
+ break;
+ }
+ }
+
if (err != DB_SUCCESS) {
char table_name[MAX_FULL_NAME_LEN + 1];
@@ -3772,4 +4234,3 @@ row_import_for_mysql(
return(row_import_cleanup(prebuilt, trx, err));
}
-