summaryrefslogtreecommitdiff
path: root/storage/xtradb/fil/fil0crypt.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/xtradb/fil/fil0crypt.cc')
-rw-r--r--storage/xtradb/fil/fil0crypt.cc1323
1 files changed, 651 insertions, 672 deletions
diff --git a/storage/xtradb/fil/fil0crypt.cc b/storage/xtradb/fil/fil0crypt.cc
index 131d03ea17a..60ab067d105 100644
--- a/storage/xtradb/fil/fil0crypt.cc
+++ b/storage/xtradb/fil/fil0crypt.cc
@@ -37,7 +37,6 @@ Modified Jan Lindström jan.lindstrom@mariadb.com
#include "fsp0fsp.h"
#include "fil0pagecompress.h"
#include "ha_prototypes.h" // IB_LOG_
-
#include <my_crypt.h>
/** Mutex for keys */
@@ -59,7 +58,7 @@ UNIV_INTERN uint srv_n_fil_crypt_threads = 0;
UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;
/** At this age or older a space/page will be rotated */
-UNIV_INTERN uint srv_fil_crypt_rotate_key_age = 1;
+UNIV_INTERN uint srv_fil_crypt_rotate_key_age;
/** Event to signal FROM the key rotation threads. */
static os_event_t fil_crypt_event;
@@ -67,11 +66,11 @@ static os_event_t fil_crypt_event;
/** Event to signal TO the key rotation threads. */
UNIV_INTERN os_event_t fil_crypt_threads_event;
-/** Event for waking up threads throttle */
+/** Event for waking up threads throttle. */
static os_event_t fil_crypt_throttle_sleep_event;
-/** Mutex for key rotation threads */
-static ib_mutex_t fil_crypt_threads_mutex;
+/** Mutex for key rotation threads. */
+UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;
#ifdef UNIV_PFS_MUTEX
static mysql_pfs_key_t fil_crypt_threads_mutex_key;
@@ -104,9 +103,12 @@ static mysql_pfs_key_t fil_crypt_stat_mutex_key;
UNIV_INTERN mysql_pfs_key_t fil_crypt_data_mutex_key;
#endif
+/** Is background scrubbing enabled, defined on btr0scrub.cc */
+extern my_bool srv_background_scrub_data_uncompressed;
+extern my_bool srv_background_scrub_data_compressed;
+
static bool
fil_crypt_needs_rotation(
-/*=====================*/
fil_encryption_t encrypt_mode, /*!< in: Encryption
mode */
uint key_version, /*!< in: Key version */
@@ -118,7 +120,6 @@ Init space crypt */
UNIV_INTERN
void
fil_space_crypt_init()
-/*==================*/
{
mutex_create(fil_crypt_key_mutex_key,
&fil_crypt_key_mutex, SYNC_NO_ORDER_CHECK);
@@ -127,6 +128,7 @@ fil_space_crypt_init()
mutex_create(fil_crypt_stat_mutex_key,
&crypt_stat_mutex, SYNC_NO_ORDER_CHECK);
+
memset(&crypt_stat, 0, sizeof(crypt_stat));
}
@@ -135,9 +137,9 @@ Cleanup space crypt */
UNIV_INTERN
void
fil_space_crypt_cleanup()
-/*=====================*/
{
os_event_free(fil_crypt_throttle_sleep_event);
+ fil_crypt_throttle_sleep_event = NULL;
mutex_free(&fil_crypt_key_mutex);
mutex_free(&crypt_stat_mutex);
}
@@ -146,7 +148,7 @@ fil_space_crypt_cleanup()
Get latest key version from encryption plugin.
@return key version or ENCRYPTION_KEY_VERSION_INVALID */
uint
-fil_space_crypt_struct::key_get_latest_version(void)
+fil_space_crypt_t::key_get_latest_version(void)
{
uint key_version = key_found;
@@ -160,12 +162,12 @@ fil_space_crypt_struct::key_get_latest_version(void)
}
/******************************************************************
-Get the latest(key-version), waking the encrypt thread, if needed */
+Get the latest(key-version), waking the encrypt thread, if needed
+@param[in,out] crypt_data Crypt data */
static inline
uint
fil_crypt_get_latest_key_version(
-/*=============================*/
- fil_space_crypt_t* crypt_data) /*!< in: crypt data */
+ fil_space_crypt_t* crypt_data)
{
ut_ad(crypt_data != NULL);
@@ -204,28 +206,31 @@ crypt_data_scheme_locker(
/******************************************************************
Create a fil_space_crypt_t object
+@param[in] type CRYPT_SCHEME_UNENCRYPTE or
+ CRYPT_SCHEME_1
+@param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
+ FIL_ENCRYPTION_ON or
+ FIL_ENCRYPTION_OFF
+@param[in] min_key_version key_version or 0
+@param[in] key_id Used key id
@return crypt object */
static
fil_space_crypt_t*
fil_space_create_crypt_data(
-/*========================*/
uint type,
fil_encryption_t encrypt_mode,
uint min_key_version,
- uint key_id,
- ulint offset)
+ uint key_id)
{
- const uint sz = sizeof(fil_space_crypt_t);
- void* buf = mem_zalloc(sz);
+ void* buf = mem_zalloc(sizeof(fil_space_crypt_t));
fil_space_crypt_t* crypt_data = NULL;
if (buf) {
crypt_data = new(buf)
- fil_space_crypt_struct(
+ fil_space_crypt_t(
type,
min_key_version,
key_id,
- offset,
encrypt_mode);
}
@@ -234,25 +239,30 @@ fil_space_create_crypt_data(
/******************************************************************
Create a fil_space_crypt_t object
+@param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
+ FIL_ENCRYPTION_ON or
+ FIL_ENCRYPTION_OFF
+
+@param[in] key_id Encryption key id
@return crypt object */
UNIV_INTERN
fil_space_crypt_t*
fil_space_create_crypt_data(
-/*========================*/
- fil_encryption_t encrypt_mode, /*!< in: encryption mode */
- uint key_id) /*!< in: encryption key id */
+ fil_encryption_t encrypt_mode,
+ uint key_id)
{
- return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id, 0));
+ return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
}
/******************************************************************
-Merge fil_space_crypt_t object */
+Merge fil_space_crypt_t object
+@param[in,out] dst Destination cryp data
+@param[in] src Source crypt data */
UNIV_INTERN
void
fil_space_merge_crypt_data(
-/*=======================*/
- fil_space_crypt_t* dst,/*!< out: Crypt data */
- const fil_space_crypt_t* src)/*!< in: Crypt data */
+ fil_space_crypt_t* dst,
+ const fil_space_crypt_t* src)
{
mutex_enter(&dst->mutex);
@@ -267,21 +277,22 @@ fil_space_merge_crypt_data(
dst->type = src->type;
dst->min_key_version = src->min_key_version;
dst->keyserver_requests += src->keyserver_requests;
- dst->closing = src->closing;
mutex_exit(&dst->mutex);
}
/******************************************************************
Read crypt data from a page (0)
-@return crypt data from page 0. */
+@param[in] space space_id
+@param[in] page Page 0
+@param[in] offset Offset to crypt data
+@return crypt data from page 0 or NULL. */
UNIV_INTERN
fil_space_crypt_t*
fil_space_read_crypt_data(
-/*======================*/
- ulint space, /*!< in: file space id*/
- const byte* page, /*!< in: page 0 */
- ulint offset) /*!< in: offset */
+ ulint space,
+ const byte* page,
+ ulint offset)
{
if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
/* Crypt data is not stored. */
@@ -294,8 +305,8 @@ fil_space_read_crypt_data(
type == CRYPT_SCHEME_1)) {
ib_logf(IB_LOG_LEVEL_ERROR,
- "Found non sensible crypt scheme: %lu for space %lu "
- " offset: %lu bytes: "
+ "Found non sensible crypt scheme: " ULINTPF " for space " ULINTPF
+ " offset: " ULINTPF " bytes: "
"[ %.2x %.2x %.2x %.2x %.2x %.2x ].",
type, space, offset,
page[offset + 0 + MAGIC_SZ],
@@ -346,43 +357,37 @@ fil_space_read_crypt_data(
}
/******************************************************************
-Free a crypt data object */
+Free a crypt data object
+@param[in,out] crypt_data crypt data to be freed */
UNIV_INTERN
void
fil_space_destroy_crypt_data(
-/*=========================*/
- fil_space_crypt_t **crypt_data) /*!< out: crypt data */
+ fil_space_crypt_t **crypt_data)
{
if (crypt_data != NULL && (*crypt_data) != NULL) {
fil_space_crypt_t* c = *crypt_data;
- c->~fil_space_crypt_struct();
+ c->~fil_space_crypt_t();
mem_free(c);
*crypt_data = NULL;
}
}
/******************************************************************
-Write crypt data to a page (0) */
-static
+Write crypt data to a page (0)
+@param[in,out] page0 Page 0 where to write
+@param[in,out] mtr Minitransaction */
+UNIV_INTERN
void
-fil_space_write_crypt_data_low(
-/*===========================*/
- fil_space_crypt_t* crypt_data, /*<! out: crypt data */
- ulint type, /*<! in: crypt scheme */
- byte* page, /*<! in: page 0 */
- ulint offset, /*<! in: offset */
- ulint maxsize, /*<! in: size of crypt data */
- mtr_t* mtr) /*<! in: minitransaction */
+fil_space_crypt_t::write_page0(
+ byte* page,
+ mtr_t* mtr)
{
- ut_a(offset > 0 && offset < UNIV_PAGE_SIZE);
ulint space_id = mach_read_from_4(
page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
- const uint len = sizeof(crypt_data->iv);
- const uint min_key_version = crypt_data->min_key_version;
- const uint key_id = crypt_data->key_id;
- const fil_encryption_t encryption = crypt_data->encryption;
- crypt_data->page0_offset = offset;
- ut_a(2 + len + 4 + 1 + 4 + MAGIC_SZ < maxsize);
+ const uint len = sizeof(iv);
+ ulint zip_size = fsp_header_get_zip_size(page);
+ const ulint offset = fsp_header_get_crypt_offset(zip_size);
+ page0_offset = offset;
/*
redo log this as bytewise updates to page 0
@@ -392,7 +397,7 @@ fil_space_write_crypt_data_low(
mlog_write_string(page + offset, CRYPT_MAGIC, MAGIC_SZ, mtr);
mlog_write_ulint(page + offset + MAGIC_SZ + 0, type, MLOG_1BYTE, mtr);
mlog_write_ulint(page + offset + MAGIC_SZ + 1, len, MLOG_1BYTE, mtr);
- mlog_write_string(page + offset + MAGIC_SZ + 2, crypt_data->iv, len,
+ mlog_write_string(page + offset + MAGIC_SZ + 2, iv, len,
mtr);
mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len, min_key_version,
MLOG_4BYTES, mtr);
@@ -424,44 +429,61 @@ fil_space_write_crypt_data_low(
log_ptr += 1;
mlog_close(mtr, log_ptr);
- mlog_catenate_string(mtr, crypt_data->iv, len);
+ mlog_catenate_string(mtr, iv, len);
}
}
/******************************************************************
-Write crypt data to a page (0) */
-UNIV_INTERN
-void
-fil_space_write_crypt_data(
-/*=======================*/
- ulint space, /*<! in: file space */
- byte* page, /*<! in: page 0 */
- ulint offset, /*<! in: offset */
- ulint maxsize, /*<! in: size of crypt data */
- mtr_t* mtr) /*<! in: minitransaction */
+Set crypt data for a tablespace
+@param[in,out] space Tablespace
+@param[in,out] crypt_data Crypt data to be set
+@return crypt_data in tablespace */
+static
+fil_space_crypt_t*
+fil_space_set_crypt_data(
+ fil_space_t* space,
+ fil_space_crypt_t* crypt_data)
{
- fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(space);
+ fil_space_crypt_t* free_crypt_data = NULL;
+ fil_space_crypt_t* ret_crypt_data = NULL;
+
+ /* Provided space is protected using fil_space_acquire()
+ from concurrent operations. */
+ if (space->crypt_data != NULL) {
+ /* There is already crypt data present,
+ merge new crypt_data */
+ fil_space_merge_crypt_data(space->crypt_data,
+ crypt_data);
+ ret_crypt_data = space->crypt_data;
+ free_crypt_data = crypt_data;
+ } else {
+ space->crypt_data = crypt_data;
+ ret_crypt_data = space->crypt_data;
+ }
- /* If no crypt data is stored on memory cache for this space,
- then do not continue writing crypt data to page 0. */
- if (crypt_data == NULL) {
- return;
+ if (free_crypt_data != NULL) {
+ /* there was already crypt data present and the new crypt
+ * data provided as argument to this function has been merged
+ * into that => free new crypt data
+ */
+ fil_space_destroy_crypt_data(&free_crypt_data);
}
- fil_space_write_crypt_data_low(crypt_data, crypt_data->type,
- page, offset, maxsize, mtr);
+ return ret_crypt_data;
}
/******************************************************************
Parse a MLOG_FILE_WRITE_CRYPT_DATA log entry
+@param[in] ptr Log entry start
+@param[in] end_ptr Log entry end
+@param[in] block buffer block
@return position on log buffer */
UNIV_INTERN
-byte*
+const byte*
fil_parse_write_crypt_data(
-/*=======================*/
- byte* ptr, /*!< in: Log entry start */
- byte* end_ptr,/*!< in: Log entry end */
- buf_block_t* block) /*!< in: buffer block */
+ const byte* ptr,
+ const byte* end_ptr,
+ const buf_block_t* block)
{
/* check that redo log entry is complete */
size_t entry_size =
@@ -473,7 +495,7 @@ fil_parse_write_crypt_data(
4 + // size of key_id
1; // fil_encryption_t
- if ((size_t) (end_ptr - ptr) < entry_size){
+ if (ptr + entry_size > end_ptr) {
return NULL;
}
@@ -499,7 +521,7 @@ fil_parse_write_crypt_data(
fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(ptr);
ptr +=1;
- if ((size_t) (end_ptr - ptr) < len) {
+ if (ptr + len > end_ptr) {
return NULL;
}
@@ -512,47 +534,36 @@ fil_parse_write_crypt_data(
ptr += len;
/* update fil_space memory cache with crypt_data */
- fil_space_set_crypt_data(space_id, crypt_data);
+ fil_space_t* space = fil_space_acquire_silent(space_id);
- return ptr;
-}
+ if (space) {
+ crypt_data = fil_space_set_crypt_data(space, crypt_data);
+ fil_space_release(space);
+ }
-/******************************************************************
-Clear crypt data from a page (0) */
-UNIV_INTERN
-void
-fil_space_clear_crypt_data(
-/*=======================*/
- byte* page, /*!< in/out: Page 0 */
- ulint offset) /*!< in: Offset */
-{
- //TODO(jonaso): pass crypt-data and read len from there
- ulint len = CRYPT_SCHEME_1_IV_LEN;
- ulint size =
- sizeof(CRYPT_MAGIC) +
- 1 + // type
- 1 + // len
- len + // iv
- 4 + // min key version
- 4 + // key id
- 1; // fil_encryption_t
- memset(page + offset, 0, size);
+ return ptr;
}
/******************************************************************
-Encrypt a buffer */
+Encrypt a buffer
+@param[in,out] crypt_data Crypt data
+@param[in] space space_id
+@param[in] offset Page offset
+@param[in] lsn Log sequence number
+@param[in] src_frame Page to encrypt
+@param[in] zip_size Compressed size or 0
+@param[in,out] dst_frame Output buffer
+@return encrypted buffer or NULL */
UNIV_INTERN
byte*
fil_encrypt_buf(
-/*============*/
- fil_space_crypt_t* crypt_data, /*!< in: crypt data */
- ulint space, /*!< in: Space id */
- ulint offset, /*!< in: Page offset */
- lsn_t lsn, /*!< in: lsn */
- byte* src_frame, /*!< in: Source page to be encrypted */
- ulint zip_size, /*!< in: compressed size if
- row format compressed */
- byte* dst_frame) /*!< in: outbut buffer */
+ fil_space_crypt_t* crypt_data,
+ ulint space,
+ ulint offset,
+ lsn_t lsn,
+ const byte* src_frame,
+ ulint zip_size,
+ byte* dst_frame)
{
ulint page_size = (zip_size) ? zip_size : UNIV_PAGE_SIZE;
uint key_version = fil_crypt_get_latest_key_version(crypt_data);
@@ -625,46 +636,48 @@ fil_encrypt_buf(
// store the post-encryption checksum after the key-version
mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, checksum);
+ ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size, NULL, offset));
+
srv_stats.pages_encrypted.inc();
return dst_frame;
}
/******************************************************************
-Encrypt a page */
+Encrypt a page
+
+@param[in] space Tablespace
+@param[in] offset Page offset
+@param[in] lsn Log sequence number
+@param[in] src_frame Page to encrypt
+@param[in,out] dst_frame Output buffer
+@return encrypted buffer or NULL */
UNIV_INTERN
byte*
fil_space_encrypt(
-/*==============*/
- ulint space, /*!< in: Space id */
- ulint offset, /*!< in: Page offset */
- lsn_t lsn, /*!< in: lsn */
- byte* src_frame, /*!< in: Source page to be encrypted */
- ulint zip_size, /*!< in: compressed size if
- row_format compressed */
- byte* dst_frame) /*!< in: outbut buffer */
+ const fil_space_t* space,
+ ulint offset,
+ lsn_t lsn,
+ byte* src_frame,
+ byte* dst_frame)
{
- fil_space_crypt_t* crypt_data = NULL;
-
ulint orig_page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
if (orig_page_type==FIL_PAGE_TYPE_FSP_HDR
|| orig_page_type==FIL_PAGE_TYPE_XDES) {
/* File space header or extent descriptor do not need to be
encrypted. */
- return src_frame;
+ return (src_frame);
}
- /* Get crypt data from file space */
- crypt_data = fil_space_get_crypt_data(space);
-
- if (crypt_data == NULL) {
- return src_frame;
+ if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
+ return (src_frame);
}
- ut_a(crypt_data != NULL && crypt_data->is_encrypted());
-
- byte* tmp = fil_encrypt_buf(crypt_data, space, offset, lsn, src_frame, zip_size, dst_frame);
+ fil_space_crypt_t* crypt_data = space->crypt_data;
+ ut_ad(space->n_pending_ops);
+ ulint zip_size = fsp_flags_get_zip_size(space->flags);
+ byte* tmp = fil_encrypt_buf(crypt_data, space->id, offset, lsn, src_frame, zip_size, dst_frame);
#ifdef UNIV_DEBUG
if (tmp) {
@@ -685,7 +698,7 @@ fil_space_encrypt(
src = uncomp_mem;
}
- bool corrupted1 = buf_page_is_corrupted(true, src, zip_size);
+ bool corrupted1 = buf_page_is_corrupted(true, src, zip_size, space);
bool ok = fil_space_decrypt(crypt_data, tmp_mem, size, tmp, &err);
/* Need to decompress the page if it was also compressed */
@@ -694,18 +707,17 @@ fil_space_encrypt(
fil_decompress_page(tmp_mem, comp_mem, UNIV_PAGE_SIZE, NULL);
}
- bool corrupted = buf_page_is_corrupted(true, tmp_mem, zip_size);
+ bool corrupted = buf_page_is_corrupted(true, tmp_mem, zip_size, space);
bool different = memcmp(src, tmp_mem, size);
if (!ok || corrupted || corrupted1 || err != DB_SUCCESS || different) {
- fprintf(stderr, "JAN: ok %d corrupted %d corrupted1 %d err %d different %d\n", ok , corrupted, corrupted1, err, different);
- fprintf(stderr, "JAN1: src_frame\n");
+ fprintf(stderr, "ok %d corrupted %d corrupted1 %d err %d different %d\n", ok , corrupted, corrupted1, err, different);
+ fprintf(stderr, "src_frame\n");
buf_page_print(src_frame, zip_size, BUF_PAGE_PRINT_NO_CRASH);
- fprintf(stderr, "JAN2: encrypted_frame\n");
+ fprintf(stderr, "encrypted_frame\n");
buf_page_print(tmp, zip_size, BUF_PAGE_PRINT_NO_CRASH);
- fprintf(stderr, "JAN1: decrypted_frame\n");
- buf_page_print(tmp_mem, zip_size, BUF_PAGE_PRINT_NO_CRASH);
- ut_error;
+ fprintf(stderr, "decrypted_frame\n");
+ buf_page_print(tmp_mem, zip_size, 0);
}
free(tmp_mem);
@@ -724,45 +736,22 @@ fil_space_encrypt(
return tmp;
}
-/*********************************************************************
-Check if extra buffer shall be allocated for decrypting after read
-@return true if fil space has encryption data. */
-UNIV_INTERN
-bool
-fil_space_check_encryption_read(
-/*=============================*/
- ulint space) /*!< in: tablespace id */
-{
- fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(space);
-
- if (crypt_data == NULL) {
- return false;
- }
-
- if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED) {
- return false;
- }
-
- if (crypt_data->not_encrypted()) {
- return false;
- }
-
- return true;
-}
-
/******************************************************************
Decrypt a page
+@param[in] crypt_data crypt_data
+@param[in] tmp_frame Temporary buffer
+@param[in] page_size Page size
+@param[in,out] src_frame Page to decrypt
+@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED
@return true if page decrypted, false if not.*/
UNIV_INTERN
bool
fil_space_decrypt(
-/*==============*/
- fil_space_crypt_t* crypt_data, /*!< in: crypt data */
- byte* tmp_frame, /*!< in: temporary buffer */
- ulint page_size, /*!< in: page size */
- byte* src_frame, /*!< in: out: page buffer */
- dberr_t* err) /*!< in: out: DB_SUCCESS or
- error code */
+ fil_space_crypt_t* crypt_data,
+ byte* tmp_frame,
+ ulint page_size,
+ byte* src_frame,
+ dberr_t* err)
{
ulint page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
uint key_version = mach_read_from_4(src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
@@ -770,6 +759,7 @@ fil_space_decrypt(
ulint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
ulint space = mach_read_from_4(src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
+
*err = DB_SUCCESS;
if (key_version == ENCRYPTION_KEY_NOT_ENCRYPTED) {
@@ -784,12 +774,12 @@ fil_space_decrypt(
first page in a system tablespace
data file (ibdata*, not *.ibd), if not
clear it. */
-#ifdef UNIV_DEBUG
- ib_logf(IB_LOG_LEVEL_WARN,
- "Page on space %lu offset %lu has key_version %u"
+
+ DBUG_PRINT("ib_crypt",
+ ("Page on space %lu offset %lu has key_version %u"
" when it shoud be undefined.",
- space, offset, key_version);
-#endif
+ space, offset, key_version));
+
mach_write_to_4(src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 0);
}
return false;
@@ -858,32 +848,43 @@ fil_space_decrypt(
/******************************************************************
Decrypt a page
-@return encrypted page, or original not encrypted page if encryption is
-not needed. */
+@param[in] space Tablespace
+@param[in] tmp_frame Temporary buffer used for decrypting
+@param[in] page_size Page size
+@param[in,out] src_frame Page to decrypt
+@param[out] decrypted true if page was decrypted
+@return decrypted page, or original not encrypted page if decryption is
+not needed.*/
UNIV_INTERN
byte*
fil_space_decrypt(
-/*==============*/
- ulint space, /*!< in: Fil space id */
- byte* tmp_frame, /*!< in: temporary buffer */
- ulint page_size, /*!< in: page size */
- byte* src_frame) /*!< in/out: page buffer */
+ const fil_space_t* space,
+ byte* tmp_frame,
+ byte* src_frame,
+ bool* decrypted)
{
dberr_t err = DB_SUCCESS;
byte* res = NULL;
+ ulint zip_size = fsp_flags_get_zip_size(space->flags);
+ ulint size = zip_size ? zip_size : UNIV_PAGE_SIZE;
+ *decrypted = false;
+
+ ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
+ ut_ad(space->n_pending_ops > 0);
bool encrypted = fil_space_decrypt(
- fil_space_get_crypt_data(space),
+ space->crypt_data,
tmp_frame,
- page_size,
+ size,
src_frame,
&err);
if (err == DB_SUCCESS) {
if (encrypted) {
+ *decrypted = true;
/* Copy the decrypted page back to page buffer, not
really any other options. */
- memcpy(src_frame, tmp_frame, page_size);
+ memcpy(src_frame, tmp_frame, size);
}
res = src_frame;
@@ -894,14 +895,15 @@ fil_space_decrypt(
/******************************************************************
Calculate post encryption checksum
+@param[in] zip_size zip_size or 0
+@param[in] dst_frame Block where checksum is calculated
@return page checksum or BUF_NO_CHECKSUM_MAGIC
not needed. */
UNIV_INTERN
ulint
fil_crypt_calculate_checksum(
-/*=========================*/
- ulint zip_size, /*!< in: zip_size or 0 */
- byte* dst_frame) /*!< in: page where to calculate */
+ ulint zip_size,
+ const byte* dst_frame)
{
ib_uint32_t checksum = 0;
srv_checksum_algorithm_t algorithm =
@@ -934,83 +936,133 @@ fil_crypt_calculate_checksum(
}
/*********************************************************************
-Verify checksum for a page (iff it's encrypted)
-NOTE: currently this function can only be run in single threaded mode
-as it modifies srv_checksum_algorithm (temporarily)
+Verify that post encryption checksum match calculated checksum.
+This function should be called only if tablespace contains crypt_data
+metadata (this is strong indication that tablespace is encrypted).
+Function also verifies that traditional checksum does not match
+calculated checksum as if it does page could be valid unencrypted,
+encrypted, or corrupted.
+
+@param[in] page Page to verify
+@param[in] zip_size zip size
+@param[in] space Tablespace
+@param[in] pageno Page no
@return true if page is encrypted AND OK, false otherwise */
UNIV_INTERN
bool
fil_space_verify_crypt_checksum(
-/*============================*/
- const byte* src_frame, /*!< in: page the verify */
- ulint zip_size) /*!< in: compressed size if
- row_format compressed */
+ byte* page,
+ ulint zip_size,
+ const fil_space_t* space,
+ ulint pageno)
{
- // key version
- uint key_version = mach_read_from_4(
- src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
+ uint key_version = mach_read_from_4(page+ FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
+ /* If page is not encrypted, return false */
if (key_version == 0) {
- return false; // unencrypted page
+ return false;
+ }
+
+ srv_checksum_algorithm_t algorithm =
+ static_cast<srv_checksum_algorithm_t>(srv_checksum_algorithm);
+ /* If no checksum is used, can't continue checking. */
+ if (algorithm == SRV_CHECKSUM_ALGORITHM_NONE) {
+ return(true);
}
- /* "trick" the normal checksum routines by storing the post-encryption
- * checksum into the normal checksum field allowing for reuse of
- * the normal routines */
+ /* Read stored post encryption checksum. */
+ ib_uint32_t checksum = mach_read_from_4(
+ page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
- // post encryption checksum
- ib_uint32_t stored_post_encryption = mach_read_from_4(
- src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
+ /* Declare empty pages non-corrupted */
+ if (checksum == 0
+ && *reinterpret_cast<const ib_uint64_t*>(page + FIL_PAGE_LSN) == 0
+ && buf_page_is_zeroes(page, zip_size)) {
+ return(true);
+ }
- // save pre encryption checksum for restore in end of this function
- ib_uint32_t stored_pre_encryption = mach_read_from_4(
- src_frame + FIL_PAGE_SPACE_OR_CHKSUM);
+ /* Compressed and encrypted pages do not have checksum. Assume not
+ corrupted. Page verification happens after decompression in
+ buf_page_io_complete() using buf_page_is_corrupted(). */
+ if (mach_read_from_2(page+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
+ return (true);
+ }
- ib_uint32_t checksum_field2 = mach_read_from_4(
- src_frame + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM);
+ /* Compressed pages use different checksum method. We first store
+ the post encryption checksum on checksum location and after function
+ restore the original. */
+ if (zip_size) {
+ ib_uint32_t old = static_cast<ib_uint32_t>(mach_read_from_4(
+ page + FIL_PAGE_SPACE_OR_CHKSUM));
- /** prepare frame for usage of normal checksum routines */
- mach_write_to_4(const_cast<byte*>(src_frame) + FIL_PAGE_SPACE_OR_CHKSUM,
- stored_post_encryption);
+ mach_write_to_4(page + FIL_PAGE_SPACE_OR_CHKSUM, checksum);
- /* NOTE: this function is (currently) only run when restoring
- * dblwr-buffer, server is single threaded so it's safe to modify
- * srv_checksum_algorithm */
- srv_checksum_algorithm_t save_checksum_algorithm =
- (srv_checksum_algorithm_t)srv_checksum_algorithm;
+ bool valid = page_zip_verify_checksum(page, zip_size);
- if (zip_size == 0 &&
- (save_checksum_algorithm == SRV_CHECKSUM_ALGORITHM_STRICT_INNODB ||
- save_checksum_algorithm == SRV_CHECKSUM_ALGORITHM_INNODB)) {
- /* handle ALGORITHM_INNODB specially,
- * "downgrade" to ALGORITHM_INNODB and store BUF_NO_CHECKSUM_MAGIC
- * checksum_field2 is sort of pointless anyway...
- */
- srv_checksum_algorithm = SRV_CHECKSUM_ALGORITHM_INNODB;
- mach_write_to_4(const_cast<byte*>(src_frame) +
- UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM,
- BUF_NO_CHECKSUM_MAGIC);
+ mach_write_to_4(page + FIL_PAGE_SPACE_OR_CHKSUM, old);
+
+ return (valid);
}
- /* verify checksums */
- ibool corrupted = buf_page_is_corrupted(false, src_frame, zip_size);
+ /* If stored checksum matches one of the calculated checksums
+ page is not corrupted. */
- /** restore frame & algorithm */
- srv_checksum_algorithm = save_checksum_algorithm;
+ ib_uint32_t cchecksum1 = buf_calc_page_crc32(page);
+ ib_uint32_t cchecksum2 = (ib_uint32_t) buf_calc_page_new_checksum(
+ page);
+ bool encrypted = (checksum == cchecksum1 || checksum == cchecksum2
+ || checksum == BUF_NO_CHECKSUM_MAGIC);
- mach_write_to_4(const_cast<byte*>(src_frame) +
- FIL_PAGE_SPACE_OR_CHKSUM,
- stored_pre_encryption);
+ /* MySQL 5.6 and MariaDB 10.0 and 10.1 will write an LSN to the
+ first page of each system tablespace file at
+ FIL_PAGE_FILE_FLUSH_LSN offset. On other pages and in other files,
+ the field might have been uninitialized until MySQL 5.5. In MySQL 5.7
+ (and MariaDB Server 10.2.2) WL#7990 stopped writing the field for other
+ than page 0 of the system tablespace.
- mach_write_to_4(const_cast<byte*>(src_frame) +
- UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM,
- checksum_field2);
+ Starting from MariaDB 10.1 the field has been repurposed for
+ encryption key_version.
- if (!corrupted) {
- return true; // page was encrypted and checksum matched
- } else {
- return false; // page was encrypted but checksum didn't match
+ Starting with MySQL 5.7 (and MariaDB Server 10.2), the
+ field has been repurposed for SPATIAL INDEX pages for
+ FIL_RTREE_SPLIT_SEQ_NUM.
+
+ Note that FIL_PAGE_FILE_FLUSH_LSN is not included in the InnoDB page
+ checksum.
+
+ Thus, FIL_PAGE_FILE_FLUSH_LSN could contain any value. While the
+ field would usually be 0 for pages that are not encrypted, we cannot
+ assume that a nonzero value means that the page is encrypted.
+ Therefore we must validate the page both as encrypted and unencrypted
+ when FIL_PAGE_FILE_FLUSH_LSN does not contain 0.
+ */
+
+ ulint checksum1 = mach_read_from_4(
+ page + FIL_PAGE_SPACE_OR_CHKSUM);
+
+ ulint checksum2 = mach_read_from_4(
+ page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM);
+
+
+ bool valid = (buf_page_is_checksum_valid_crc32(page,checksum1,checksum2)
+ || buf_page_is_checksum_valid_none(page,checksum1,checksum2)
+ || buf_page_is_checksum_valid_innodb(page,checksum1, checksum2));
+
+ if (encrypted && valid) {
+ /* If page is encrypted and traditional checksums match,
+ page could be still encrypted, or not encrypted and valid or
+ corrupted. */
+ ib_logf(IB_LOG_LEVEL_ERROR,
+ " Page %lu in space %s (%lu) maybe corrupted."
+ " Post encryption checksum %u stored [%lu:%lu] key_version %u",
+ pageno,
+ space ? space->name : "N/A",
+ mach_read_from_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID),
+ checksum, checksum1, checksum2, key_version);
+ encrypted = false;
}
+
+ return(encrypted);
}
/***********************************************************************/
@@ -1029,12 +1081,13 @@ struct key_state_t {
};
/***********************************************************************
-Copy global key state */
+Copy global key state
+@param[in,out] new_state key state
+@param[in] crypt_data crypt data */
static void
fil_crypt_get_key_state(
-/*====================*/
- key_state_t* new_state, /*!< out: key state */
- fil_space_crypt_t* crypt_data) /*!< in, out: crypt_data */
+ key_state_t* new_state,
+ fil_space_crypt_t* crypt_data)
{
if (srv_encrypt_tables) {
new_state->key_version = crypt_data->key_get_latest_version();
@@ -1049,15 +1102,17 @@ fil_crypt_get_key_state(
/***********************************************************************
Check if a key needs rotation given a key_state
+@param[in] encrypt_mode Encryption mode
+@param[in] key_version Current key version
+@param[in] latest_key_version Latest key version
+@param[in] rotate_key_age when to rotate
@return true if key needs rotation, false if not */
static bool
fil_crypt_needs_rotation(
-/*=====================*/
- fil_encryption_t encrypt_mode, /*!< in: Encryption
- mode */
- uint key_version, /*!< in: Key version */
- uint latest_key_version, /*!< in: Latest key version */
- uint rotate_key_age) /*!< in: When to rotate */
+ fil_encryption_t encrypt_mode,
+ uint key_version,
+ uint latest_key_version,
+ uint rotate_key_age)
{
if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
return false;
@@ -1070,7 +1125,7 @@ fil_crypt_needs_rotation(
}
if (latest_key_version == 0 && key_version != 0) {
- if (encrypt_mode == FIL_SPACE_ENCRYPTION_DEFAULT) {
+ if (encrypt_mode == FIL_ENCRYPTION_DEFAULT) {
/* this is rotation encrypted => unencrypted */
return true;
}
@@ -1087,59 +1142,34 @@ fil_crypt_needs_rotation(
}
/***********************************************************************
-Check if a space is closing (i.e just before drop)
-@return true if space is closing, false if not. */
-UNIV_INTERN
-bool
-fil_crypt_is_closing(
-/*=================*/
- ulint space) /*!< in: FIL space id */
-{
- bool closing=true;
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
-
- if (crypt_data) {
- closing = crypt_data->is_closing(false);
- }
-
- return closing;
-}
-
-/***********************************************************************
Start encrypting a space
-@return true if a pending op (fil_inc_pending_ops/fil_decr_pending_ops) is held
-*/
+@param[in,out] space Tablespace
+@return true if a recheck is needed */
static
bool
fil_crypt_start_encrypting_space(
-/*=============================*/
- ulint space, /*!< in: FIL space id */
- bool* recheck)/*!< out: true if recheck needed */
+ fil_space_t* space)
{
-
- /* we have a pending op when entering function */
- bool pending_op = true;
-
+ bool recheck = false;
mutex_enter(&fil_crypt_threads_mutex);
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
- ibool page_encrypted = (crypt_data != NULL);
+ fil_space_crypt_t *crypt_data = space->crypt_data;
- /*If spage is not encrypted and encryption is not enabled, then
+ /* If space is not encrypted and encryption is not enabled, then
do not continue encrypting the space. */
- if (!page_encrypted && !srv_encrypt_tables) {
+ if (!crypt_data && !srv_encrypt_tables) {
mutex_exit(&fil_crypt_threads_mutex);
- return pending_op;
+ return false;
}
if (crypt_data != NULL || fil_crypt_start_converting) {
/* someone beat us to it */
if (fil_crypt_start_converting) {
- *recheck = true;
+ recheck = true;
}
mutex_exit(&fil_crypt_threads_mutex);
- return pending_op;
+ return recheck;
}
/* NOTE: we need to write and flush page 0 before publishing
@@ -1148,10 +1178,11 @@ fil_crypt_start_encrypting_space(
* crypt data in page 0 */
/* 1 - create crypt data */
- crypt_data = fil_space_create_crypt_data(FIL_SPACE_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);
+ crypt_data = fil_space_create_crypt_data(FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);
+
if (crypt_data == NULL) {
mutex_exit(&fil_crypt_threads_mutex);
- return pending_op;
+ return false;
}
crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
@@ -1169,87 +1200,44 @@ fil_crypt_start_encrypting_space(
do
{
- if (fil_crypt_is_closing(space) ||
- fil_space_found_by_id(space) == NULL) {
- break;
- }
-
mtr_t mtr;
mtr_start(&mtr);
/* 2 - get page 0 */
- ulint offset = 0;
- ulint zip_size = fil_space_get_zip_size(space);
- buf_block_t* block = buf_page_get_gen(space, zip_size, offset,
+ ulint zip_size = fsp_flags_get_zip_size(space->flags);
+ buf_block_t* block = buf_page_get_gen(space->id, zip_size, 0,
RW_X_LATCH,
NULL,
BUF_GET,
__FILE__, __LINE__,
&mtr);
- if (fil_crypt_is_closing(space) ||
- fil_space_found_by_id(space) == NULL) {
- mtr_commit(&mtr);
- break;
- }
- /* 3 - compute location to store crypt data */
+ /* 3 - write crypt data to page 0 */
byte* frame = buf_block_get_frame(block);
- ulint maxsize;
- ut_ad(crypt_data);
- crypt_data->page0_offset =
- fsp_header_get_crypt_offset(zip_size, &maxsize);
-
- /* 4 - write crypt data to page 0 */
- fil_space_write_crypt_data_low(crypt_data,
- CRYPT_SCHEME_1,
- frame,
- crypt_data->page0_offset,
- maxsize, &mtr);
+ crypt_data->type = CRYPT_SCHEME_1;
+ crypt_data->write_page0(frame, &mtr);
- mtr_commit(&mtr);
- if (fil_crypt_is_closing(space) ||
- fil_space_found_by_id(space) == NULL) {
- break;
- }
+ mtr_commit(&mtr);
/* record lsn of update */
lsn_t end_lsn = mtr.end_lsn;
/* 4 - sync tablespace before publishing crypt data */
- /* release "lock" while syncing */
- fil_decr_pending_ops(space);
- pending_op = false;
-
bool success = false;
- ulint n_pages = 0;
ulint sum_pages = 0;
+
do {
+ ulint n_pages = 0;
success = buf_flush_list(ULINT_MAX, end_lsn, &n_pages);
buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
sum_pages += n_pages;
- } while (!success &&
- !fil_crypt_is_closing(space) &&
- !fil_space_found_by_id(space));
-
- /* try to reacquire pending op */
- if (fil_inc_pending_ops(space, true)) {
- break;
- }
-
- /* pending op reacquired! */
- pending_op = true;
-
- if (fil_crypt_is_closing(space) ||
- fil_space_found_by_id(space) == NULL) {
- break;
- }
+ } while (!success);
/* 5 - publish crypt data */
mutex_enter(&fil_crypt_threads_mutex);
- ut_ad(crypt_data);
mutex_enter(&crypt_data->mutex);
crypt_data->type = CRYPT_SCHEME_1;
ut_a(crypt_data->rotate_state.active_threads == 1);
@@ -1260,10 +1248,9 @@ fil_crypt_start_encrypting_space(
mutex_exit(&crypt_data->mutex);
mutex_exit(&fil_crypt_threads_mutex);
- return pending_op;
+ return recheck;
} while (0);
- ut_ad(crypt_data);
mutex_enter(&crypt_data->mutex);
ut_a(crypt_data->rotate_state.active_threads == 1);
crypt_data->rotate_state.active_threads = 0;
@@ -1273,7 +1260,7 @@ fil_crypt_start_encrypting_space(
fil_crypt_start_converting = false;
mutex_exit(&fil_crypt_threads_mutex);
- return pending_op;
+ return recheck;
}
/** State of a rotation thread */
@@ -1287,7 +1274,7 @@ struct rotate_thread_t {
uint thread_no;
bool first; /*!< is position before first space */
- ulint space; /*!< current space */
+ fil_space_t* space; /*!< current space or NULL */
ulint offset; /*!< current offset */
ulint batch; /*!< #pages to rotate */
uint min_key_version_found;/*!< min key version found but not rotated */
@@ -1322,54 +1309,41 @@ struct rotate_thread_t {
/***********************************************************************
Check if space needs rotation given a key_state
+@param[in,out] state Key rotation state
+@param[in,out] key_state Key state
+@param[in,out] recheck needs recheck ?
@return true if space needs key rotation */
static
bool
fil_crypt_space_needs_rotation(
-/*===========================*/
- rotate_thread_t* state, /*!< in: Key rotation state */
- key_state_t* key_state, /*!< in: Key state */
- bool* recheck) /*!< out: needs recheck ? */
+ rotate_thread_t* state,
+ key_state_t* key_state,
+ bool* recheck)
{
- ulint space = state->space;
-
- /* Make sure that tablespace is found and it is normal tablespace */
- if (fil_space_found_by_id(space) == NULL ||
- fil_space_get_type(space) != FIL_TABLESPACE) {
- return false;
- }
+ fil_space_t* space = state->space;
- if (fil_inc_pending_ops(space, true)) {
- /* tablespace being dropped */
+ /* Make sure that tablespace is normal tablespace */
+ if (space->purpose != FIL_TABLESPACE) {
return false;
}
- /* keep track of if we have pending op */
- bool pending_op = true;
+ ut_ad(space->n_pending_ops > 0);
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
+ fil_space_crypt_t *crypt_data = space->crypt_data;
if (crypt_data == NULL) {
/**
* space has no crypt data
* start encrypting it...
*/
- pending_op = fil_crypt_start_encrypting_space(space, recheck);
-
- crypt_data = fil_space_get_crypt_data(space);
+ *recheck = fil_crypt_start_encrypting_space(space);
+ crypt_data = space->crypt_data;
if (crypt_data == NULL) {
- if (pending_op) {
- fil_decr_pending_ops(space);
- }
return false;
}
crypt_data->key_get_latest_version();
-
- if (!crypt_data->is_key_found()) {
- return false;
- }
}
/* If used key_id is not found from encryption plugin we can't
@@ -1389,7 +1363,7 @@ fil_crypt_space_needs_rotation(
}
/* prevent threads from starting to rotate space */
- if (crypt_data->is_closing(true)) {
+ if (space->is_stopping()) {
break;
}
@@ -1413,39 +1387,39 @@ fil_crypt_space_needs_rotation(
key_state->key_version, key_state->rotate_key_age);
crypt_data->rotate_state.scrubbing.is_active =
- btr_scrub_start_space(space, &state->scrub_data);
+ btr_scrub_start_space(space->id, &state->scrub_data);
time_t diff = time(0) - crypt_data->rotate_state.scrubbing.
last_scrub_completed;
bool need_scrubbing =
+ (srv_background_scrub_data_uncompressed ||
+ srv_background_scrub_data_compressed) &&
crypt_data->rotate_state.scrubbing.is_active
- && diff >= (time_t) srv_background_scrub_data_interval;
+ && diff >= 0
+ && ulint(diff) >= srv_background_scrub_data_interval;
if (need_key_rotation == false && need_scrubbing == false) {
break;
}
mutex_exit(&crypt_data->mutex);
- /* NOTE! fil_decr_pending_ops is performed outside */
+
return true;
} while (0);
mutex_exit(&crypt_data->mutex);
- if (pending_op) {
- fil_decr_pending_ops(space);
- }
return false;
}
/***********************************************************************
-Update global statistics with thread statistics */
+Update global statistics with thread statistics
+@param[in,out] state key rotation statistics */
static void
fil_crypt_update_total_stat(
-/*========================*/
- rotate_thread_t *state) /*!< in: Key rotation status */
+ rotate_thread_t *state)
{
mutex_enter(&crypt_stat_mutex);
crypt_stat.pages_read_from_cache +=
@@ -1469,15 +1443,19 @@ fil_crypt_update_total_stat(
/***********************************************************************
Allocate iops to thread from global setting,
used before starting to rotate a space.
+@param[in,out] state Rotation state
@return true if allocation succeeded, false if failed */
static
bool
fil_crypt_alloc_iops(
-/*=================*/
- rotate_thread_t *state) /*!< in: Key rotation status */
+ rotate_thread_t *state)
{
ut_ad(state->allocated_iops == 0);
+ /* We have not yet selected the space to rotate, thus
+ state might not contain space and we can't check
+ its status yet. */
+
uint max_iops = state->estimated_max_iops;
mutex_enter(&fil_crypt_threads_mutex);
@@ -1503,12 +1481,12 @@ fil_crypt_alloc_iops(
/***********************************************************************
Reallocate iops to thread,
-used when inside a space */
+used when inside a space
+@param[in,out] state Rotation state */
static
void
fil_crypt_realloc_iops(
-/*===================*/
- rotate_thread_t *state) /*!< in: Key rotation status */
+ rotate_thread_t *state)
{
ut_a(state->allocated_iops > 0);
@@ -1517,13 +1495,12 @@ fil_crypt_realloc_iops(
uint avg_wait_time_us =
state->sum_waited_us / state->cnt_waited;
-#if DEBUG_KEYROTATION_THROTTLING
- ib_logf(IB_LOG_LEVEL_INFO,
- "thr_no: %u - update estimated_max_iops from %u to %u.",
+ DBUG_PRINT("ib_crypt",
+ ("thr_no: %u - update estimated_max_iops from %u to %u.",
state->thread_no,
state->estimated_max_iops,
- 1000000 / avg_wait_time_us);
-#endif
+ 1000000 / avg_wait_time_us));
+
if (avg_wait_time_us == 0) {
avg_wait_time_us = 1; // prevent division by zero
}
@@ -1532,12 +1509,11 @@ fil_crypt_realloc_iops(
state->cnt_waited = 0;
state->sum_waited_us = 0;
} else {
-#if DEBUG_KEYROTATION_THROTTLING
- ib_logf(IB_LOG_LEVEL_INFO,
- "thr_no: %u only waited %lu%% skip re-estimate.",
+
+ DBUG_PRINT("ib_crypt",
+ ("thr_no: %u only waited %lu%% skip re-estimate.",
state->thread_no,
- (100 * state->cnt_waited) / state->batch);
-#endif
+ (100 * state->cnt_waited) / state->batch));
}
if (state->estimated_max_iops <= state->allocated_iops) {
@@ -1563,8 +1539,9 @@ fil_crypt_realloc_iops(
state->allocated_iops ++;
n_fil_crypt_iops_allocated ++;
}
- mutex_exit(&fil_crypt_threads_mutex);
+
os_event_set(fil_crypt_threads_event);
+ mutex_exit(&fil_crypt_threads_mutex);
}
} else {
/* see if there are more to get */
@@ -1581,13 +1558,13 @@ fil_crypt_realloc_iops(
}
n_fil_crypt_iops_allocated += extra;
state->allocated_iops += extra;
-#if DEBUG_KEYROTATION_THROTTLING
- ib_logf(IB_LOG_LEVEL_INFO,
- "thr_no: %u increased iops from %u to %u.",
+
+ DBUG_PRINT("ib_crypt",
+ ("thr_no: %u increased iops from %u to %u.",
state->thread_no,
state->allocated_iops - extra,
- state->allocated_iops);
-#endif
+ state->allocated_iops));
+
}
mutex_exit(&fil_crypt_threads_mutex);
}
@@ -1596,12 +1573,12 @@ fil_crypt_realloc_iops(
}
/***********************************************************************
-Return allocated iops to global */
+Return allocated iops to global
+@param[in,out] state Rotation state */
static
void
fil_crypt_return_iops(
-/*==================*/
- rotate_thread_t *state) /*!< in: Key rotation status */
+ rotate_thread_t *state)
{
if (state->allocated_iops > 0) {
uint iops = state->allocated_iops;
@@ -1614,25 +1591,27 @@ fil_crypt_return_iops(
ut_ad(0);
iops = 0;
}
+
n_fil_crypt_iops_allocated -= iops;
- mutex_exit(&fil_crypt_threads_mutex);
state->allocated_iops = 0;
os_event_set(fil_crypt_threads_event);
+ mutex_exit(&fil_crypt_threads_mutex);
}
fil_crypt_update_total_stat(state);
}
/***********************************************************************
-Search for a space needing rotation */
-UNIV_INTERN
+Search for a space needing rotation
+@param[in,out] key_state Key state
+@param[in,out] state Rotation state
+@param[in,out] recheck recheck ? */
+static
bool
fil_crypt_find_space_to_rotate(
-/*===========================*/
- key_state_t* key_state, /*!< in: Key state */
- rotate_thread_t* state, /*!< in: Key rotation state */
- bool* recheck) /*!< out: true if recheck
- needed */
+ key_state_t* key_state,
+ rotate_thread_t* state,
+ bool* recheck)
{
/* we need iops to start rotating */
while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
@@ -1641,30 +1620,44 @@ fil_crypt_find_space_to_rotate(
}
if (state->should_shutdown()) {
+ if (state->space) {
+ fil_space_release(state->space);
+ state->space = NULL;
+ }
return false;
}
if (state->first) {
state->first = false;
- state->space = fil_get_first_space_safe();
- } else {
- state->space = fil_get_next_space_safe(state->space);
+ if (state->space) {
+ fil_space_release(state->space);
+ }
+ state->space = NULL;
}
- while (!state->should_shutdown() && state->space != ULINT_UNDEFINED) {
- fil_space_t* space = fil_space_found_by_id(state->space);
+ /* If key rotation is enabled (default) we iterate all tablespaces.
+ If key rotation is not enabled we iterate only the tablespaces
+ added to keyrotation list. */
+ if (srv_fil_crypt_rotate_key_age) {
+ state->space = fil_space_next(state->space);
+ } else {
+ state->space = fil_space_keyrotate_next(state->space);
+ }
- if (space) {
- if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
- ut_ad(key_state->key_id);
- /* init state->min_key_version_found before
- * starting on a space */
- state->min_key_version_found = key_state->key_version;
- return true;
- }
+ while (!state->should_shutdown() && state->space) {
+ if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
+ ut_ad(key_state->key_id);
+ /* init state->min_key_version_found before
+ * starting on a space */
+ state->min_key_version_found = key_state->key_version;
+ return true;
}
- state->space = fil_get_next_space_safe(state->space);
+ if (srv_fil_crypt_rotate_key_age) {
+ state->space = fil_space_next(state->space);
+ } else {
+ state->space = fil_space_keyrotate_next(state->space);
+ }
}
/* if we didn't find any space return iops */
@@ -1675,16 +1668,16 @@ fil_crypt_find_space_to_rotate(
}
/***********************************************************************
-Start rotating a space */
+Start rotating a space
+@param[in] key_state Key state
+@param[in,out] state Rotation state */
static
void
fil_crypt_start_rotate_space(
-/*=========================*/
- const key_state_t* key_state, /*!< in: Key state */
- rotate_thread_t* state) /*!< in: Key rotation state */
+ const key_state_t* key_state,
+ rotate_thread_t* state)
{
- ulint space = state->space;
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
+ fil_space_crypt_t *crypt_data = state->space->crypt_data;
ut_ad(crypt_data);
mutex_enter(&crypt_data->mutex);
@@ -1695,8 +1688,9 @@ fil_crypt_start_rotate_space(
crypt_data->rotate_state.next_offset = 1; // skip page 0
/* no need to rotate beyond current max
* if space extends, it will be encrypted with newer version */
- crypt_data->rotate_state.max_offset = fil_space_get_size(space);
-
+ /* FIXME: max_offset could be removed and instead
+ space->size consulted.*/
+ crypt_data->rotate_state.max_offset = state->space->size;
crypt_data->rotate_state.end_lsn = 0;
crypt_data->rotate_state.min_key_version_found =
key_state->key_version;
@@ -1724,26 +1718,34 @@ fil_crypt_start_rotate_space(
/***********************************************************************
Search for batch of pages needing rotation
+@param[in] key_state Key state
+@param[in,out] state Rotation state
@return true if page needing key rotation found, false if not found */
static
bool
fil_crypt_find_page_to_rotate(
-/*==========================*/
- const key_state_t* key_state, /*!< in: Key state */
- rotate_thread_t* state) /*!< in: Key rotation state */
+ const key_state_t* key_state,
+ rotate_thread_t* state)
{
ulint batch = srv_alloc_time * state->allocated_iops;
- ulint space = state->space;
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
+ fil_space_t* space = state->space;
+
+ ut_ad(!space || space->n_pending_ops > 0);
+
+ /* If space is marked to be dropped stop rotation. */
+ if (!space || space->is_stopping()) {
+ return false;
+ }
+
+ fil_space_crypt_t *crypt_data = space->crypt_data;
/* Space might already be dropped */
if (crypt_data) {
mutex_enter(&crypt_data->mutex);
ut_ad(key_state->key_id == crypt_data->key_id);
- if (!crypt_data->is_closing(true) &&
- crypt_data->rotate_state.next_offset <
- crypt_data->rotate_state.max_offset) {
+ if (crypt_data->rotate_state.next_offset <
+ crypt_data->rotate_state.max_offset) {
state->offset = crypt_data->rotate_state.next_offset;
ulint remaining = crypt_data->rotate_state.max_offset -
@@ -1768,59 +1770,47 @@ fil_crypt_find_page_to_rotate(
/***********************************************************************
Check if a page is uninitialized (doesn't need to be rotated)
-@return true if page is uninitialized, false if not.*/
-static
+@param[in] frame Page to check
+@param[in] zip_size zip_size or 0
+@return true if page is uninitialized, false if not. */
+static inline
bool
fil_crypt_is_page_uninitialized(
-/*============================*/
- const byte *frame, /*!< in: Page */
- uint zip_size) /*!< in: compressed size if
- row_format compressed */
+ const byte *frame,
+ uint zip_size)
{
- if (zip_size) {
- ulint stored_checksum = mach_read_from_4(
- frame + FIL_PAGE_SPACE_OR_CHKSUM);
- /* empty pages aren't encrypted */
- if (stored_checksum == 0) {
- return true;
- }
- } else {
- ulint size = UNIV_PAGE_SIZE;
- ulint checksum_field1 = mach_read_from_4(
- frame + FIL_PAGE_SPACE_OR_CHKSUM);
- ulint checksum_field2 = mach_read_from_4(
- frame + size - FIL_PAGE_END_LSN_OLD_CHKSUM);
- /* empty pages are not encrypted */
- if (checksum_field1 == 0 && checksum_field2 == 0
- && mach_read_from_4(frame + FIL_PAGE_LSN) == 0) {
- return true;
- }
- }
- return false;
+ return (buf_page_is_zeroes(frame, zip_size));
}
-#define fil_crypt_get_page_throttle(state,space,zip_size,offset,mtr,sleeptime_ms) \
- fil_crypt_get_page_throttle_func(state, space, zip_size, offset, mtr, \
+#define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
+ fil_crypt_get_page_throttle_func(state, offset, mtr, \
sleeptime_ms, __FILE__, __LINE__)
/***********************************************************************
Get a page and compute sleep time
-@return page */
+@param[in,out] state Rotation state
+@param[in] zip_size compressed size or 0
+@param[in] offset Page offset
+@param[in,out] mtr Minitransaction
+@param[out] sleeptime_ms Sleep time
+@param[in] file File where called
+@param[in] line Line where called
+@return page or NULL*/
static
buf_block_t*
fil_crypt_get_page_throttle_func(
-/*=============================*/
- rotate_thread_t* state, /*!< in/out: Key rotation state */
- ulint space, /*!< in: FIL space id */
- uint zip_size, /*!< in: compressed size if
- row_format compressed */
- ulint offset, /*!< in: page offsett */
- mtr_t* mtr, /*!< in/out: minitransaction */
- ulint* sleeptime_ms, /*!< out: sleep time */
- const char* file, /*!< in: file name */
- ulint line) /*!< in: file line */
+ rotate_thread_t* state,
+ ulint offset,
+ mtr_t* mtr,
+ ulint* sleeptime_ms,
+ const char* file,
+ ulint line)
{
- buf_block_t* block = buf_page_try_get_func(space, offset, RW_X_LATCH,
+ fil_space_t* space = state->space;
+ ulint zip_size = fsp_flags_get_zip_size(space->flags);
+ ut_ad(space->n_pending_ops > 0);
+
+ buf_block_t* block = buf_page_try_get_func(space->id, offset, RW_X_LATCH,
true,
file, line, mtr);
if (block != NULL) {
@@ -1831,16 +1821,14 @@ fil_crypt_get_page_throttle_func(
/* Before reading from tablespace we need to make sure that
tablespace exists and is not is just being dropped. */
-
- if (fil_crypt_is_closing(space) ||
- fil_space_found_by_id(space) == NULL) {
+ if (space->is_stopping()) {
return NULL;
}
state->crypt_stat.pages_read_from_disk++;
ullint start = ut_time_us(NULL);
- block = buf_page_get_gen(space, zip_size, offset,
+ block = buf_page_get_gen(space->id, zip_size, offset,
RW_X_LATCH,
NULL, BUF_GET_POSSIBLY_FREED,
file, line, mtr);
@@ -1866,6 +1854,7 @@ fil_crypt_get_page_throttle_func(
}
*sleeptime_ms += add_sleeptime_ms;
+
return block;
}
@@ -1875,27 +1864,35 @@ Get block and allocation status
note: innodb locks fil_space_latch and then block when allocating page
but locks block and then fil_space_latch when freeing page.
-@return block
+
+@param[in,out] state Rotation state
+@param[in] zip_size Compressed size or 0
+@param[in] offset Page offset
+@param[in,out] mtr Minitransaction
+@param[out] allocation_status Allocation status
+@param[out] sleeptime_ms Sleep time
+@return block or NULL
*/
static
buf_block_t*
btr_scrub_get_block_and_allocation_status(
-/*======================================*/
- rotate_thread_t* state, /*!< in/out: Key rotation state */
- ulint space, /*!< in: FIL space id */
- uint zip_size, /*!< in: compressed size if
- row_format compressed */
- ulint offset, /*!< in: page offsett */
- mtr_t* mtr, /*!< in/out: minitransaction
- */
+ rotate_thread_t* state,
+ uint zip_size,
+ ulint offset,
+ mtr_t* mtr,
btr_scrub_page_allocation_status_t *allocation_status,
- /*!< in/out: allocation status */
- ulint* sleeptime_ms) /*!< out: sleep time */
+ ulint* sleeptime_ms)
{
mtr_t local_mtr;
buf_block_t *block = NULL;
+ fil_space_t* space = state->space;
+
+ ut_ad(space->n_pending_ops > 0);
+ ut_ad(zip_size == fsp_flags_get_zip_size(space->flags));
+
mtr_start(&local_mtr);
- *allocation_status = fsp_page_is_free(space, offset, &local_mtr) ?
+
+ *allocation_status = fsp_page_is_free(space->id, offset, &local_mtr) ?
BTR_SCRUB_PAGE_FREE :
BTR_SCRUB_PAGE_ALLOCATED;
@@ -1903,7 +1900,6 @@ btr_scrub_get_block_and_allocation_status(
/* this is easy case, we lock fil_space_latch first and
then block */
block = fil_crypt_get_page_throttle(state,
- space, zip_size,
offset, mtr,
sleeptime_ms);
mtr_commit(&local_mtr);
@@ -1920,7 +1916,6 @@ btr_scrub_get_block_and_allocation_status(
*/
block = fil_crypt_get_page_throttle(state,
- space, zip_size,
offset, mtr,
sleeptime_ms);
}
@@ -1930,21 +1925,29 @@ btr_scrub_get_block_and_allocation_status(
/***********************************************************************
-Rotate one page */
+Rotate one page
+@param[in,out] key_state Key state
+@param[in,out] state Rotation state */
static
void
fil_crypt_rotate_page(
-/*==================*/
- const key_state_t* key_state, /*!< in: Key state */
- rotate_thread_t* state) /*!< in: Key rotation state */
+ const key_state_t* key_state,
+ rotate_thread_t* state)
{
- ulint space = state->space;
+ fil_space_t*space = state->space;
+ ulint space_id = space->id;
ulint offset = state->offset;
- const uint zip_size = fil_space_get_zip_size(space);
+ const uint zip_size = fsp_flags_get_zip_size(space->flags);
ulint sleeptime_ms = 0;
+ fil_space_crypt_t *crypt_data = space->crypt_data;
- /* check if tablespace is closing before reading page */
- if (fil_crypt_is_closing(space) || fil_space_found_by_id(space) == NULL) {
+ ut_ad(space->n_pending_ops > 0);
+
+ /* In fil_crypt_thread where key rotation is done we have
+ acquired space and checked that this space is not yet
+ marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
+ Check here also to give DROP TABLE or similar a change. */
+ if (space->is_stopping()) {
return;
}
@@ -1956,7 +1959,6 @@ fil_crypt_rotate_page(
mtr_t mtr;
mtr_start(&mtr);
buf_block_t* block = fil_crypt_get_page_throttle(state,
- space, zip_size,
offset, &mtr,
&sleeptime_ms);
@@ -1968,9 +1970,8 @@ fil_crypt_rotate_page(
uint kv = block->page.key_version;
/* check if tablespace is closing after reading page */
- if (!fil_crypt_is_closing(space)) {
+ if (space->is_stopping()) {
byte* frame = buf_block_get_frame(block);
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
if (kv == 0 &&
fil_crypt_is_page_uninitialized(frame, zip_size)) {
@@ -1990,7 +1991,7 @@ fil_crypt_rotate_page(
/* force rotation by dummy updating page */
mlog_write_ulint(frame +
FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
- space, MLOG_4BYTES, &mtr);
+ space_id, MLOG_4BYTES, &mtr);
/* update block */
block->page.key_version = key_state->key_version;
@@ -2023,7 +2024,7 @@ fil_crypt_rotate_page(
*/
btr_scrub_page_allocation_status_t allocated;
block = btr_scrub_get_block_and_allocation_status(
- state, space, zip_size, offset, &mtr,
+ state, zip_size, offset, &mtr,
&allocated,
&sleeptime_ms);
@@ -2037,7 +2038,7 @@ fil_crypt_rotate_page(
/* we need to refetch it once more now that we have
* index locked */
block = btr_scrub_get_block_and_allocation_status(
- state, space, zip_size, offset, &mtr,
+ state, zip_size, offset, &mtr,
&allocated,
&sleeptime_ms);
@@ -2068,7 +2069,6 @@ fil_crypt_rotate_page(
if (needs_scrubbing == BTR_SCRUB_TURNED_OFF) {
/* if we just detected that scrubbing was turned off
* update global state to reflect this */
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
ut_ad(crypt_data);
mutex_enter(&crypt_data->mutex);
crypt_data->rotate_state.scrubbing.is_active = false;
@@ -2096,17 +2096,20 @@ fil_crypt_rotate_page(
}
/***********************************************************************
-Rotate a batch of pages */
+Rotate a batch of pages
+@param[in,out] key_state Key state
+@param[in,out] state Rotation state */
static
void
fil_crypt_rotate_pages(
-/*===================*/
- const key_state_t* key_state, /*!< in: Key state */
- rotate_thread_t* state) /*!< in: Key rotation state */
+ const key_state_t* key_state,
+ rotate_thread_t* state)
{
- ulint space = state->space;
+ ulint space = state->space->id;
ulint end = state->offset + state->batch;
+ ut_ad(state->space->n_pending_ops > 0);
+
for (; state->offset < end; state->offset++) {
/* we can't rotate pages in dblwr buffer as
@@ -2127,20 +2130,23 @@ fil_crypt_rotate_pages(
}
/***********************************************************************
-Flush rotated pages and then update page 0 */
+Flush rotated pages and then update page 0
+
+@param[in,out] state rotation state */
static
void
fil_crypt_flush_space(
-/*==================*/
- rotate_thread_t* state, /*!< in: Key rotation state */
- ulint space) /*!< in: FIL space id */
+ rotate_thread_t* state)
{
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
+ fil_space_t* space = state->space;
+ fil_space_crypt_t *crypt_data = space->crypt_data;
+
+ ut_ad(space->n_pending_ops > 0);
/* flush tablespace pages so that there are no pages left with old key */
lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
- if (end_lsn > 0 && !fil_crypt_is_closing(space)) {
+ if (end_lsn > 0 && !space->is_stopping()) {
bool success = false;
ulint n_pages = 0;
ulint sum_pages = 0;
@@ -2150,7 +2156,7 @@ fil_crypt_flush_space(
success = buf_flush_list(ULINT_MAX, end_lsn, &n_pages);
buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
sum_pages += n_pages;
- } while (!success && !fil_crypt_is_closing(space));
+ } while (!success && !space->is_stopping());
ullint end = ut_time_us(NULL);
@@ -2168,40 +2174,38 @@ fil_crypt_flush_space(
}
/* update page 0 */
- if (!fil_crypt_is_closing(space)) {
- mtr_t mtr;
- mtr_start(&mtr);
- ulint offset = 0; // page 0
- const uint zip_size = fil_space_get_zip_size(space);
- buf_block_t* block = buf_page_get_gen(space, zip_size, offset,
- RW_X_LATCH, NULL, BUF_GET,
- __FILE__, __LINE__, &mtr);
- byte* frame = buf_block_get_frame(block);
- ulint maxsize;
- crypt_data->page0_offset =
- fsp_header_get_crypt_offset(zip_size, &maxsize);
+ mtr_t mtr;
+ mtr_start(&mtr);
- fil_space_write_crypt_data(space, frame,
- crypt_data->page0_offset,
- ULINT_MAX, &mtr);
- mtr_commit(&mtr);
- }
+ const uint zip_size = fsp_flags_get_zip_size(state->space->flags);
+
+ buf_block_t* block = buf_page_get_gen(space->id, zip_size, 0,
+ RW_X_LATCH, NULL, BUF_GET,
+ __FILE__, __LINE__, &mtr);
+ byte* frame = buf_block_get_frame(block);
+
+ crypt_data->write_page0(frame, &mtr);
+
+ mtr_commit(&mtr);
}
/***********************************************************************
-Complete rotating a space */
+Complete rotating a space
+@param[in,out] key_state Key state
+@param[in,out] state Rotation state */
static
void
fil_crypt_complete_rotate_space(
-/*============================*/
- const key_state_t* key_state, /*!< in: Key state */
- rotate_thread_t* state) /*!< in: Key rotation state */
+ const key_state_t* key_state,
+ rotate_thread_t* state)
{
- ulint space = state->space;
- fil_space_crypt_t *crypt_data = fil_space_get_crypt_data(space);
+ fil_space_crypt_t *crypt_data = state->space->crypt_data;
+
+ ut_ad(crypt_data);
+ ut_ad(state->space->n_pending_ops > 0);
/* Space might already be dropped */
- if (crypt_data != NULL && !crypt_data->is_closing(false)) {
+ if (!state->space->is_stopping()) {
mutex_enter(&crypt_data->mutex);
/**
@@ -2259,9 +2263,8 @@ fil_crypt_complete_rotate_space(
}
if (should_flush) {
- fil_crypt_flush_space(state, space);
+ fil_crypt_flush_space(state);
- ut_ad(crypt_data);
mutex_enter(&crypt_data->mutex);
crypt_data->rotate_state.flushing = false;
mutex_exit(&crypt_data->mutex);
@@ -2284,8 +2287,8 @@ DECLARE_THREAD(fil_crypt_thread)(
mutex_enter(&fil_crypt_threads_mutex);
uint thread_no = srv_n_fil_crypt_threads_started;
srv_n_fil_crypt_threads_started++;
- mutex_exit(&fil_crypt_threads_mutex);
os_event_set(fil_crypt_event); /* signal that we started */
+ mutex_exit(&fil_crypt_threads_mutex);
/* state of this thread */
rotate_thread_t thr(thread_no);
@@ -2305,6 +2308,7 @@ DECLARE_THREAD(fil_crypt_thread)(
* i.e either new key version of change or
* new rotate_key_age */
os_event_reset(fil_crypt_threads_event);
+
if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
break;
}
@@ -2318,7 +2322,12 @@ DECLARE_THREAD(fil_crypt_thread)(
time_t waited = time(0) - wait_start;
- if (waited >= (time_t) srv_background_scrub_data_check_interval) {
+ /* Break if we have waited the background scrub
+ internal and background scrubbing is enabled */
+ if (waited >= 0
+ && ulint(waited) >= srv_background_scrub_data_check_interval
+ && (srv_background_scrub_data_uncompressed
+ || srv_background_scrub_data_compressed)) {
break;
}
}
@@ -2333,29 +2342,32 @@ DECLARE_THREAD(fil_crypt_thread)(
/* we found a space to rotate */
fil_crypt_start_rotate_space(&new_state, &thr);
- /* decrement pending ops that was incremented in
- * fil_crypt_space_needs_rotation
- * (called from fil_crypt_find_space_to_rotate),
- * this makes sure that tablespace won't be dropped
- * just after we decided to start processing it. */
- fil_decr_pending_ops(thr.space);
-
/* iterate all pages (cooperativly with other threads) */
- while (!thr.should_shutdown() &&
+ while (!thr.should_shutdown() && thr.space &&
fil_crypt_find_page_to_rotate(&new_state, &thr)) {
/* rotate a (set) of pages */
fil_crypt_rotate_pages(&new_state, &thr);
+ /* If space is marked as stopping, release
+ space and stop rotation. */
+ if (thr.space->is_stopping()) {
+ fil_space_release(thr.space);
+ thr.space = NULL;
+ break;
+ }
+
/* realloc iops */
fil_crypt_realloc_iops(&thr);
}
/* complete rotation */
- fil_crypt_complete_rotate_space(&new_state, &thr);
+ if (thr.space) {
+ fil_crypt_complete_rotate_space(&new_state, &thr);
+ }
/* force key state refresh */
- new_state.key_id= 0;
+ new_state.key_id = 0;
/* return iops */
fil_crypt_return_iops(&thr);
@@ -2365,10 +2377,16 @@ DECLARE_THREAD(fil_crypt_thread)(
/* return iops if shutting down */
fil_crypt_return_iops(&thr);
+ /* release current space if shutting down */
+ if (thr.space) {
+ fil_space_release(thr.space);
+ thr.space = NULL;
+ }
+
mutex_enter(&fil_crypt_threads_mutex);
srv_n_fil_crypt_threads_started--;
- mutex_exit(&fil_crypt_threads_mutex);
os_event_set(fil_crypt_event); /* signal that we stopped */
+ mutex_exit(&fil_crypt_threads_mutex);
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit. */
@@ -2379,23 +2397,26 @@ DECLARE_THREAD(fil_crypt_thread)(
}
/*********************************************************************
-Adjust thread count for key rotation */
+Adjust thread count for key rotation
+@param[in] enw_cnt Number of threads to be used */
UNIV_INTERN
void
fil_crypt_set_thread_cnt(
-/*=====================*/
- uint new_cnt) /*!< in: New key rotation thread count */
+ const uint new_cnt)
{
if (!fil_crypt_threads_inited) {
fil_crypt_threads_init();
}
+ mutex_enter(&fil_crypt_threads_mutex);
+
if (new_cnt > srv_n_fil_crypt_threads) {
uint add = new_cnt - srv_n_fil_crypt_threads;
srv_n_fil_crypt_threads = new_cnt;
for (uint i = 0; i < add; i++) {
os_thread_id_t rotation_thread_id;
os_thread_create(fil_crypt_thread, NULL, &rotation_thread_id);
+
ib_logf(IB_LOG_LEVEL_INFO,
"Creating #%d thread id %lu total threads %u.",
i+1, os_thread_pf(rotation_thread_id), new_cnt);
@@ -2405,6 +2426,8 @@ fil_crypt_set_thread_cnt(
os_event_set(fil_crypt_threads_event);
}
+ mutex_exit(&fil_crypt_threads_mutex);
+
while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
os_event_reset(fil_crypt_event);
os_event_wait_time(fil_crypt_event, 1000000);
@@ -2412,39 +2435,39 @@ fil_crypt_set_thread_cnt(
}
/*********************************************************************
-Adjust max key age */
+Adjust max key age
+@param[in] val New max key age */
UNIV_INTERN
void
fil_crypt_set_rotate_key_age(
-/*=========================*/
- uint val) /*!< in: New max key age */
+ uint val)
{
srv_fil_crypt_rotate_key_age = val;
os_event_set(fil_crypt_threads_event);
}
/*********************************************************************
-Adjust rotation iops */
+Adjust rotation iops
+@param[in] val New max roation iops */
UNIV_INTERN
void
fil_crypt_set_rotation_iops(
-/*========================*/
- uint val) /*!< in: New iops setting */
+ uint val)
{
srv_n_fil_crypt_iops = val;
os_event_set(fil_crypt_threads_event);
}
/*********************************************************************
-Adjust encrypt tables */
+Adjust encrypt tables
+@param[in] val New setting for innodb-encrypt-tables */
UNIV_INTERN
void
fil_crypt_set_encrypt_tables(
-/*=========================*/
- uint val) /*!< in: New srv_encrypt_tables setting */
+ uint val)
{
- srv_encrypt_tables = val;
- os_event_set(fil_crypt_threads_event);
+ srv_encrypt_tables = val;
+ os_event_set(fil_crypt_threads_event);
}
/*********************************************************************
@@ -2452,7 +2475,6 @@ Init threads for key rotation */
UNIV_INTERN
void
fil_crypt_threads_init()
-/*====================*/
{
ut_ad(mutex_own(&fil_system->mutex));
if (!fil_crypt_threads_inited) {
@@ -2473,75 +2495,40 @@ Clean up key rotation threads resources */
UNIV_INTERN
void
fil_crypt_threads_cleanup()
-/*=======================*/
{
if (!fil_crypt_threads_inited) {
return;
}
ut_a(!srv_n_fil_crypt_threads_started);
os_event_free(fil_crypt_event);
+ fil_crypt_event = NULL;
os_event_free(fil_crypt_threads_event);
+ fil_crypt_threads_event = NULL;
mutex_free(&fil_crypt_threads_mutex);
fil_crypt_threads_inited = false;
}
/*********************************************************************
-Mark a space as closing */
-UNIV_INTERN
-void
-fil_space_crypt_mark_space_closing(
-/*===============================*/
- ulint space, /*!< in: tablespace id */
- fil_space_crypt_t* crypt_data) /*!< in: crypt_data or NULL */
-{
- if (!fil_crypt_threads_inited) {
- return;
- }
-
- mutex_enter(&fil_crypt_threads_mutex);
-
- if (!crypt_data) {
- crypt_data = fil_space_get_crypt_data(space);
- }
-
- if (crypt_data == NULL) {
- mutex_exit(&fil_crypt_threads_mutex);
- return;
- }
-
- mutex_enter(&crypt_data->mutex);
- mutex_exit(&fil_crypt_threads_mutex);
- crypt_data->closing = true;
- mutex_exit(&crypt_data->mutex);
-}
-
-/*********************************************************************
-Wait for crypt threads to stop accessing space */
+Wait for crypt threads to stop accessing space
+@param[in] space Tablespace */
UNIV_INTERN
void
fil_space_crypt_close_tablespace(
-/*=============================*/
- ulint space) /*!< in: Space id */
+ const fil_space_t* space)
{
- if (!srv_encrypt_tables) {
+ if (!srv_encrypt_tables || !space->crypt_data) {
return;
}
mutex_enter(&fil_crypt_threads_mutex);
- fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(space);
-
- if (crypt_data == NULL || crypt_data->is_closing(false)) {
- mutex_exit(&fil_crypt_threads_mutex);
- return;
- }
+ fil_space_crypt_t* crypt_data = space->crypt_data;
- uint start = time(0);
- uint last = start;
+ time_t start = time(0);
+ time_t last = start;
mutex_enter(&crypt_data->mutex);
mutex_exit(&fil_crypt_threads_mutex);
- crypt_data->closing = true;
uint cnt = crypt_data->rotate_state.active_threads;
bool flushing = crypt_data->rotate_state.flushing;
@@ -2551,20 +2538,22 @@ fil_space_crypt_close_tablespace(
/* release dict mutex so that scrub threads can release their
* table references */
dict_mutex_exit_for_mysql();
+
/* wakeup throttle (all) sleepers */
os_event_set(fil_crypt_throttle_sleep_event);
+
os_thread_sleep(20000);
dict_mutex_enter_for_mysql();
mutex_enter(&crypt_data->mutex);
cnt = crypt_data->rotate_state.active_threads;
flushing = crypt_data->rotate_state.flushing;
- uint now = time(0);
+ time_t now = time(0);
if (now >= last + 30) {
ib_logf(IB_LOG_LEVEL_WARN,
- "Waited %u seconds to drop space: %lu.",
- now - start, space);
+ "Waited %ld seconds to drop space: %s(" ULINTPF ").",
+ now - start, space->name, space->id);
last = now;
}
}
@@ -2574,22 +2563,23 @@ fil_space_crypt_close_tablespace(
/*********************************************************************
Get crypt status for a space (used by information_schema)
-return 0 if crypt data present */
+@param[in] space Tablespace
+@param[out] status Crypt status */
UNIV_INTERN
-int
+void
fil_space_crypt_get_status(
-/*=======================*/
- ulint id, /*!< in: space id */
- struct fil_space_crypt_status_t* status) /*!< out: status */
+ const fil_space_t* space,
+ struct fil_space_crypt_status_t* status)
{
- fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(id);
-
memset(status, 0, sizeof(*status));
+ ut_ad(space->n_pending_ops > 0);
+ fil_space_crypt_t* crypt_data = space->crypt_data;
+ status->space = space->id;
+
if (crypt_data != NULL) {
- status->space = id;
- status->scheme = crypt_data->type;
mutex_enter(&crypt_data->mutex);
+ status->scheme = crypt_data->type;
status->keyserver_requests = crypt_data->keyserver_requests;
status->min_key_version = crypt_data->min_key_version;
status->key_id = crypt_data->key_id;
@@ -2603,8 +2593,6 @@ fil_space_crypt_get_status(
crypt_data->rotate_state.next_offset;
status->rotate_max_page_number =
crypt_data->rotate_state.max_offset;
- } else {
- status->rotating = false;
}
mutex_exit(&crypt_data->mutex);
@@ -2612,25 +2600,17 @@ fil_space_crypt_get_status(
if (srv_encrypt_tables || crypt_data->min_key_version) {
status->current_key_version =
fil_crypt_get_latest_key_version(crypt_data);
- } else {
- status->current_key_version = 0;
- }
- } else {
- if (srv_encrypt_tables) {
- os_event_set(fil_crypt_threads_event);
}
}
-
- return crypt_data == NULL ? 1 : 0;
}
/*********************************************************************
-Return crypt statistics */
+Return crypt statistics
+@param[out] stat Crypt statistics */
UNIV_INTERN
void
fil_crypt_total_stat(
-/*=================*/
- fil_crypt_stat_t *stat) /*!< out: Crypt statistics */
+ fil_crypt_stat_t *stat)
{
mutex_enter(&crypt_stat_mutex);
*stat = crypt_stat;
@@ -2639,21 +2619,24 @@ fil_crypt_total_stat(
/*********************************************************************
Get scrub status for a space (used by information_schema)
-return 0 if data found */
+
+@param[in] space Tablespace
+@param[out] status Scrub status */
UNIV_INTERN
-int
+void
fil_space_get_scrub_status(
-/*=======================*/
- ulint id, /*!< in: space id */
- struct fil_space_scrub_status_t* status) /*!< out: status */
+ const fil_space_t* space,
+ struct fil_space_scrub_status_t* status)
{
- fil_space_crypt_t* crypt_data = fil_space_get_crypt_data(id);
-
memset(status, 0, sizeof(*status));
+ ut_ad(space->n_pending_ops > 0);
+ fil_space_crypt_t* crypt_data = space->crypt_data;
+
+ status->space = space->id;
+
if (crypt_data != NULL) {
- status->space = id;
- status->compressed = fil_space_get_zip_size(id) > 0;
+ status->compressed = fsp_flags_get_zip_size(space->flags) > 0;
mutex_enter(&crypt_data->mutex);
status->last_scrub_completed =
crypt_data->rotate_state.scrubbing.last_scrub_completed;
@@ -2668,12 +2651,8 @@ fil_space_get_scrub_status(
crypt_data->rotate_state.next_offset;
status->current_scrub_max_page_number =
crypt_data->rotate_state.max_offset;
- } else {
- status->scrubbing = false;
}
mutex_exit(&crypt_data->mutex);
}
-
- return crypt_data == NULL ? 1 : 0;
}