diff options
Diffstat (limited to 'storage/innobase/row/row0import.cc')
-rw-r--r-- | storage/innobase/row/row0import.cc | 941 |
1 files changed, 701 insertions, 240 deletions
diff --git a/storage/innobase/row/row0import.cc b/storage/innobase/row/row0import.cc index 020a814c4eb..e1554949127 100644 --- a/storage/innobase/row/row0import.cc +++ b/storage/innobase/row/row0import.cc @@ -13,7 +13,7 @@ FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., -51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA *****************************************************************************/ @@ -31,6 +31,7 @@ Created 2012-02-08 by Sunny Bains. #endif #include "btr0pcur.h" +#include "btr0sea.h" #include "que0que.h" #include "dict0boot.h" #include "ibuf0ibuf.h" @@ -40,12 +41,17 @@ Created 2012-02-08 by Sunny Bains. #include "row0mysql.h" #include "srv0start.h" #include "row0quiesce.h" +#include "fil0pagecompress.h" +#ifdef HAVE_LZO +#include "lzo/lzo1x.h" +#endif +#ifdef HAVE_SNAPPY +#include "snappy-c.h" +#endif #include <vector> -/** The size of the buffer to use for IO. Note: os_file_read() doesn't expect -reads to fail. If you set the buffer size to be greater than a multiple of the -file size then it will assert. TODO: Fix this limitation of the IO functions. +/** The size of the buffer to use for IO. @param n - page size of the tablespace. @retval number of pages */ #define IO_BUFFER_SIZE(n) ((1024 * 1024) / n) @@ -105,18 +111,18 @@ struct row_index_t { struct row_import { row_import() UNIV_NOTHROW : - m_table(), - m_version(), - m_hostname(), - m_table_name(), - m_autoinc(), - m_page_size(), - m_flags(), - m_n_cols(), - m_cols(), - m_col_names(), - m_n_indexes(), - m_indexes(), + m_table(NULL), + m_version(0), + m_hostname(NULL), + m_table_name(NULL), + m_autoinc(0), + m_page_size(0), + m_flags(0), + m_n_cols(0), + m_cols(NULL), + m_col_names(NULL), + m_n_indexes(0), + m_indexes(NULL), m_missing(true) { } ~row_import() UNIV_NOTHROW; @@ -361,7 +367,8 @@ private: /** Functor that is called for each physical page that is read from the tablespace file. */ -class AbstractCallback : public PageCallback { +class AbstractCallback +{ public: /** Constructor @param trx - covering transaction */ @@ -394,32 +401,59 @@ public: return(get_zip_size() > 0); } -protected: /** - Get the data page depending on the table type, compressed or not. - @param block - block read from disk - @retval the buffer frame */ - buf_frame_t* get_frame(buf_block_t* block) const UNIV_NOTHROW + Set the name of the physical file and the file handle that is used + to open it for the file that is being iterated over. + @param filename - then physical name of the tablespace file. + @param file - OS file handle */ + void set_file(const char* filename, pfs_os_file_t file) UNIV_NOTHROW { - if (is_compressed_table()) { - return(block->page.zip.data); - } + m_file = file; + m_filepath = filename; + } - return(buf_block_get_frame(block)); + /** The compressed page size + @return the compressed page size */ + ulint get_zip_size() const + { + return(m_zip_size); } - /** Check for session interrupt. If required we could - even flush to disk here every N pages. - @retval DB_SUCCESS or error code */ - dberr_t periodic_check() UNIV_NOTHROW + /** The compressed page size + @return the compressed page size */ + ulint get_page_size() const { - if (trx_is_interrupted(m_trx)) { - return(DB_INTERRUPTED); - } + return(m_page_size); + } - return(DB_SUCCESS); + const char* filename() const { return m_filepath; } + + /** + Called for every page in the tablespace. If the page was not + updated then its state must be set to BUF_PAGE_NOT_USED. For + compressed tables the page descriptor memory will be at offset: + block->frame + UNIV_PAGE_SIZE; + @param block block read from file, note it is not from the buffer pool + @retval DB_SUCCESS or error code. */ + virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0; + + /** + @return the space id of the tablespace */ + virtual ulint get_space_id() const UNIV_NOTHROW = 0; + + bool is_interrupted() const { return trx_is_interrupted(m_trx); } + + /** + Get the data page depending on the table type, compressed or not. + @param block - block read from disk + @retval the buffer frame */ + static byte* get_frame(const buf_block_t* block) + { + return block->page.zip.data + ? block->page.zip.data : block->frame; } +protected: /** Get the physical offset of the extent descriptor within the page. @param page_no - page number of the extent descriptor @@ -509,6 +543,18 @@ protected: } protected: + /** Compressed table page size */ + ulint m_zip_size; + + /** The tablespace page size. */ + ulint m_page_size; + + /** File handle to the tablespace */ + pfs_os_file_t m_file; + + /** Physical file path. */ + const char* m_filepath; + /** Covering transaction. */ trx_t* m_trx; @@ -565,9 +611,9 @@ AbstractCallback::init( /* Since we don't know whether it is a compressed table or not, the data is always read into the block->frame. */ - dberr_t err = set_zip_size(block->frame); + m_zip_size = fsp_header_get_zip_size(page); - if (err != DB_SUCCESS) { + if (!ut_is_2pow(m_zip_size) || m_zip_size > UNIV_ZIP_SIZE_MAX) { return(DB_CORRUPTION); } @@ -604,11 +650,7 @@ AbstractCallback::init( m_free_limit = mach_read_from_4(page + FSP_FREE_LIMIT); m_space = mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_ID); - if ((err = set_current_xdes(0, page)) != DB_SUCCESS) { - return(err); - } - - return(DB_SUCCESS); + return set_current_xdes(0, page); } /** @@ -650,12 +692,9 @@ struct FetchIndexRootPages : public AbstractCallback { /** Called for each block as it is read from the file. - @param offset - physical offset in the file - @param block - block to convert, it is not from the buffer pool. + @param block block to convert, it is not from the buffer pool. @retval DB_SUCCESS or error code. */ - virtual dberr_t operator() ( - os_offset_t offset, - buf_block_t* block) UNIV_NOTHROW; + dberr_t operator()(buf_block_t* block) UNIV_NOTHROW; /** Update the import configuration that will be used to import the tablespace. */ @@ -673,34 +712,18 @@ Called for each block as it is read from the file. Check index pages to determine the exact row format. We can't get that from the tablespace header flags alone. -@param offset - physical offset in the file -@param block - block to convert, it is not from the buffer pool. +@param block block to convert, it is not from the buffer pool. @retval DB_SUCCESS or error code. */ -dberr_t -FetchIndexRootPages::operator() ( - os_offset_t offset, - buf_block_t* block) UNIV_NOTHROW +dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW { - dberr_t err; - - if ((err = periodic_check()) != DB_SUCCESS) { - return(err); - } + if (is_interrupted()) return DB_INTERRUPTED; const page_t* page = get_frame(block); ulint page_type = fil_page_get_type(page); - if (block->page.offset * m_page_size != offset) { - ib_logf(IB_LOG_LEVEL_ERROR, - "Page offset doesn't match file offset: " - "page offset: %u, file offset: " ULINTPF, - block->page.offset, - (ulint) (offset / m_page_size)); - - err = DB_CORRUPTION; - } else if (page_type == FIL_PAGE_TYPE_XDES) { - err = set_current_xdes(block->page.offset, page); + if (page_type == FIL_PAGE_TYPE_XDES) { + return set_current_xdes(block->page.offset, page); } else if (page_type == FIL_PAGE_INDEX && !is_free(block->page.offset) && is_root_page(page)) { @@ -725,7 +748,7 @@ FetchIndexRootPages::operator() ( } } - return(err); + return DB_SUCCESS; } /** @@ -842,21 +865,10 @@ public: /** Called for each block as it is read from the file. - @param offset - physical offset in the file - @param block - block to convert, it is not from the buffer pool. + @param block block to convert, it is not from the buffer pool. @retval DB_SUCCESS or error code. */ - virtual dberr_t operator() ( - os_offset_t offset, - buf_block_t* block) UNIV_NOTHROW; + dberr_t operator()(buf_block_t* block) UNIV_NOTHROW; private: - - /** Status returned by PageConverter::validate() */ - enum import_page_status_t { - IMPORT_PAGE_STATUS_OK, /*!< Page is OK */ - IMPORT_PAGE_STATUS_ALL_ZERO, /*!< Page is all zeros */ - IMPORT_PAGE_STATUS_CORRUPTED /*!< Page is corrupted */ - }; - /** Update the page, set the space id, max trx id and index id. @param block - block read from file @@ -866,17 +878,6 @@ private: buf_block_t* block, ulint& page_type) UNIV_NOTHROW; -#if defined UNIV_DEBUG - /** - @return true error condition is enabled. */ - bool trigger_corruption() UNIV_NOTHROW - { - return(false); - } - #else -#define trigger_corruption() (false) -#endif /* UNIV_DEBUG */ - /** Update the space, index id, trx id. @param block - block to convert @@ -890,15 +891,6 @@ private: dberr_t update_records(buf_block_t* block) UNIV_NOTHROW; /** - Validate the page, check for corruption. - @param offset - physical offset within file. - @param page - page read from file. - @return 0 on success, 1 if all zero, 2 if corrupted */ - import_page_status_t validate( - os_offset_t offset, - buf_block_t* page) UNIV_NOTHROW; - - /** Validate the space flags and update tablespace header page. @param block - block read from file, not from the buffer pool. @retval DB_SUCCESS or error code */ @@ -1306,17 +1298,63 @@ row_import::match_schema( { /* Do some simple checks. */ - if ((m_table->flags ^ m_flags) & ~DICT_TF_MASK_DATA_DIR) { + if (ulint mismatch = (m_table->flags ^ m_flags) + & ~DICT_TF_MASK_DATA_DIR) { + const char* msg; + if (mismatch & DICT_TF_MASK_ZIP_SSIZE) { + if ((m_table->flags & DICT_TF_MASK_ZIP_SSIZE) + && (m_flags & DICT_TF_MASK_ZIP_SSIZE)) { + switch (m_flags & DICT_TF_MASK_ZIP_SSIZE) { + case 0U << DICT_TF_POS_ZIP_SSIZE: + goto uncompressed; + case 1U << DICT_TF_POS_ZIP_SSIZE: + msg = "ROW_FORMAT=COMPRESSED" + " KEY_BLOCK_SIZE=1"; + break; + case 2U << DICT_TF_POS_ZIP_SSIZE: + msg = "ROW_FORMAT=COMPRESSED" + " KEY_BLOCK_SIZE=2"; + break; + case 3U << DICT_TF_POS_ZIP_SSIZE: + msg = "ROW_FORMAT=COMPRESSED" + " KEY_BLOCK_SIZE=4"; + break; + case 4U << DICT_TF_POS_ZIP_SSIZE: + msg = "ROW_FORMAT=COMPRESSED" + " KEY_BLOCK_SIZE=8"; + break; + case 5U << DICT_TF_POS_ZIP_SSIZE: + msg = "ROW_FORMAT=COMPRESSED" + " KEY_BLOCK_SIZE=16"; + break; + default: + msg = "strange KEY_BLOCK_SIZE"; + } + } else if (m_flags & DICT_TF_MASK_ZIP_SSIZE) { + msg = "ROW_FORMAT=COMPRESSED"; + } else { + goto uncompressed; + } + } else { +uncompressed: + msg = (m_flags & DICT_TF_MASK_ATOMIC_BLOBS) + ? "ROW_FORMAT=DYNAMIC" + : (m_flags & DICT_TF_MASK_COMPACT) + ? "ROW_FORMAT=COMPACT" + : "ROW_FORMAT=REDUNDANT"; + } + ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH, "Table flags don't match, server table has 0x%x" - " and the meta-data file has 0x%lx", - m_table->flags, ulong(m_flags)); + " and the meta-data file has 0x%lx;" + " .cfg file uses %s", + m_table->flags, ulong(m_flags), msg); return(DB_ERROR); } else if (m_table->n_cols != m_n_cols) { ib_errf(thd, IB_LOG_LEVEL_ERROR, ER_TABLE_SCHEMA_MISMATCH, - "Number of columns don't match, table has %u" - " columns but the tablespace meta-data file has " + "Number of columns don't match, table has %u " + "columns but the tablespace meta-data file has " ULINTPF " columns", m_table->n_cols, m_n_cols); @@ -1596,6 +1634,7 @@ IndexPurge::purge() UNIV_NOTHROW Constructor * @param cfg - config of table being imported. * @param trx - transaction covering the import */ +inline PageConverter::PageConverter( row_import* cfg, trx_t* trx) @@ -1620,6 +1659,7 @@ Adjust the BLOB reference for a single column that is externally stored @param offsets - column offsets for the record @param i - column ordinal value @return DB_SUCCESS or error code */ +inline dberr_t PageConverter::adjust_cluster_index_blob_column( rec_t* rec, @@ -1672,6 +1712,7 @@ stored columns. @param rec - record to update @param offsets - column offsets for the record @return DB_SUCCESS or error code */ +inline dberr_t PageConverter::adjust_cluster_index_blob_columns( rec_t* rec, @@ -1705,6 +1746,7 @@ BLOB reference, write the new space id. @param rec - record to update @param offsets - column offsets for the record @return DB_SUCCESS or error code */ +inline dberr_t PageConverter::adjust_cluster_index_blob_ref( rec_t* rec, @@ -1728,6 +1770,7 @@ Purge delete-marked records, only if it is possible to do so without re-organising the B+tree. @param offsets - current row offsets. @return true if purge succeeded */ +inline bool PageConverter::purge(const ulint* offsets) UNIV_NOTHROW { @@ -1752,6 +1795,7 @@ Adjust the BLOB references and sys fields for the current record. @param offsets - column offsets for the record @param deleted - true if row is delete marked @return DB_SUCCESS or error code. */ +inline dberr_t PageConverter::adjust_cluster_record( const dict_index_t* index, @@ -1780,6 +1824,7 @@ Update the BLOB refrences and write UNDO log entries for rows that can't be purged optimistically. @param block - block to update @retval DB_SUCCESS or error code */ +inline dberr_t PageConverter::update_records( buf_block_t* block) UNIV_NOTHROW @@ -1791,10 +1836,6 @@ PageConverter::update_records( m_rec_iter.open(block); - if (!page_is_leaf(block->frame)) { - return DB_SUCCESS; - } - while (!m_rec_iter.end()) { rec_t* rec = m_rec_iter.current(); ibool deleted = rec_get_deleted_flag(rec, comp); @@ -1845,6 +1886,7 @@ PageConverter::update_records( /** Update the space, index id, trx id. @return DB_SUCCESS or error code */ +inline dberr_t PageConverter::update_index_page( buf_block_t* block) UNIV_NOTHROW @@ -1907,13 +1949,14 @@ PageConverter::update_index_page( return(DB_SUCCESS); } - return(update_records(block)); + return page_is_leaf(block->frame) ? update_records(block) : DB_SUCCESS; } /** Validate the space flags and update tablespace header page. @param block - block read from file, not from the buffer pool. @retval DB_SUCCESS or error code */ +inline dberr_t PageConverter::update_header( buf_block_t* block) UNIV_NOTHROW @@ -1953,6 +1996,7 @@ PageConverter::update_header( Update the page, set the space id, max trx id and index id. @param block - block read from file @retval DB_SUCCESS or error code */ +inline dberr_t PageConverter::update_page( buf_block_t* block, @@ -1960,6 +2004,14 @@ PageConverter::update_page( { dberr_t err = DB_SUCCESS; + ut_ad(!block->page.zip.data == !is_compressed_table()); + + if (block->page.zip.data) { + m_page_zip_ptr = &block->page.zip; + } else { + ut_ad(!m_page_zip_ptr); + } + switch (page_type = fil_page_get_type(get_frame(block))) { case FIL_PAGE_TYPE_FSP_HDR: /* Work directly on the uncompressed page headers. */ @@ -2015,140 +2067,44 @@ PageConverter::update_page( } /** -Validate the page -@param offset - physical offset within file. -@param page - page read from file. -@return status */ -PageConverter::import_page_status_t -PageConverter::validate( - os_offset_t offset, - buf_block_t* block) UNIV_NOTHROW -{ - buf_frame_t* page = get_frame(block); - - /* Check that the page number corresponds to the offset in - the file. Flag as corrupt if it doesn't. Disable the check - for LSN in buf_page_is_corrupted() */ - - if (buf_page_is_corrupted(false, page, get_zip_size(), NULL) - || (page_get_page_no(page) != offset / m_page_size - && page_get_page_no(page) != 0)) { - - return(IMPORT_PAGE_STATUS_CORRUPTED); - - } else if (offset > 0 && page_get_page_no(page) == 0) { - ulint checksum; - - checksum = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM); - if (checksum != 0) { - /* Checksum check passed in buf_page_is_corrupted(). */ - ib_logf(IB_LOG_LEVEL_WARN, - "%s: Page %lu checksum " ULINTPF - " should be zero.", - m_filepath, (ulong) (offset / m_page_size), - checksum); - } - - const byte* b = page + FIL_PAGE_OFFSET; - const byte* e = page + m_page_size - - FIL_PAGE_END_LSN_OLD_CHKSUM; - - /* If the page number is zero and offset > 0 then - the entire page MUST consist of zeroes. If not then - we flag it as corrupt. */ - - while (b != e) { - - if (*b++ && !trigger_corruption()) { - return(IMPORT_PAGE_STATUS_CORRUPTED); - } - } - - /* The page is all zero: do nothing. */ - return(IMPORT_PAGE_STATUS_ALL_ZERO); - } - - return(IMPORT_PAGE_STATUS_OK); -} - -/** Called for every page in the tablespace. If the page was not updated then its state must be set to BUF_PAGE_NOT_USED. -@param offset - physical offset within the file -@param block - block read from file, note it is not from the buffer pool +@param block block read from file, note it is not from the buffer pool @retval DB_SUCCESS or error code. */ -dberr_t -PageConverter::operator() ( - os_offset_t offset, - buf_block_t* block) UNIV_NOTHROW +dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW { - ulint page_type; - dberr_t err = DB_SUCCESS; - - if ((err = periodic_check()) != DB_SUCCESS) { - return(err); - } - - if (is_compressed_table()) { - m_page_zip_ptr = &block->page.zip; - } else { - ut_ad(m_page_zip_ptr == 0); - } - - switch(validate(offset, block)) { - case IMPORT_PAGE_STATUS_OK: - - /* We have to decompress the compressed pages before - we can work on them */ - - if ((err = update_page(block, page_type)) != DB_SUCCESS) { - break; - } - - /* Note: For compressed pages this function will write to the - zip descriptor and for uncompressed pages it will write to - page (ie. the block->frame). Therefore the caller should write - out the descriptor contents and not block->frame for compressed - pages. */ - - if (!is_compressed_table() || page_type == FIL_PAGE_INDEX) { - - buf_flush_init_for_writing( - !is_compressed_table() - ? block->frame : block->page.zip.data, - !is_compressed_table() ? 0 : m_page_zip_ptr, - m_current_lsn); - } else { - /* Calculate and update the checksum of non-btree - pages for compressed tables explicitly here. */ - - buf_flush_update_zip_checksum( - get_frame(block), get_zip_size(), - m_current_lsn); - } - - break; + /* If we already had an old page with matching number + in the buffer pool, evict it now, because + we no longer evict the pages on DISCARD TABLESPACE. */ + buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset, + RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL, + __FILE__, __LINE__, NULL); - case IMPORT_PAGE_STATUS_ALL_ZERO: - /* The page is all zero: leave it as is. */ - break; + ulint page_type; - case IMPORT_PAGE_STATUS_CORRUPTED: + dberr_t err = update_page(block, page_type); + if (err != DB_SUCCESS) return err; - ib_logf(IB_LOG_LEVEL_WARN, - "%s: Page %lu at offset " UINT64PF " looks corrupted.", - m_filepath, (ulong) (offset / m_page_size), offset); + /* Note: For compressed pages this function will write to the + zip descriptor and for uncompressed pages it will write to + page (ie. the block->frame). Therefore the caller should write + out the descriptor contents and not block->frame for compressed + pages. */ - err = DB_CORRUPTION; + if (!is_compressed_table() || page_type == FIL_PAGE_INDEX) { + buf_flush_init_for_writing( + get_frame(block), + block->page.zip.data ? &block->page.zip : NULL, + m_current_lsn); + } else { + /* Calculate and update the checksum of non-btree + pages for compressed tables explicitly here. */ + buf_flush_update_zip_checksum( + get_frame(block), get_zip_size(), + m_current_lsn); } - /* If we already had and old page with matching number - in the buffer pool, evict it now, because - we no longer evict the pages on DISCARD TABLESPACE. */ - buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset, - RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL, - __FILE__, __LINE__, NULL); - return(err); + return DB_SUCCESS; } /*****************************************************************//** @@ -2558,8 +2514,6 @@ row_import_cfg_read_index_fields( dict_field_t* field = index->m_fields; - memset(field, 0x0, sizeof(*field) * n_fields); - for (ulint i = 0; i < n_fields; ++i, ++field) { byte* ptr = row; @@ -2577,6 +2531,8 @@ row_import_cfg_read_index_fields( return(DB_IO_ERROR); } + new (field) dict_field_t(); + field->prefix_len = mach_read_from_4(ptr); ptr += sizeof(ib_uint32_t); @@ -3423,6 +3379,497 @@ row_import_update_discarded_flag( return(err); } +struct fil_iterator_t { + pfs_os_file_t file; /*!< File handle */ + const char* filepath; /*!< File path name */ + os_offset_t start; /*!< From where to start */ + os_offset_t end; /*!< Where to stop */ + os_offset_t file_size; /*!< File size in bytes */ + ulint page_size; /*!< Page size */ + ulint n_io_buffers; /*!< Number of pages to use + for IO */ + byte* io_buffer; /*!< Buffer to use for IO */ + fil_space_crypt_t *crypt_data; /*!< Crypt data (if encrypted) */ + byte* crypt_io_buffer; /*!< IO buffer when encrypted */ +}; + +/********************************************************************//** +TODO: This can be made parallel trivially by chunking up the file and creating +a callback per thread. . Main benefit will be to use multiple CPUs for +checksums and compressed tables. We have to do compressed tables block by +block right now. Secondly we need to decompress/compress and copy too much +of data. These are CPU intensive. + +Iterate over all the pages in the tablespace. +@param iter - Tablespace iterator +@param block - block to use for IO +@param callback - Callback to inspect and update page contents +@retval DB_SUCCESS or error code */ +static +dberr_t +fil_iterate( +/*========*/ + const fil_iterator_t& iter, + buf_block_t* block, + AbstractCallback& callback) +{ + os_offset_t offset; + ulint n_bytes = iter.n_io_buffers * iter.page_size; + + const ulint buf_size = srv_page_size +#ifdef HAVE_LZO + + LZO1X_1_15_MEM_COMPRESS +#elif defined HAVE_SNAPPY + + snappy_max_compressed_length(srv_page_size) +#endif + ; + byte* page_compress_buf = static_cast<byte*>( + ut_malloc_low(buf_size, false)); + ut_ad(!srv_read_only_mode); + + if (!page_compress_buf) { + return DB_OUT_OF_MEMORY; + } + + /* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless + copying for non-index pages. Unfortunately, it is + required by buf_zip_decompress() */ + dberr_t err = DB_SUCCESS; + + for (offset = iter.start; offset < iter.end; offset += n_bytes) { + if (callback.is_interrupted()) { + err = DB_INTERRUPTED; + goto func_exit; + } + + byte* io_buffer = iter.io_buffer; + block->frame = io_buffer; + + if (block->page.zip.data) { + /* Zip IO is done in the compressed page buffer. */ + io_buffer = block->page.zip.data; + ut_ad(PAGE_ZIP_MATCH(block->frame, &block->page.zip)); + } + + /* We have to read the exact number of bytes. Otherwise the + InnoDB IO functions croak on failed reads. */ + + n_bytes = ulint(ut_min(os_offset_t(n_bytes), + iter.end - offset)); + + ut_ad(n_bytes > 0); + ut_ad(!(n_bytes % iter.page_size)); + + const bool encrypted = iter.crypt_data != NULL + && iter.crypt_data->should_encrypt(); + /* Use additional crypt io buffer if tablespace is encrypted */ + byte* const readptr = encrypted + ? iter.crypt_io_buffer : io_buffer; + byte* const writeptr = readptr; + + if (!os_file_read_no_error_handling(iter.file, readptr, + offset, n_bytes)) { + ib_logf(IB_LOG_LEVEL_ERROR, "os_file_read() failed"); + err = DB_IO_ERROR; + goto func_exit; + } + + bool updated = false; + const ulint size = iter.page_size; + ulint n_pages_read = ulint(n_bytes) / size; + block->page.offset = offset / size; + + for (ulint i = 0; i < n_pages_read; + ++i, block->frame += size, block->page.offset++) { + byte* src = readptr + (i * size); + const ulint page_no = page_get_page_no(src); + if (!page_no && block->page.offset) { + const ulint* b = reinterpret_cast<const ulint*> + (src); + const ulint* const e = b + size / sizeof *b; + do { + if (*b++) { + goto page_corrupted; + } + } while (b != e); + + /* Proceed to the next page, + because this one is all zero. */ + continue; + } + + if (page_no != block->page.offset) { +page_corrupted: + ib_logf(IB_LOG_LEVEL_WARN, + "%s: Page %lu at offset " + UINT64PF " looks corrupted.", + callback.filename(), + ulong(offset / size), offset); + err = DB_CORRUPTION; + goto func_exit; + } + + bool decrypted = false; + byte* dst = io_buffer + (i * size); + bool frame_changed = false; + ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE); + const bool page_compressed + = page_type + == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED + || page_type == FIL_PAGE_PAGE_COMPRESSED; + + if (page_compressed && block->page.zip.data) { + goto page_corrupted; + } + + if (!encrypted) { + } else if (!mach_read_from_4( + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + + src)) { +not_encrypted: + if (!page_compressed + && !block->page.zip.data) { + block->frame = src; + frame_changed = true; + } else { + ut_ad(dst != src); + memcpy(dst, src, size); + } + } else { + if (!fil_space_verify_crypt_checksum( + src, callback.get_zip_size())) { + goto page_corrupted; + } + + decrypted = fil_space_decrypt( + iter.crypt_data, dst, + iter.page_size, src, &err); + + if (err != DB_SUCCESS) { + goto func_exit; + } + + if (!decrypted) { + goto not_encrypted; + } + + updated = true; + } + + /* If the original page is page_compressed, we need + to decompress it before adjusting further. */ + if (page_compressed) { + ulint compress_length = fil_page_decompress( + page_compress_buf, dst); + ut_ad(compress_length != srv_page_size); + if (compress_length == 0) { + goto page_corrupted; + } + updated = true; + } else if (buf_page_is_corrupted( + false, + encrypted && !frame_changed + ? dst : src, + callback.get_zip_size(), NULL)) { + goto page_corrupted; + } + + if ((err = callback(block)) != DB_SUCCESS) { + goto func_exit; + } else if (!updated) { + updated = buf_block_get_state(block) + == BUF_BLOCK_FILE_PAGE; + } + + /* If tablespace is encrypted we use additional + temporary scratch area where pages are read + for decrypting readptr == crypt_io_buffer != io_buffer. + + Destination for decryption is a buffer pool block + block->frame == dst == io_buffer that is updated. + Pages that did not require decryption even when + tablespace is marked as encrypted are not copied + instead block->frame is set to src == readptr. + + For encryption we again use temporary scratch area + writeptr != io_buffer == dst + that is then written to the tablespace + + (1) For normal tables io_buffer == dst == writeptr + (2) For only page compressed tables + io_buffer == dst == writeptr + (3) For encrypted (and page compressed) + readptr != io_buffer == dst != writeptr + */ + + ut_ad(!encrypted && !page_compressed ? + src == dst && dst == writeptr + (i * size):1); + ut_ad(page_compressed && !encrypted ? + src == dst && dst == writeptr + (i * size):1); + ut_ad(encrypted ? + src != dst && dst != writeptr + (i * size):1); + + /* When tablespace is encrypted or compressed its + first page (i.e. page 0) is not encrypted or + compressed and there is no need to copy frame. */ + if (encrypted && block->page.offset != 0) { + byte *local_frame = callback.get_frame(block); + ut_ad((writeptr + (i * size)) != local_frame); + memcpy((writeptr + (i * size)), local_frame, size); + } + + if (frame_changed) { + block->frame = dst; + } + + src = io_buffer + (i * size); + + if (page_compressed) { + updated = true; + if (fil_page_compress( + src, + page_compress_buf, + 0,/* FIXME: compression level */ + 512,/* FIXME: proper block size */ + encrypted)) { + /* FIXME: remove memcpy() */ + memcpy(src, page_compress_buf, + srv_page_size); + } + } + + /* If tablespace is encrypted, encrypt page before we + write it back. Note that we should not encrypt the + buffer that is in buffer pool. */ + /* NOTE: At this stage of IMPORT the + buffer pool is not being used at all! */ + if (decrypted && encrypted) { + byte *dest = writeptr + (i * size); + + byte* tmp = fil_encrypt_buf( + iter.crypt_data, + callback.get_space_id(), + block->page.offset, + mach_read_from_8(src + FIL_PAGE_LSN), + src, + callback.get_zip_size(), + dest); + + if (tmp == src) { + /* TODO: remove unnecessary memcpy's */ + ut_ad(dest != src); + memcpy(dest, src, size); + } + + updated = true; + } + } + + /* A page was updated in the set, write back to disk. */ + if (updated + && !os_file_write( + iter.filepath, iter.file, writeptr, + offset, (ulint) n_bytes)) { + + ib_logf(IB_LOG_LEVEL_ERROR, "os_file_write() failed"); + err = DB_IO_ERROR; + goto func_exit; + } + } + +func_exit: + ut_free(page_compress_buf); + return err; +} + +/********************************************************************//** +Iterate over all the pages in the tablespace. +@param table - the table definiton in the server +@param n_io_buffers - number of blocks to read and write together +@param callback - functor that will do the page updates +@return DB_SUCCESS or error code */ +static +dberr_t +fil_tablespace_iterate( +/*===================*/ + dict_table_t* table, + ulint n_io_buffers, + AbstractCallback& callback) +{ + dberr_t err; + pfs_os_file_t file; + char* filepath; + + ut_a(n_io_buffers > 0); + ut_ad(!srv_read_only_mode); + + DBUG_EXECUTE_IF("ib_import_trigger_corruption_1", + return(DB_CORRUPTION);); + + if (DICT_TF_HAS_DATA_DIR(table->flags)) { + dict_get_and_save_data_dir_path(table, false); + ut_a(table->data_dir_path); + + filepath = os_file_make_remote_pathname( + table->data_dir_path, table->name, "ibd"); + } else { + filepath = fil_make_ibd_name(table->name, false); + } + + { + ibool success; + + file = os_file_create_simple_no_error_handling( + innodb_file_data_key, filepath, + OS_FILE_OPEN, OS_FILE_READ_WRITE, &success, FALSE); + + DBUG_EXECUTE_IF("fil_tablespace_iterate_failure", + { + static bool once; + + if (!once || ut_rnd_interval(0, 10) == 5) { + once = true; + success = FALSE; + os_file_close(file); + } + }); + + if (!success) { + /* The following call prints an error message */ + os_file_get_last_error(true); + + ib_logf(IB_LOG_LEVEL_ERROR, + "Trying to import a tablespace, but could not " + "open the tablespace file %s", filepath); + + mem_free(filepath); + + return(DB_TABLESPACE_NOT_FOUND); + + } else { + err = DB_SUCCESS; + } + } + + callback.set_file(filepath, file); + + os_offset_t file_size = os_file_get_size(file); + ut_a(file_size != (os_offset_t) -1); + + /* Allocate a page to read in the tablespace header, so that we + can determine the page size and zip_size (if it is compressed). + We allocate an extra page in case it is a compressed table. One + page is to ensure alignement. */ + + void* page_ptr = mem_alloc(3 * UNIV_PAGE_SIZE); + byte* page = static_cast<byte*>(ut_align(page_ptr, UNIV_PAGE_SIZE)); + + /* The block we will use for every physical page */ + buf_block_t block; + + memset(&block, 0, sizeof block); + block.frame = page; + block.page.space = callback.get_space_id(); + block.page.io_fix = BUF_IO_NONE; + block.page.buf_fix_count = 1; + block.page.state = BUF_BLOCK_FILE_PAGE; + + /* Read the first page and determine the page and zip size. */ + + if (!os_file_read_no_error_handling(file, page, 0, UNIV_PAGE_SIZE)) { + + err = DB_IO_ERROR; + + } else if ((err = callback.init(file_size, &block)) == DB_SUCCESS) { + if (const ulint zip_size = callback.get_zip_size()) { + page_zip_set_size(&block.page.zip, zip_size); + /* ROW_FORMAT=COMPRESSED is not optimised for block IO + for now. We do the IMPORT page by page. */ + n_io_buffers = 1; + } + + fil_iterator_t iter; + + iter.file = file; + iter.start = 0; + iter.end = file_size; + iter.filepath = filepath; + iter.file_size = file_size; + iter.n_io_buffers = n_io_buffers; + iter.page_size = callback.get_page_size(); + + /* In MariaDB/MySQL 5.6 tablespace does not exist + during import, therefore we can't use space directly + here. */ + ulint crypt_data_offset = fsp_header_get_crypt_offset( + callback.get_zip_size()); + + /* read (optional) crypt data */ + iter.crypt_data = fil_space_read_crypt_data( + 0, page, crypt_data_offset); + + /** If tablespace is encrypted, it needs extra buffers */ + if (iter.crypt_data != NULL) { + /* decrease io buffers so that memory + * consumption doesnt double + * note: the +1 is to avoid n_io_buffers getting down to 0 */ + iter.n_io_buffers = (iter.n_io_buffers + 1) / 2; + } + + /** Add an extra page for compressed page scratch area. */ + + void* io_buffer = mem_alloc( + (2 + iter.n_io_buffers) * UNIV_PAGE_SIZE); + + iter.io_buffer = static_cast<byte*>( + ut_align(io_buffer, UNIV_PAGE_SIZE)); + + void* crypt_io_buffer = NULL; + if (iter.crypt_data != NULL) { + crypt_io_buffer = mem_alloc( + (2 + iter.n_io_buffers) * UNIV_PAGE_SIZE); + iter.crypt_io_buffer = static_cast<byte*>( + ut_align(crypt_io_buffer, UNIV_PAGE_SIZE)); + } + + if (block.page.zip.ssize) { + ut_ad(iter.n_io_buffers == 1); + block.frame = iter.io_buffer; + block.page.zip.data = block.frame + UNIV_PAGE_SIZE; + ut_d(block.page.zip.m_external = true); + } + + err = fil_iterate(iter, &block, callback); + + mem_free(io_buffer); + + if (crypt_io_buffer != NULL) { + mem_free(crypt_io_buffer); + iter.crypt_io_buffer = NULL; + fil_space_destroy_crypt_data(&iter.crypt_data); + } + } + + if (err == DB_SUCCESS) { + + ib_logf(IB_LOG_LEVEL_INFO, "Sync to disk"); + + if (!os_file_flush(file)) { + ib_logf(IB_LOG_LEVEL_INFO, "os_file_flush() failed!"); + err = DB_IO_ERROR; + } else { + ib_logf(IB_LOG_LEVEL_INFO, "Sync to disk - done!"); + } + } + + os_file_close(file); + + mem_free(page_ptr); + mem_free(filepath); + + return(err); +} + /*****************************************************************//** Imports a tablespace. The space id in the .ibd file must match the space id of the table in the data dictionary. @@ -3496,8 +3943,6 @@ row_import_for_mysql( row_import cfg; - memset(&cfg, 0x0, sizeof(cfg)); - err = row_import_read_cfg(table, trx->mysql_thd, cfg); /* Check if the table column definitions match the contents @@ -3580,6 +4025,23 @@ row_import_for_mysql( DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure", err = DB_TOO_MANY_CONCURRENT_TRXS;); + /* On DISCARD TABLESPACE, we did not drop any adaptive hash + index entries. If we replaced the discarded tablespace with a + smaller one here, there could still be some adaptive hash + index entries that point to cached garbage pages in the buffer + pool, because PageConverter::operator() only evicted those + pages that were replaced by the imported pages. We must + discard all remaining adaptive hash index entries, because the + adaptive hash index must be a subset of the table contents; + false positives are not tolerated. */ + while (buf_LRU_drop_page_hash_for_tablespace(table)) { + if (trx_is_interrupted(trx) + || srv_shutdown_state != SRV_SHUTDOWN_NONE) { + err = DB_INTERRUPTED; + break; + } + } + if (err != DB_SUCCESS) { char table_name[MAX_FULL_NAME_LEN + 1]; @@ -3772,4 +4234,3 @@ row_import_for_mysql( return(row_import_cleanup(prebuilt, trx, err)); } - |