summaryrefslogtreecommitdiff
path: root/storage/maria
diff options
context:
space:
mode:
authorunknown <guilhem@gbichot4.local>2008-04-24 17:22:51 +0200
committerunknown <guilhem@gbichot4.local>2008-04-24 17:22:51 +0200
commitc9a825810d90e86683bd055acfa22c05445f1347 (patch)
treec8ab98c197f7bd827aec9d59ce2b53c950b660dc /storage/maria
parent28131daa266e90e223c4b8847346d5dc55c5ffaf (diff)
downloadmariadb-git-c9a825810d90e86683bd055acfa22c05445f1347.tar.gz
WL#3072 - Maria Recovery
Recovery of R-tree and fulltext indices. Fix for BUG#35551 "Maria: crash in REPAIR TABLE/ENABLE KEYS if using repair-with-keycache method". Fix for bug (see ma_rt_index.c) where we could have a wrong page_link pointer causing wrong memory access during some R-tree index insert/delete. Making ma_rt_test work again (it had been neglected over time) and adding options (record type etc) to prepare it for integration into ma_test_all-t (but there is BUG#36321 about "ma_rt_test -M" crash) mysql-test/r/maria.result: correct result mysql-test/t/maria.test: now we get no error storage/maria/ma_blockrec.c: delete_dir_entry() and delete_head_or_tail() don't use info->keyread_buff. ma_get_length() does not change **packet, marking it with 'const' to remove some casts in callers of this function. The (const uchar**)&header casts will be removed when Monty changes 'header' to const uchar*. _ma_apply_redo_purge_row_head_or_tail() sets 'buff' from pagecache_read() so its initialization was superfluous. storage/maria/ma_check.c: Fix for BUG#35551 "Maria: crash in REPAIR TABLE/ENABLE KEYS if using repair-with-keycache method" (see comment in code) storage/maria/ma_create.c: FULLTEXT and SPATIAL indices have logging now, they are recoverable. storage/maria/ma_delete.c: Logging done by _ma_ck_delete() is moved to a function (_ma_write_undo_key_delete()), for reusal by R-tree logging. _ma_log_delete() is made non-static for same reason, and some of its parameters are made pointers to const. Removed wrong comment ("Note that for delete key" etc, contradicted by code and comment "Log also position to row" a few lines above) storage/maria/ma_ft_update.c: unneeded cast, comment for future storage/maria/ma_key_recover.c: Comment about possible deadlock. Write bad page to DBUG trace if KEY_OP_CHECK founds bad CRC. Support operation KEY_OP_MULTI_COPY. When we execute, in UNDO phase, UNDO_KEY_DELETE|INSERT, we must call the proper key insertion|deletion depending on if this is R-tree or B-tree. Explanation of of _ma_[un]lock_key_del() work, maybe useful for mortals like me. storage/maria/ma_key_recover.h: change of prototypes storage/maria/ma_loghandler.h: New operation which can be stored in REDO_INDEX log records: KEY_OP_MULTI_COPY storage/maria/ma_page.c: Comments storage/maria/ma_pagecache.c: typo storage/maria/ma_rt_index.c: Fix for bug: the page_link pointer in maria_rtree_insert_req() could be wrong when we set its 'changed' member; for the solution see ma_key_recover.h. It is needed only in cases when we manipulate several pages. Logging of changes done to pages by key insert/delete. maria_rtree_delete()'s main work is moved to a new function maria_rtree_real_delete(), which is used by maria_rtree_delete() and by applying of UNDO_KEY_INSERT. storage/maria/ma_rt_index.h: new prototypes and macros for ma_rt_index.c storage/maria/ma_rt_key.c: Logging of maria_rtree_add_key() and maria_rtree_delete_key(). When inserting, split is necessary if there is not enough room for key: take checksum's occupied space in this calculation. storage/maria/ma_rt_key.h: new prototypes (those functions need to know the page's id because they do logging) storage/maria/ma_rt_mbr.c: Comments about what the functions change. storage/maria/ma_rt_split.c: maria_rtree_split_page() needs to know the page's id, because it does logging. Logging of what a split operation does to the split page (see comment of _ma_log_rt_split(): moves of keys inside the page, sometimes insertion of the new key, and shrinking of the page) and to the new page (receives some keys from split page, and sometimes the new key). storage/maria/ma_rt_test.c: ma_rt_test had been forgotten when maria_rkey() was changed some months ago (0->HA_WHOLE_KEY change), and when calls to maria_rnd(,,HA_OFFSET_ERROR) were rewritten to maria_scan() calls (which implies maria_scan_init()). The 'max_i' change is to adapt to the fact that maria_scan() does not return deleted records for BLOCK_RECORD but does so for other formats; the initial code assumed a certain number of deleted records would be returned, we change it to rather count only non-deleted ones. We also add more features to this test, like ma_test1 (the plan is to run ma_rt_test in ma_test_all-t): options to choose records' format, table checksum, transactions, checkpoints, end at specific stages, abort without committing, and debug trace. storage/maria/ma_test1.c: MY_INIT() does my_init(). storage/maria/ma_write.c: Logging done by _ma_ck_write_btree_with_log() is moved to a function (_ma_write_undo_key_insert()), for reusal by R-tree logging. _ma_log_new() and _ma_log_change() are made non-static for same reason. Some parameters of logging functions are made pointers to const. If EXTRA_DEBUG_KEY_CHANGES, we now log CRC in _ma_log_change() too (better checks, bigger record). storage/maria/maria_read_log.c: Program takes no arguments, bail out if any, instead of silently discarding them storage/myisam/rt_test.c: rt_test had been forgotten when mi_rkey() was changed some months ago (0->HA_WHOLE_KEY change). The 'max_i' change is to make it symmetric with ma_rt_test.c mysql-test/r/maria-gis-rtree-dynamic.result: correct result mysql-test/r/maria-gis-rtree-trans.result: correct result mysql-test/r/maria-recovery-rtree-ft.result: almost correct result (hitting BUG# in the end) mysql-test/t/maria-gis-rtree-dynamic.test: test R-tree & dynamic row format mysql-test/t/maria-gis-rtree-trans.test: Test R-tree and page row format and transactional mysql-test/t/maria-recovery-rtree-ft-master.opt: usual options for recovery testing mysql-test/t/maria-recovery-rtree-ft.test: test of recovery of R-tree and fulltext indices.
Diffstat (limited to 'storage/maria')
-rw-r--r--storage/maria/ma_blockrec.c19
-rw-r--r--storage/maria/ma_check.c12
-rw-r--r--storage/maria/ma_create.c10
-rw-r--r--storage/maria/ma_delete.c122
-rw-r--r--storage/maria/ma_ft_update.c6
-rw-r--r--storage/maria/ma_key_recover.c98
-rw-r--r--storage/maria/ma_key_recover.h14
-rw-r--r--storage/maria/ma_loghandler.h3
-rw-r--r--storage/maria/ma_page.c7
-rw-r--r--storage/maria/ma_pagecache.c2
-rw-r--r--storage/maria/ma_rt_index.c183
-rw-r--r--storage/maria/ma_rt_index.h21
-rw-r--r--storage/maria/ma_rt_key.c43
-rw-r--r--storage/maria/ma_rt_key.h4
-rw-r--r--storage/maria/ma_rt_mbr.c3
-rw-r--r--storage/maria/ma_rt_split.c222
-rw-r--r--storage/maria/ma_rt_test.c221
-rw-r--r--storage/maria/ma_test1.c1
-rw-r--r--storage/maria/ma_write.c201
-rw-r--r--storage/maria/maria_read_log.c2
20 files changed, 904 insertions, 290 deletions
diff --git a/storage/maria/ma_blockrec.c b/storage/maria/ma_blockrec.c
index 7ac1df772d5..8623835d108 100644
--- a/storage/maria/ma_blockrec.c
+++ b/storage/maria/ma_blockrec.c
@@ -3812,9 +3812,6 @@ static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
leave the page as write locked as we may put
the new row into the old position.
- NOTES
- Uses info->keyread_buff
-
RETURN
0 ok
1 error
@@ -3934,9 +3931,6 @@ static my_bool delete_head_or_tail(MARIA_HA *info,
info Handler
tails Pointer to vector of tail positions, ending with 0
- NOTES
- Uses info->keyread_buff
-
RETURN
0 ok
1 error
@@ -5211,9 +5205,9 @@ uint ma_calc_length_for_store_length(ulong nr)
/* Retrive a stored number */
-static ulong ma_get_length(uchar **packet)
+static ulong ma_get_length(const uchar **packet)
{
- reg1 uchar *pos= *packet;
+ reg1 const uchar *pos= *packet;
if (*pos < 251)
{
(*packet)++;
@@ -6031,7 +6025,7 @@ uint _ma_apply_redo_purge_row_head_or_tail(MARIA_HA *info, LSN lsn,
pgcache_page_no_t page;
uint rownr, empty_space;
uint block_size= share->block_size;
- uchar *buff= info->keyread_buff;
+ uchar *buff;
int result;
uint error;
MARIA_PINNED_PAGE page_link;
@@ -6558,7 +6552,7 @@ my_bool _ma_apply_undo_row_delete(MARIA_HA *info, LSN undo_lsn,
header+= 2 + row.field_lengths_length;
}
if (share->base.blobs)
- row.blob_length= ma_get_length(&header);
+ row.blob_length= ma_get_length((const uchar**)&header);
/* We need to build up a record (without blobs) in rec_buff */
if (!(record= my_malloc(share->base.reclength, MYF(MY_WME))))
@@ -6739,8 +6733,7 @@ my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
{
MARIA_SHARE *share= info->s;
MARIA_RECORD_POS record_pos;
- uchar *field_length_data;
- const uchar *field_length_data_end, *extent_info;
+ const uchar *field_length_data, *field_length_data_end, *extent_info;
uchar *current_record, *orig_record;
pgcache_page_no_t page;
ha_checksum checksum_delta;
@@ -6773,7 +6766,7 @@ my_bool _ma_apply_undo_row_update(MARIA_HA *info, LSN undo_lsn,
Set header to point to old field values, generated by
fill_update_undo_parts()
*/
- field_length_header= ma_get_length(&header);
+ field_length_header= ma_get_length((const uchar**)&header);
field_length_data= header;
header+= field_length_header;
field_length_data_end= header;
diff --git a/storage/maria/ma_check.c b/storage/maria/ma_check.c
index d1d66f5b538..4db12c3a16c 100644
--- a/storage/maria/ma_check.c
+++ b/storage/maria/ma_check.c
@@ -4442,7 +4442,16 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
for (;;)
{
int flag;
-
+ /*
+ Assume table is transactional and it had LSN pages in the
+ cache. Repair has flushed them, left data pages stay in
+ cache, and disabled transactionality (so share's current page
+ type is PLAIN); page cache would assert if it finds a cached LSN page
+ while _ma_scan_block_record() requested a PLAIN page. So we use
+ UNKNOWN.
+ */
+ enum pagecache_page_type save_page_type= share->page_type;
+ share->page_type= PAGECACHE_READ_UNKNOWN_PAGE;
if (info != sort_info->new_info)
{
/* Safe scanning */
@@ -4459,6 +4468,7 @@ static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
flag= _ma_scan_block_record(info, sort_param->record,
info->cur_row.nextpos, 1);
}
+ share->page_type= save_page_type;
if (!flag)
{
if (sort_param->calc_checksum)
diff --git a/storage/maria/ma_create.c b/storage/maria/ma_create.c
index abfb4c0c514..1b86f26f4c0 100644
--- a/storage/maria/ma_create.c
+++ b/storage/maria/ma_create.c
@@ -393,16 +393,6 @@ int maria_create(const char *name, enum data_file_type datafile_type,
length= real_length_diff= 0;
min_key_length= key_length= pointer;
- if ((keydef->flag & (HA_SPATIAL | HA_FULLTEXT) &&
- ci->transactional))
- {
- my_errno= HA_ERR_UNSUPPORTED;
- my_message(HA_ERR_UNSUPPORTED,
- "Maria can't yet handle SPATIAL or FULLTEXT keys in "
- "transactional mode. For now use TRANSACTIONAL=0", MYF(0));
- goto err_no_lock;
- }
-
if (keydef->flag & HA_SPATIAL)
{
#ifdef HAVE_SPATIAL
diff --git a/storage/maria/ma_delete.c b/storage/maria/ma_delete.c
index d0757b8f274..097e94ee3df 100644
--- a/storage/maria/ma_delete.c
+++ b/storage/maria/ma_delete.c
@@ -33,9 +33,6 @@ static int underflow(MARIA_HA *info,MARIA_KEYDEF *keyinfo,
static uint remove_key(MARIA_KEYDEF *keyinfo,uint nod_flag,uchar *keypos,
uchar *lastkey,uchar *page_end,
my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
-static my_bool _ma_log_delete(MARIA_HA *info, my_off_t page, uchar *buff,
- uchar *key_pos, uint changed_length,
- uint move_length);
/* @breif Remove a row from a MARIA table */
@@ -177,58 +174,8 @@ int _ma_ck_delete(register MARIA_HA *info, uint keynr, uchar *key,
&new_root);
if (!res && share->now_transactional)
- {
- uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
- KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
- LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
- struct st_msg_to_write_hook_for_undo_key msg;
- enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
-
- info->key_delete_undo_lsn[keynr]= info->trn->undo_lsn;
- lsn_store(log_data, info->trn->undo_lsn);
- key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
- log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
-
- if (new_root != share->state.key_root[keynr])
- {
- my_off_t page;
- page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
- new_root / share->block_size);
- page_store(log_pos, page);
- log_pos+= PAGE_STORE_SIZE;
- log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
- }
-
- /* Log also position to row */
- key_length+= share->rec_reflength;
-
- /*
- Note that for delete key, we don't log the reference to the record.
- This is because the row may be inserted at a different place when
- we exceute the undo
- */
- log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
- log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
- log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_buff;
- log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
-
- msg.root= &share->state.key_root[keynr];
- msg.value= new_root;
- /*
- set autoincrement to 1 if this is an auto_increment key
- This is only used if we are now in a rollback of a duplicate key
- */
- msg.auto_increment= share->base.auto_key == keynr + 1;
-
- if (translog_write_record(&lsn, log_type,
- info->trn, info,
- (translog_size_t)
- log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
- key_length,
- TRANSLOG_INTERNAL_PARTS + 2, log_array,
- log_data + LSN_STORE_SIZE, &msg))
- res= -1;
- }
+ res= _ma_write_undo_key_delete(info, keynr, key_buff, key_length,
+ new_root, &lsn);
else
{
share->state.key_root[keynr]= new_root;
@@ -1371,9 +1318,9 @@ static uint remove_key(MARIA_KEYDEF *keyinfo, uint nod_flag,
*/
-static my_bool _ma_log_delete(MARIA_HA *info, my_off_t page, uchar *buff,
- uchar *key_pos, uint changed_length,
- uint move_length)
+my_bool _ma_log_delete(MARIA_HA *info, my_off_t page, const uchar *buff,
+ const uchar *key_pos, uint changed_length,
+ uint move_length)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 9 + 7], *log_pos;
@@ -1436,3 +1383,62 @@ static my_bool _ma_log_delete(MARIA_HA *info, my_off_t page, uchar *buff,
DBUG_RETURN(1);
DBUG_RETURN(0);
}
+
+
+/****************************************************************************
+ Logging of undos
+****************************************************************************/
+
+int _ma_write_undo_key_delete(MARIA_HA *info, uint keynr,
+ const uchar *key, uint key_length,
+ my_off_t new_root, LSN *res_lsn)
+{
+ MARIA_SHARE *share= info->s;
+ uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
+ KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
+ LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
+ struct st_msg_to_write_hook_for_undo_key msg;
+ enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
+
+ info->key_delete_undo_lsn[keynr]= info->trn->undo_lsn;
+ lsn_store(log_data, info->trn->undo_lsn);
+ key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
+ log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
+
+ /**
+ @todo BUG if we had concurrent insert/deletes, reading state's key_root
+ like this would be unsafe.
+ */
+ if (new_root != share->state.key_root[keynr])
+ {
+ my_off_t page;
+ page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
+ new_root / share->block_size);
+ page_store(log_pos, page);
+ log_pos+= PAGE_STORE_SIZE;
+ log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
+ }
+
+ /* Log also position to row */
+ key_length+= share->rec_reflength;
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key;
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
+
+ msg.root= &share->state.key_root[keynr];
+ msg.value= new_root;
+ /*
+ set autoincrement to 1 if this is an auto_increment key
+ This is only used if we are now in a rollback of a duplicate key
+ */
+ msg.auto_increment= share->base.auto_key == keynr + 1;
+
+ return translog_write_record(res_lsn, log_type,
+ info->trn, info,
+ (translog_size_t)
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
+ key_length,
+ TRANSLOG_INTERNAL_PARTS + 2, log_array,
+ log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
+}
diff --git a/storage/maria/ma_ft_update.c b/storage/maria/ma_ft_update.c
index 264cb90e4f4..71085abcf78 100644
--- a/storage/maria/ma_ft_update.c
+++ b/storage/maria/ma_ft_update.c
@@ -109,7 +109,7 @@ uint _ma_ft_parse(TREE *parsed, MARIA_HA *info, uint keynr, const uchar *record,
{
/** @todo this casts ftsi.pos (const) to non-const */
if (ftsi.pos)
- if (maria_ft_parse(parsed, (uchar *)ftsi.pos, ftsi.len, parser, param,
+ if (maria_ft_parse(parsed, ftsi.pos, ftsi.len, parser, param,
mem_root))
DBUG_RETURN(1);
}
@@ -335,6 +335,10 @@ uint _ma_ft_convert_to_ft2(MARIA_HA *info, uint keynr, uchar *key)
_ma_store_page_used(share, info->buff, length + share->keypage_header);
memcpy(info->buff + share->keypage_header, key_ptr, length);
info->keyread_buff_used= info->page_changed=1; /* info->buff is used */
+ /**
+ @todo RECOVERY BUG this is not logged yet. Ok as this code is never
+ called, but soon it will be.
+ */
if ((root= _ma_new(info, DFLT_INIT_HITS, &page_link)) == HA_OFFSET_ERROR ||
_ma_write_keypage(info, keyinfo, root, page_link->write_lock,
DFLT_INIT_HITS, info->buff))
diff --git a/storage/maria/ma_key_recover.c b/storage/maria/ma_key_recover.c
index 1cd0fc4150c..2876956e84b 100644
--- a/storage/maria/ma_key_recover.c
+++ b/storage/maria/ma_key_recover.c
@@ -19,6 +19,7 @@
#include "ma_blockrec.h"
#include "trnman.h"
#include "ma_key_recover.h"
+#include "ma_rt_index.h"
/****************************************************************************
Some helper functions used both by key page loggin and block page loggin
@@ -202,6 +203,15 @@ my_bool write_hook_for_undo_key(enum translog_record_type type,
(struct st_msg_to_write_hook_for_undo_key *) hook_arg;
*msg->root= msg->value;
+ /**
+ @todo BUG
+ so we have log mutex and then intern_lock.
+ While in checkpoint we have intern_lock and then log mutex, like when we
+ flush bitmap (flushing bitmap pages can call hook which takes log mutex).
+ So we can deadlock.
+ Another one is that in translog_assign_id_to_share() we have intern_lock
+ and then log mutex.
+ */
_ma_fast_unlock_key_del(tbl_info);
return write_hook_for_undo(type, trn, tbl_info, lsn, 0);
}
@@ -931,12 +941,49 @@ uint _ma_apply_redo_index(MARIA_HA *info,
crc= uint4korr(header+2);
_ma_store_page_used(share, buff, page_length);
DBUG_ASSERT(check_page_length == page_length);
- DBUG_ASSERT(crc == (uint32) my_checksum(0, buff + LSN_STORE_SIZE,
- page_length- LSN_STORE_SIZE));
+ if (crc != (uint32) my_checksum(0, buff + LSN_STORE_SIZE,
+ page_length - LSN_STORE_SIZE))
+ {
+ DBUG_PRINT("info",("page_length %u",page_length));
+ DBUG_DUMP("KEY_OP_CHECK bad page", buff, share->block_size);
+ DBUG_ASSERT("crc" == "failure in REDO_INDEX");
+ }
#endif
header+= 6;
break;
}
+ case KEY_OP_MULTI_COPY: /* 9 */
+ {
+ /*
+ List of fixed-len memcpy() operations with their source located inside
+ the page. The log record's piece looks like:
+ first the length 'full_length' to be used by memcpy()
+ then the number of bytes used by the list of (to,from) pairs
+ then the (to,from) pairs, so we do:
+ for (t,f) in [list of (to,from) pairs]:
+ memcpy(t, f, full_length).
+ */
+ uint full_length, log_memcpy_length;
+ const uchar *log_memcpy_end;
+ full_length= uint2korr(header);
+ header+= 2;
+ log_memcpy_length= uint2korr(header);
+ header+= 2;
+ log_memcpy_end= header + log_memcpy_length;
+ DBUG_ASSERT(full_length < share->block_size);
+ while (header < log_memcpy_end)
+ {
+ uint to, from;
+ to= uint2korr(header);
+ header+= 2;
+ from= uint2korr(header);
+ header+= 2;
+ /* "from" is a place in the existing page */
+ DBUG_ASSERT(max(from, to) < share->block_size);
+ memcpy(buff + to, buff + from, full_length);
+ }
+ break;
+ }
case KEY_OP_NONE:
default:
DBUG_ASSERT(0);
@@ -1004,8 +1051,11 @@ my_bool _ma_apply_undo_key_insert(MARIA_HA *info, LSN undo_lsn,
DBUG_DUMP("key", key, length);
new_root= share->state.key_root[keynr];
- res= _ma_ck_real_delete(info, share->keyinfo+keynr, key,
- length - share->rec_reflength, &new_root);
+ res= (share->keyinfo[keynr].key_alg == HA_KEY_ALG_RTREE) ?
+ maria_rtree_real_delete(info, keynr, key, length - share->rec_reflength,
+ &new_root) :
+ _ma_ck_real_delete(info, share->keyinfo+keynr, key,
+ length - share->rec_reflength, &new_root);
if (res)
_ma_mark_file_crashed(share);
msg.root= &share->state.key_root[keynr];
@@ -1055,10 +1105,12 @@ my_bool _ma_apply_undo_key_delete(MARIA_HA *info, LSN undo_lsn,
DBUG_DUMP("key", key, length);
new_root= share->state.key_root[keynr];
- res= _ma_ck_real_write_btree(info, share->keyinfo+keynr, key,
- length - share->rec_reflength,
- &new_root,
- share->keyinfo[keynr].write_comp_flag);
+ res= (share->keyinfo[keynr].key_alg == HA_KEY_ALG_RTREE) ?
+ maria_rtree_insert_level(info, keynr, key, length - share->rec_reflength,
+ -1, &new_root) :
+ _ma_ck_real_write_btree(info, share->keyinfo+keynr, key,
+ length - share->rec_reflength, &new_root,
+ share->keyinfo[keynr].write_comp_flag);
if (res)
_ma_mark_file_crashed(share);
@@ -1089,7 +1141,7 @@ my_bool _ma_apply_undo_key_delete(MARIA_HA *info, LSN undo_lsn,
@param info Maria handler
@param insert_at_end Set to 1 if we are doing an insert
- @notes
+ @note
To allow higher concurrency in the common case where we do inserts
and we don't have any linked blocks we do the following:
- Mark in info->used_key_del that we are not using key_del
@@ -1106,6 +1158,32 @@ my_bool _ma_lock_key_del(MARIA_HA *info, my_bool insert_at_end)
{
MARIA_SHARE *share= info->s;
+ /*
+ info->used_key_del is 0 initially.
+ If the caller needs a block (_ma_new()), we look at the free list:
+ - looks empty? then caller will create a new block at end of file and
+ remember (through info->used_key_del==2) that it will not change
+ state.key_del and does not need to wake up waiters as nobody will wait for
+ it.
+ - non-empty? then we wait for other users of the state.key_del list to
+ have finished, then we lock this list (through share->used_key_del==1)
+ because we need to prevent some other thread to also read state.key_del
+ and use the same page as ours. We remember through info->used_key_del==1
+ that we will have to set state.key_del at unlock time and wake up
+ waiters.
+ If the caller wants to free a block (_ma_dispose()), "empty" and
+ "non-empty" are treated as "non-empty" is treated above.
+ When we are ready to unlock, we copy share->current_key_del into
+ state.key_del. Unlocking happens when writing the UNDO log record, that
+ can make a long lock time.
+ Why we wrote "*looks* empty": because we are looking at state.key_del
+ which may be slightly old (share->current_key_del may be more recent and
+ exact): when we want a new page, we tolerate to treat "there was no free
+ page 1 millisecond ago" as "there is no free page". It's ok to non-pop
+ (_ma_new(), page will be found later anyway) but it's not ok to non-push
+ (_ma_dispose(), page would be lost).
+ When we leave this function, info->used_key_del is always 1 or 2.
+ */
if (info->used_key_del != 1)
{
pthread_mutex_lock(&share->intern_lock);
@@ -1140,7 +1218,7 @@ void _ma_unlock_key_del(MARIA_HA *info)
MARIA_SHARE *share= info->s;
pthread_mutex_lock(&share->intern_lock);
share->used_key_del= 0;
- share->state.key_del= info->s->current_key_del;
+ share->state.key_del= share->current_key_del;
pthread_mutex_unlock(&share->intern_lock);
pthread_cond_signal(&share->intern_cond);
}
diff --git a/storage/maria/ma_key_recover.h b/storage/maria/ma_key_recover.h
index 196d0506609..6f261ab8360 100644
--- a/storage/maria/ma_key_recover.h
+++ b/storage/maria/ma_key_recover.h
@@ -46,6 +46,13 @@ my_bool _ma_write_clr(MARIA_HA *info, LSN undo_lsn,
enum translog_record_type undo_type,
my_bool store_checksum, ha_checksum checksum,
LSN *res_lsn, void *extra_msg);
+int _ma_write_undo_key_insert(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
+ const uchar *key, uint key_length,
+ my_off_t *root, my_off_t new_root,
+ LSN *res_lsn);
+int _ma_write_undo_key_delete(MARIA_HA *info, uint keynr,
+ const uchar *key, uint key_length,
+ my_off_t new_root, LSN *res_lsn);
my_bool write_hook_for_clr_end(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
@@ -70,6 +77,13 @@ my_bool _ma_log_add(MARIA_HA *info, my_off_t page, uchar *buff,
uint buff_length, uchar *key_pos,
uint changed_length, int move_length,
my_bool handle_overflow);
+my_bool _ma_log_delete(MARIA_HA *info, my_off_t page, const uchar *buff,
+ const uchar *key_pos, uint changed_length,
+ uint move_length);
+my_bool _ma_log_change(MARIA_HA *info, my_off_t page, const uchar *buff,
+ const uchar *key_pos, uint length);
+my_bool _ma_log_new(MARIA_HA *info, my_off_t page, const uchar *buff,
+ uint page_length, uint key_nr, my_bool root_page);
uint _ma_apply_redo_index_new_page(MARIA_HA *info, LSN lsn,
const uchar *header, uint length);
diff --git a/storage/maria/ma_loghandler.h b/storage/maria/ma_loghandler.h
index afd72cb770c..c21d9492cba 100644
--- a/storage/maria/ma_loghandler.h
+++ b/storage/maria/ma_loghandler.h
@@ -160,7 +160,8 @@ enum en_key_op
KEY_OP_DEL_PREFIX, /* Delete data at start of page */
KEY_OP_ADD_SUFFIX, /* Insert data at end of page */
KEY_OP_DEL_SUFFIX, /* Delete data at end of page */
- KEY_OP_CHECK /* For debugging; CRC of used part of page */
+ KEY_OP_CHECK, /* For debugging; CRC of used part of page */
+ KEY_OP_MULTI_COPY /* List of memcpy()s with fixed-len sources in page */
};
/* Size of log file; One log file is restricted to 4G */
diff --git a/storage/maria/ma_page.c b/storage/maria/ma_page.c
index 75e93a2229e..9b2fd6b126c 100644
--- a/storage/maria/ma_page.c
+++ b/storage/maria/ma_page.c
@@ -172,7 +172,8 @@ int _ma_write_keypage(register MARIA_HA *info,
@param page_not_read 1 if page has not yet been read
@note
- The page at 'pos' must have been read with a write lock
+ The page at 'pos' must have been read with a write lock.
+ This function does logging (unlike _ma_new()).
@return
@retval 0 ok
@@ -283,6 +284,10 @@ int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read)
check if this is used by checking if
page_link->changed != 0
+ @note Logging of this is left to the caller (so that the "new"ing and the
+ first changes done to this new page can be logged as one single entry - one
+ single _ma_log_new()) call).
+
@return
HA_OFFSET_ERROR File is full or page read error
# Page address to use
diff --git a/storage/maria/ma_pagecache.c b/storage/maria/ma_pagecache.c
index cf1e954c109..3e28835d4a8 100644
--- a/storage/maria/ma_pagecache.c
+++ b/storage/maria/ma_pagecache.c
@@ -4455,7 +4455,7 @@ int flush_pagecache_blocks_with_filter(PAGECACHE *pagecache,
void *filter_arg)
{
int res;
- DBUG_ENTER("flush_pagecache_blocks");
+ DBUG_ENTER("flush_pagecache_blocks_with_filter");
DBUG_PRINT("enter", ("pagecache: 0x%lx", (long) pagecache));
if (pagecache->disk_blocks <= 0)
diff --git a/storage/maria/ma_rt_index.c b/storage/maria/ma_rt_index.c
index 4f80d6a5a76..a0ab8c722ad 100644
--- a/storage/maria/ma_rt_index.c
+++ b/storage/maria/ma_rt_index.c
@@ -455,8 +455,9 @@ int maria_rtree_get_next(MARIA_HA *info, uint keynr, uint key_length)
/*
Choose non-leaf better key for insertion
-*/
+ Returns a pointer inside the page_buf buffer.
+*/
#ifdef PICK_BY_PERIMETER
static const uchar *maria_rtree_pick_key(const MARIA_HA *info,
const MARIA_KEYDEF *keyinfo,
@@ -548,10 +549,11 @@ static int maria_rtree_insert_req(MARIA_HA *info,
my_off_t *new_page,
int ins_level, int level)
{
- uint nod_flag;
+ uint nod_flag, page_link_idx;
int res;
uchar *page_buf, *k;
MARIA_PINNED_PAGE *page_link;
+ MARIA_SHARE *share= info->s;
DBUG_ENTER("maria_rtree_insert_req");
if (!(page_buf= (uchar*) my_alloca((uint)keyinfo->block_length +
@@ -563,7 +565,8 @@ static int maria_rtree_insert_req(MARIA_HA *info,
if (!_ma_fetch_keypage(info, keyinfo, page, PAGECACHE_LOCK_WRITE,
DFLT_INIT_HITS, page_buf, 0, &page_link))
goto err1;
- nod_flag= _ma_test_if_nod(info->s, page_buf);
+ page_link_idx= page_link_to_idx(info);
+ nod_flag= _ma_test_if_nod(share, page_buf);
DBUG_PRINT("rtree", ("page: %lu level: %d ins_level: %d nod_flag: %u",
(ulong) page, level, ins_level, nod_flag));
@@ -578,10 +581,13 @@ static int maria_rtree_insert_req(MARIA_HA *info,
_ma_kpos(nod_flag, k), new_page,
ins_level, level + 1)))
{
- case 0: /* child was not split */
+ case 0: /* child was not split, most common case */
{
maria_rtree_combine_rect(keyinfo->seg, k, key, k, key_length);
- page_link->changed= 1;
+ if (share->now_transactional &&
+ _ma_log_change(info, page, page_buf, k, key_length))
+ goto err1;
+ page_link_from_idx(info, page_link_idx)->changed= 1;
if (_ma_write_keypage(info, keyinfo, page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, page_buf))
@@ -595,14 +601,17 @@ static int maria_rtree_insert_req(MARIA_HA *info,
if (maria_rtree_set_key_mbr(info, keyinfo, k, key_length,
_ma_kpos(nod_flag, k)))
goto err1;
+ if (share->now_transactional &&
+ _ma_log_change(info, page, page_buf, k, key_length))
+ goto err1;
/* add new key for new page */
_ma_kpointer(info, new_key - nod_flag, *new_page);
if (maria_rtree_set_key_mbr(info, keyinfo, new_key, key_length,
*new_page))
goto err1;
res= maria_rtree_add_key(info, keyinfo, new_key, key_length,
- page_buf, new_page);
- page_link->changed= 1;
+ page_buf, page, new_page);
+ page_link_from_idx(info, page_link_idx)->changed= 1;
if (_ma_write_keypage(info, keyinfo, page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, page_buf))
@@ -619,8 +628,8 @@ static int maria_rtree_insert_req(MARIA_HA *info,
else
{
res= maria_rtree_add_key(info, keyinfo, key, key_length, page_buf,
- new_page);
- page_link->changed= 1;
+ page, new_page);
+ page_link_from_idx(info, page_link_idx)->changed= 1;
if (_ma_write_keypage(info, keyinfo, page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, page_buf))
goto err1;
@@ -636,18 +645,24 @@ err1:
}
-/*
+/**
Insert key into the tree
- RETURN
- -1 Error
- 0 Root was not split
- 1 Root was split
+ @param info table
+ @param keynr key's number
+ @param key key to insert
+ @param key_length key's length
+ @param ins_level at which level key insertion should start
+ @param root put new key_root there
+
+ @return Operation result
+ @retval -1 Error
+ @retval 0 Root was not split
+ @retval 1 Root was split
*/
-int maria_rtree_insert_level(MARIA_HA *info, uint keynr,
- const uchar *key,
- uint key_length, int ins_level)
+int maria_rtree_insert_level(MARIA_HA *info, uint keynr, const uchar *key,
+ uint key_length, int ins_level, my_off_t *root)
{
my_off_t old_root;
MARIA_SHARE *share= info->s;
@@ -655,6 +670,7 @@ int maria_rtree_insert_level(MARIA_HA *info, uint keynr,
int res;
my_off_t new_page;
MARIA_PINNED_PAGE *page_link;
+ enum pagecache_page_lock write_lock;
DBUG_ENTER("maria_rtree_insert_level");
if ((old_root= share->state.key_root[keynr]) == HA_OFFSET_ERROR)
@@ -664,18 +680,23 @@ int maria_rtree_insert_level(MARIA_HA *info, uint keynr,
if ((old_root= _ma_new(info, DFLT_INIT_HITS, &page_link)) ==
HA_OFFSET_ERROR)
DBUG_RETURN(-1);
+ write_lock= page_link->write_lock;
info->keyread_buff_used= 1;
bzero(info->buff, share->block_size);
_ma_store_keynr(share, info->buff, keynr);
_ma_store_page_used(share, info->buff, share->keypage_header);
+ if (share->now_transactional &&
+ _ma_log_new(info, old_root, info->buff, share->keypage_header,
+ keyinfo->key_nr, 1))
+ DBUG_RETURN(1);
+
res= maria_rtree_add_key(info, keyinfo, key, key_length, info->buff,
- NULL);
- if (_ma_write_keypage(info, keyinfo, old_root,
- page_link->write_lock,
+ old_root, NULL);
+ if (_ma_write_keypage(info, keyinfo, old_root, write_lock,
DFLT_INIT_HITS, info->buff))
DBUG_RETURN(1);
- share->state.key_root[keynr]= old_root;
+ *root= old_root;
DBUG_RETURN(res);
}
@@ -686,7 +707,7 @@ int maria_rtree_insert_level(MARIA_HA *info, uint keynr,
{
break;
}
- case 1: /* root was split, grow a new root */
+ case 1: /* root was split, grow a new root; very rare */
{
uchar *new_root_buf, *new_key;
my_off_t new_root;
@@ -710,6 +731,12 @@ int maria_rtree_insert_level(MARIA_HA *info, uint keynr,
if ((new_root= _ma_new(info, DFLT_INIT_HITS, &page_link)) ==
HA_OFFSET_ERROR)
goto err1;
+ write_lock= page_link->write_lock;
+
+ if (share->now_transactional &&
+ _ma_log_new(info, new_root, new_root_buf, share->keypage_header,
+ keyinfo->key_nr, 1))
+ goto err1;
new_key= new_root_buf + keyinfo->block_length + nod_flag;
@@ -718,7 +745,7 @@ int maria_rtree_insert_level(MARIA_HA *info, uint keynr,
old_root))
goto err1;
if (maria_rtree_add_key(info, keyinfo, new_key, key_length, new_root_buf,
- NULL)
+ new_root, NULL)
== -1)
goto err1;
_ma_kpointer(info, new_key - nod_flag, new_page);
@@ -726,13 +753,13 @@ int maria_rtree_insert_level(MARIA_HA *info, uint keynr,
new_page))
goto err1;
if (maria_rtree_add_key(info, keyinfo, new_key, key_length, new_root_buf,
- NULL)
+ new_root, NULL)
== -1)
goto err1;
- if (_ma_write_keypage(info, keyinfo, new_root, page_link->write_lock,
+ if (_ma_write_keypage(info, keyinfo, new_root, write_lock,
DFLT_INIT_HITS, new_root_buf))
goto err1;
- share->state.key_root[keynr]= new_root;
+ *root= new_root;
DBUG_PRINT("rtree", ("new root page: %lu level: %d nod_flag: %u",
(ulong) new_root, 0,
_ma_test_if_nod(share, new_root_buf)));
@@ -746,6 +773,7 @@ err1:
default:
case -1: /* error */
{
+ DBUG_ASSERT(0);
break;
}
}
@@ -765,12 +793,29 @@ int maria_rtree_insert(MARIA_HA *info, uint keynr,
uchar *key, uint key_length)
{
int res;
+ MARIA_SHARE *share= info->s;
+ my_off_t *root= &share->state.key_root[keynr];
+ my_off_t new_root= *root;
+ LSN lsn= LSN_IMPOSSIBLE;
DBUG_ENTER("maria_rtree_insert");
- res= - (!key_length ||
- (maria_rtree_insert_level(info, keynr, key, key_length, -1) == -1));
- _ma_fast_unlock_key_del(info);
- /** @todo RECOVERY use a real LSN */
- _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
+ if (key_length == 0)
+ {
+ res= -1;
+ goto err;
+ }
+ if ((res= -(maria_rtree_insert_level(info, keynr, key, key_length, -1,
+ &new_root) == -1)) != 0)
+ goto err;
+ if (share->now_transactional)
+ res= _ma_write_undo_key_insert(info, share->keyinfo + keynr,
+ key, key_length, root, new_root, &lsn);
+ else
+ {
+ *root= new_root;
+ _ma_fast_unlock_key_del(info);
+ }
+ _ma_unpin_all_pages_and_finalize_row(info, lsn);
+err:
DBUG_RETURN(res);
}
@@ -823,7 +868,7 @@ static int maria_rtree_delete_req(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
stPageList *ReinsertList, int level)
{
ulong i;
- uint nod_flag;
+ uint nod_flag, page_link_idx;
int res;
uchar *page_buf, *last, *k;
MARIA_PINNED_PAGE *page_link;
@@ -838,6 +883,7 @@ static int maria_rtree_delete_req(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
if (!_ma_fetch_keypage(info, keyinfo, page, PAGECACHE_LOCK_WRITE,
DFLT_INIT_HITS, page_buf, 0, &page_link))
goto err1;
+ page_link_idx= page_link_to_idx(info);
nod_flag= _ma_test_if_nod(share, page_buf);
DBUG_PRINT("rtree", ("page: %lu level: %d nod_flag: %u",
(ulong) page, level, nod_flag));
@@ -868,7 +914,10 @@ static int maria_rtree_delete_req(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
if (maria_rtree_set_key_mbr(info, keyinfo, k, key_length,
_ma_kpos(nod_flag, k)))
goto err1;
- page_link->changed= 1;
+ if (share->now_transactional &&
+ _ma_log_change(info, page, page_buf, k, key_length))
+ goto err1;
+ page_link_from_idx(info, page_link_idx)->changed= 1;
if (_ma_write_keypage(info, keyinfo, page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, page_buf))
@@ -894,8 +943,10 @@ static int maria_rtree_delete_req(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
subtree. So we need to re-insert its keys on the same
level later to reintegrate the subtrees.
*/
- maria_rtree_delete_key(info, page_buf, k, key_length, nod_flag);
- page_link->changed= 1;
+ if (maria_rtree_delete_key(info, page_buf, k, key_length,
+ nod_flag, page))
+ goto err1;
+ page_link_from_idx(info, page_link_idx)->changed= 1;
if (_ma_write_keypage(info, keyinfo, page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, page_buf))
@@ -911,8 +962,10 @@ static int maria_rtree_delete_req(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
}
case 2: /* vacuous case: last key in the leaf */
{
- maria_rtree_delete_key(info, page_buf, k, key_length, nod_flag);
- page_link->changed= 1;
+ if (maria_rtree_delete_key(info, page_buf, k, key_length,
+ nod_flag, page))
+ goto err1;
+ page_link_from_idx(info, page_link_idx)->changed= 1;
if (_ma_write_keypage(info, keyinfo, page,
PAGECACHE_LOCK_LEFT_WRITELOCKED,
DFLT_INIT_HITS, page_buf))
@@ -935,9 +988,11 @@ static int maria_rtree_delete_req(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
if (!maria_rtree_key_cmp(keyinfo->seg, key, k, key_length,
MBR_EQUAL | MBR_DATA))
{
- page_link->changed= 1;
+ page_link_from_idx(info, page_link_idx)->changed= 1;
- maria_rtree_delete_key(info, page_buf, k, key_length, nod_flag);
+ if (maria_rtree_delete_key(info, page_buf, k, key_length, nod_flag,
+ page))
+ goto err1;
*page_size= _ma_get_page_used(share, page_buf);
if (*page_size == info->s->keypage_header)
{
@@ -982,13 +1037,40 @@ int maria_rtree_delete(MARIA_HA *info, uint keynr,
uchar *key, uint key_length)
{
MARIA_SHARE *share= info->s;
- uint page_size;
+ my_off_t new_root= share->state.key_root[keynr];
+ int res;
+ LSN lsn= LSN_IMPOSSIBLE;
+ DBUG_ENTER("maria_rtree_delete");
+
+ if ((res= maria_rtree_real_delete(info, keynr, key, key_length,
+ &new_root)))
+ goto err;
+
+ if (share->now_transactional)
+ res= _ma_write_undo_key_delete(info, keynr, key, key_length,
+ new_root, &lsn);
+ else
+ share->state.key_root[keynr]= new_root;
+
+err:
+ _ma_fast_unlock_key_del(info);
+ _ma_unpin_all_pages_and_finalize_row(info, lsn);
+ DBUG_RETURN(res);
+}
+
+
+int maria_rtree_real_delete(MARIA_HA *info, uint keynr,
+ const uchar *key, uint key_length,
+ my_off_t *root)
+{
+ MARIA_SHARE *share= info->s;
+ uint page_size, page_link_idx;
stPageList ReinsertList;
my_off_t old_root;
MARIA_KEYDEF *keyinfo= info->s->keyinfo + keynr;
MARIA_PINNED_PAGE *page_link, *root_page_link;
int res;
- DBUG_ENTER("maria_rtree_delete");
+ DBUG_ENTER("maria_rtree_real_delete");
if ((old_root= share->state.key_root[keynr]) == HA_OFFSET_ERROR)
{
@@ -1006,7 +1088,7 @@ int maria_rtree_delete(MARIA_HA *info, uint keynr,
&page_size, &ReinsertList, 0)) {
case 2: /* empty */
{
- share->state.key_root[keynr]= HA_OFFSET_ERROR;
+ *root= HA_OFFSET_ERROR;
res= 0;
goto err;
}
@@ -1027,6 +1109,7 @@ int maria_rtree_delete(MARIA_HA *info, uint keynr,
PAGECACHE_LOCK_WRITE,
DFLT_INIT_HITS, page_buf, 0, &page_link))
goto err1;
+ page_link_idx= page_link_to_idx(info);
nod_flag= _ma_test_if_nod(share, page_buf);
DBUG_PRINT("rtree", ("reinserting keys from "
"page: %lu level: %d nod_flag: %u",
@@ -1039,7 +1122,8 @@ int maria_rtree_delete(MARIA_HA *info, uint keynr,
{
if ((res=
maria_rtree_insert_level(info, keynr, k, key_length,
- ReinsertList.pages[i].level)) == -1)
+ ReinsertList.pages[i].level,
+ root)) == -1)
{
my_afree(page_buf);
goto err1;
@@ -1059,7 +1143,7 @@ int maria_rtree_delete(MARIA_HA *info, uint keynr,
}
res= 0;
my_afree(page_buf);
- page_link->changed= 1;
+ page_link_from_idx(info, page_link_idx)->changed= 1;
if (_ma_dispose(info, ReinsertList.pages[i].offs, 0))
goto err1;
}
@@ -1067,7 +1151,7 @@ int maria_rtree_delete(MARIA_HA *info, uint keynr,
my_free((uchar*) ReinsertList.pages, MYF(0));
/* check for redundant root (not leaf, 1 child) and eliminate */
- if ((old_root= share->state.key_root[keynr]) == HA_OFFSET_ERROR)
+ if ((old_root= *root) == HA_OFFSET_ERROR)
goto err1;
if (!_ma_fetch_keypage(info, keyinfo, old_root,
PAGECACHE_LOCK_WRITE,
@@ -1078,13 +1162,11 @@ int maria_rtree_delete(MARIA_HA *info, uint keynr,
if (nod_flag && (page_size == share->keypage_header + key_length +
nod_flag))
{
- my_off_t new_root= _ma_kpos(nod_flag,
- rt_PAGE_FIRST_KEY(share, info->buff,
- nod_flag));
+ *root= _ma_kpos(nod_flag,
+ rt_PAGE_FIRST_KEY(share, info->buff, nod_flag));
root_page_link->changed= 1;
if (_ma_dispose(info, old_root, 0))
goto err1;
- share->state.key_root[keynr]= new_root;
}
info->update= HA_STATE_DELETED;
res= 0;
@@ -1106,9 +1188,6 @@ err1:
goto err; /* purecov: inspected */
}
err:
- _ma_fast_unlock_key_del(info);
- /** @todo RECOVERY use a real LSN */
- _ma_unpin_all_pages_and_finalize_row(info, LSN_IMPOSSIBLE);
DBUG_RETURN(res);
}
diff --git a/storage/maria/ma_rt_index.h b/storage/maria/ma_rt_index.h
index 29731bf7272..16009135373 100644
--- a/storage/maria/ma_rt_index.h
+++ b/storage/maria/ma_rt_index.h
@@ -30,6 +30,13 @@ int maria_rtree_insert(MARIA_HA *info, uint keynr, uchar *key,
uint key_length);
int maria_rtree_delete(MARIA_HA *info, uint keynr, uchar *key,
uint key_length);
+int maria_rtree_insert_level(MARIA_HA *info, uint keynr,
+ const uchar *key,
+ uint key_length, int ins_level,
+ my_off_t *root);
+int maria_rtree_real_delete(MARIA_HA *info, uint keynr,
+ const uchar *key, uint key_length,
+ my_off_t *root);
int maria_rtree_find_first(MARIA_HA *info, uint keynr, uchar *key,
uint key_length, uint search_flag);
@@ -42,9 +49,21 @@ ha_rows maria_rtree_estimate(MARIA_HA *info, uint keynr, uchar *key,
uint key_length, uint flag);
int maria_rtree_split_page(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
- uchar *page,
+ my_off_t page_offs, uchar *page,
const uchar *key, uint key_length,
my_off_t *new_page_offs);
+/**
+ When you obtain a MARIA_PINNED_PAGE* link (by calling
+ _ma_fetch_keypage()/_ma_new()/etc), it is valid only until the next call to
+ those functions on this MARIA_HA*, because that next call may cause a
+ realloc of the pinned_pages DYNAMIC_ARRAY, causing the first link to become
+ wrong. The _index_ in the array is however invariant, so in these situations
+ you should save the index immediately and use it to later obtain an
+ up-to-date link.
+*/
+#define page_link_to_idx(INFO) ((INFO)->pinned_pages.elements - 1)
+#define page_link_from_idx(INFO, IDX) \
+ dynamic_element(&(INFO)->pinned_pages, (IDX), MARIA_PINNED_PAGE *)
#endif /*HAVE_RTREE_KEYS*/
#endif /* _rt_index_h */
diff --git a/storage/maria/ma_rt_key.c b/storage/maria/ma_rt_key.c
index 96a1b6725fd..dc37e4d166a 100644
--- a/storage/maria/ma_rt_key.c
+++ b/storage/maria/ma_rt_key.c
@@ -14,6 +14,8 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "maria_def.h"
+#include "trnman.h"
+#include "ma_key_recover.h"
#ifdef HAVE_RTREE_KEYS
#include "ma_rt_index.h"
@@ -31,25 +33,25 @@
int maria_rtree_add_key(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
const uchar *key,
- uint key_length, uchar *page_buf,
+ uint key_length, uchar *page_buf, my_off_t page,
my_off_t *new_page)
{
MARIA_SHARE *share= info->s;
- uint page_size= _ma_get_page_used(share, page_buf);
+ uint page_size= _ma_get_page_used(share, page_buf), added_len;
uint nod_flag= _ma_test_if_nod(share, page_buf);
+ uchar *key_pos= rt_PAGE_END(share, page_buf);
DBUG_ENTER("maria_rtree_add_key");
if (page_size + key_length + share->base.rec_reflength <=
- keyinfo->block_length)
+ (uint16)(keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
{
/* split won't be necessary */
if (nod_flag)
{
/* save key */
DBUG_ASSERT(_ma_kpos(nod_flag, key) < info->state->key_file_length);
- memcpy(rt_PAGE_END(share, page_buf), key - nod_flag,
- key_length + nod_flag);
- page_size+= key_length + nod_flag;
+ memcpy(key_pos, key - nod_flag, key_length + nod_flag);
+ added_len= key_length + nod_flag;
}
else
{
@@ -58,16 +60,19 @@ int maria_rtree_add_key(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
share->base.rec_reflength) <
info->state->data_file_length +
share->base.pack_reclength);
- memcpy(rt_PAGE_END(share, page_buf), key, key_length +
- share->base.rec_reflength);
- page_size+= key_length + share->base.rec_reflength;
+ memcpy(key_pos, key, key_length + share->base.rec_reflength);
+ added_len= key_length + share->base.rec_reflength;
}
+ page_size+= added_len;
_ma_store_page_used(share, page_buf, page_size);
+ if (share->now_transactional &&
+ _ma_log_add(info, page, page_buf, key_pos - page_buf,
+ key_pos, added_len, added_len, 0))
+ DBUG_RETURN(-1);
DBUG_RETURN(0);
}
-
- DBUG_RETURN(maria_rtree_split_page(info, keyinfo, page_buf, key, key_length,
- new_page) ? -1 : 1);
+ DBUG_RETURN(maria_rtree_split_page(info, keyinfo, page, page_buf, key,
+ key_length, new_page) ? -1 : 1);
}
@@ -76,10 +81,11 @@ int maria_rtree_add_key(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
*/
int maria_rtree_delete_key(MARIA_HA *info, uchar *page_buf, uchar *key,
- uint key_length, uint nod_flag)
+ uint key_length, uint nod_flag, my_off_t page)
{
MARIA_SHARE *share= info->s;
uint16 page_size= _ma_get_page_used(share, page_buf);
+ uint key_length_with_nod_flag;
uchar *key_start;
key_start= key - nod_flag;
@@ -88,15 +94,20 @@ int maria_rtree_delete_key(MARIA_HA *info, uchar *page_buf, uchar *key,
memmove(key_start, key + key_length, page_size - key_length -
(key - page_buf));
- page_size-= key_length + nod_flag;
-
+ key_length_with_nod_flag= key_length + nod_flag;
+ page_size-= key_length_with_nod_flag;
_ma_store_page_used(share, page_buf, page_size);
+ if (share->now_transactional &&
+ _ma_log_delete(info, page, page_buf, key_start, 0,
+ key_length_with_nod_flag))
+
+ return -1;
return 0;
}
/*
- Calculate and store key MBR
+ Calculate and store key MBR into *key.
*/
int maria_rtree_set_key_mbr(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
diff --git a/storage/maria/ma_rt_key.h b/storage/maria/ma_rt_key.h
index 3bb3fff0bff..66d1e0e699b 100644
--- a/storage/maria/ma_rt_key.h
+++ b/storage/maria/ma_rt_key.h
@@ -23,10 +23,10 @@
int maria_rtree_add_key(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
const uchar *key,
- uint key_length, uchar *page_buf,
+ uint key_length, uchar *page_buf, my_off_t page,
my_off_t *new_page);
int maria_rtree_delete_key(MARIA_HA *info, uchar *page_buf, uchar *key,
- uint key_length, uint nod_flag);
+ uint key_length, uint nod_flag, my_off_t page);
int maria_rtree_set_key_mbr(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
uchar *key,
uint key_length, my_off_t child_page);
diff --git a/storage/maria/ma_rt_mbr.c b/storage/maria/ma_rt_mbr.c
index c486e1c8601..e269ed9f7af 100644
--- a/storage/maria/ma_rt_mbr.c
+++ b/storage/maria/ma_rt_mbr.c
@@ -261,6 +261,7 @@ double maria_rtree_rect_volume(HA_KEYSEG *keyseg, uchar *a, uint key_length)
/*
Creates an MBR as an array of doubles.
+ Fills *res.
*/
int maria_rtree_d_mbr(const HA_KEYSEG *keyseg, const uchar *a,
@@ -528,6 +529,7 @@ double maria_rtree_overlapping_area(HA_KEYSEG *keyseg, uchar* a, uchar* b,
/*
Calculates MBR_AREA(a+b) - MBR_AREA(a)
+ Fills *ab_area.
Note: when 'a' and 'b' objects are far from each other,
the area increase can be really big, so this function
can return 'inf' as a result.
@@ -739,6 +741,7 @@ double maria_rtree_perimeter_increase(HA_KEYSEG *keyseg, uchar* a, uchar* b,
/*
Calculates key page total MBR= MBR(key1) + MBR(key2) + ...
+ Stores into *c.
*/
int maria_rtree_page_mbr(const MARIA_HA *info, const HA_KEYSEG *keyseg,
const uchar *page_buf,
diff --git a/storage/maria/ma_rt_split.c b/storage/maria/ma_rt_split.c
index f00dea6b677..0e3a6284c13 100644
--- a/storage/maria/ma_rt_split.c
+++ b/storage/maria/ma_rt_split.c
@@ -15,6 +15,8 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "maria_def.h"
+#include "trnman.h"
+#include "ma_key_recover.h"
#ifdef HAVE_RTREE_KEYS
@@ -89,8 +91,11 @@ inline static void copy_coords(double *dst, const double *src, int n_dim)
memcpy(dst, src, sizeof(double) * (n_dim * 2));
}
-/*
-Select two nodes to collect group upon
+/**
+ Select two nodes to collect group upon.
+
+ Note that such function uses 'double' arithmetic so may behave differently
+ on different platforms/builds. There are others in this file.
*/
static void pick_seeds(SplitStruct *node, int n_entries,
SplitStruct **seed_a, SplitStruct **seed_b, int n_dim)
@@ -247,11 +252,136 @@ static int split_maria_rtree_node(SplitStruct *node, int n_entries,
return 0;
}
+
+/**
+ Logs key reorganization done in a split page (new page is logged elsewhere).
+
+ The effect of a split on the split page is three changes:
+ - some piece of the page move to different places inside this page (we are
+ not interested here in the pieces which move to the new page)
+ - the key is inserted into the page or not (could be in the new page)
+ - page is shrunk
+ All this is uniquely determined by a few parameters:
+ - the key (starting at 'key-nod_flag', for 'full_length' bytes
+ (maria_rtree_split_page() seems to depend on its parameters key&key_length
+ but in fact it reads more (to the left: nod_flag, and to the right:
+ full_length)
+ - the binary content of the page
+ - some variables in the share
+ - double arithmetic, which is unpredictable from machine to machine and
+ from build to build (see pick_seeds() above: it has a comparison between
+ double-s 'if (d > max_d)' so the comparison can go differently from machine
+ to machine or build to build, it has happened in real life).
+ If one day we use precision-math instead of double-math, in GIS, then the
+ last parameter would become constant accross machines and builds and we
+ could some cheap logging: just log the few parameters above.
+ Until then, we log the list of memcpy() operations (fortunately, we often do
+ not have to log the source bytes, as they can be found in the page before
+ applying the REDO; the only source bytes to log are the key), the key if it
+ was inserted into this page, and the shrinking.
+
+ @param info table
+ @param page page's offset in the file
+ @param buff content of the page (post-split)
+ @param key_with_nod_flag pointer to key-nod_flag
+ @param full_length length of (key + (nod_flag (if node) or rowid (if
+ leaf)))
+ @param log_internal_copy encoded list of mempcy() operations done on
+ split page, having their source in the page
+ @param log_internal_copy_length length of above list, in bytes
+ @param log_key_copy operation describing the key's copy, or NULL if the
+ inserted key was not put into the page (was put in
+ new page, so does not have to be logged here)
+ @param length_diff by how much the page has shrunk during split
+*/
+
+static my_bool _ma_log_rt_split(MARIA_HA *info,
+ my_off_t page, const uchar *buff,
+ const uchar *key_with_nod_flag,
+ uint full_length,
+ const uchar *log_internal_copy,
+ uint log_internal_copy_length,
+ const uchar *log_key_copy,
+ uint length_diff)
+{
+ MARIA_SHARE *share= info->s;
+ LSN lsn;
+ uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 2 + 1 + 2 + 2 + 7],
+ *log_pos;
+ LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 5];
+ uint translog_parts, extra_length= 0;
+ DBUG_ENTER("_ma_log_rt_split");
+ DBUG_PRINT("enter", ("page: %lu", (ulong) page));
+
+ DBUG_ASSERT(share->now_transactional);
+ page/= share->block_size;
+ page_store(log_data + FILEID_STORE_SIZE, page);
+ log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
+ log_pos[0]= KEY_OP_DEL_SUFFIX;
+ log_pos++;
+ DBUG_ASSERT((int)length_diff > 0);
+ int2store(log_pos, length_diff);
+ log_pos+= 2;
+ log_pos[0]= KEY_OP_MULTI_COPY;
+ log_pos++;
+ int2store(log_pos, full_length);
+ log_pos+= 2;
+ int2store(log_pos, log_internal_copy_length);
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data) - 7;
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].str= log_internal_copy;
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].length= log_internal_copy_length;
+ translog_parts= 2;
+ if (log_key_copy != NULL) /* need to store key into record */
+ {
+ log_array[TRANSLOG_INTERNAL_PARTS + 2].str= log_key_copy;
+ log_array[TRANSLOG_INTERNAL_PARTS + 2].length= 1 + 2 + 1 + 2;
+ log_array[TRANSLOG_INTERNAL_PARTS + 3].str= key_with_nod_flag;
+ log_array[TRANSLOG_INTERNAL_PARTS + 3].length= full_length;
+ extra_length= 1 + 2 + 1 + 2 + full_length;
+ translog_parts+= 2;
+ }
+
+#ifdef EXTRA_DEBUG_KEY_CHANGES
+ {
+ int page_length= _ma_get_page_used(share, buff);
+ ha_checksum crc;
+ crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
+ log_pos+= 2;
+ log_pos[0]= KEY_OP_CHECK;
+ int2store(log_pos + 1, page_length);
+ int4store(log_pos + 3, crc);
+ log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
+ log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
+ extra_length+= 7;
+ translog_parts++;
+ }
+#endif
+
+ if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
+ info->trn, info,
+ (translog_size_t) ((log_pos - log_data) +
+ log_internal_copy_length +
+ extra_length),
+ TRANSLOG_INTERNAL_PARTS + translog_parts,
+ log_array, log_data, NULL))
+ DBUG_RETURN(1);
+ DBUG_RETURN(0);
+}
+
+/**
+ 0 ok; the created page is put into page cache; the shortened one is not (up
+ to the caller to do it)
+ 1 or -1: error.
+ If new_page_offs==NULL, won't create new page (for redo phase).
+*/
+
int maria_rtree_split_page(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
- uchar *page, const uchar *key,
+ my_off_t page_offs, uchar *page, const uchar *key,
uint key_length, my_off_t *new_page_offs)
{
MARIA_SHARE *share= info->s;
+ const my_bool transactional= share->now_transactional;
int n1, n2; /* Number of items in groups */
SplitStruct *task;
SplitStruct *cur;
@@ -261,12 +391,14 @@ int maria_rtree_split_page(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
double *old_coord;
int n_dim;
uchar *source_cur, *cur1, *cur2;
- uchar *new_page;
+ uchar *new_page, *log_internal_copy, *log_internal_copy_ptr,
+ *log_key_copy= NULL;
int err_code= 0;
uint nod_flag= _ma_test_if_nod(share, page);
+ uint org_length= _ma_get_page_used(share, page), new_length;
uint full_length= key_length + (nod_flag ? nod_flag :
share->base.rec_reflength);
- int max_keys= ((_ma_get_page_used(share, page) - share->keypage_header) /
+ int max_keys= ((org_length - share->keypage_header) /
(full_length));
MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
DBUG_ENTER("maria_rtree_split_page");
@@ -312,11 +444,16 @@ int maria_rtree_split_page(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
goto split_err;
}
- if (!(new_page= (uchar*) my_alloca((uint)keyinfo->block_length)))
+ /* Allocate buffer for new page and piece of log record */
+ if (!(new_page= (uchar*) my_alloca((uint)keyinfo->block_length +
+ (transactional ?
+ (max_keys * (2 + 2) +
+ 1 + 2 + 1 + 2) : 0))))
{
err_code= -1;
goto split_err;
}
+ log_internal_copy= log_internal_copy_ptr= new_page + keyinfo->block_length;
bzero(new_page, share->block_size);
stop= task + (max_keys + 1);
@@ -327,47 +464,94 @@ int maria_rtree_split_page(MARIA_HA *info, const MARIA_KEYDEF *keyinfo,
for (cur= task; cur < stop; cur++)
{
uchar *to;
+ const uchar *cur_key= cur->key;
+ my_bool log_this_change;
+ DBUG_ASSERT(log_key_copy == NULL);
if (cur->n_node == 1)
{
to= cur1;
cur1= rt_PAGE_NEXT_KEY(share, cur1, key_length, nod_flag);
n1++;
+ log_this_change= transactional;
}
else
{
to= cur2;
cur2= rt_PAGE_NEXT_KEY(share, cur2, key_length, nod_flag);
n2++;
+ log_this_change= FALSE;
}
- if (to != cur->key)
- memcpy(to - nod_flag, cur->key - nod_flag, full_length);
+ if (to != cur_key)
+ {
+ uchar *to_with_nod_flag= to - nod_flag;
+ const uchar *cur_key_with_nod_flag= cur_key - nod_flag;
+ memcpy(to_with_nod_flag, cur_key_with_nod_flag, full_length);
+ if (log_this_change)
+ {
+ uint to_with_nod_flag_offs= to_with_nod_flag - page;
+ if (likely(cur_key != key))
+ {
+ /* this memcpy() is internal to the page (source in the page) */
+ uint cur_key_with_nod_flag_offs= cur_key_with_nod_flag - page;
+ int2store(log_internal_copy_ptr, to_with_nod_flag_offs);
+ log_internal_copy_ptr+= 2;
+ int2store(log_internal_copy_ptr, cur_key_with_nod_flag_offs);
+ log_internal_copy_ptr+= 2;
+ }
+ else
+ {
+ /* last iteration, and this involves *key: source is external */
+ log_key_copy= log_internal_copy_ptr;
+ log_key_copy[0]= KEY_OP_OFFSET;
+ int2store(log_key_copy + 1, to_with_nod_flag_offs);
+ log_key_copy[3]= KEY_OP_CHANGE;
+ int2store(log_key_copy + 4, full_length);
+ /* _ma_log_rt_split() will store *key, right after */
+ }
+ }
+ }
+ }
+ { /* verify that above loop didn't touch header bytes */
+ uint i;
+ for (i= 0; i < share->keypage_header; i++)
+ DBUG_ASSERT(new_page[i]==0);
}
if (nod_flag)
_ma_store_keypage_flag(share, new_page, KEYPAGE_FLAG_ISNOD);
_ma_store_keynr(share, new_page, keyinfo->key_nr);
- _ma_store_page_used(share, page, share->keypage_header + n1 * full_length)
_ma_store_page_used(share, new_page, share->keypage_header +
n2 * full_length);
+ new_length= share->keypage_header + n1 * full_length;
+ _ma_store_page_used(share, page, new_length);
if ((*new_page_offs= _ma_new(info, DFLT_INIT_HITS, &page_link)) ==
HA_OFFSET_ERROR)
err_code= -1;
else
- err_code= _ma_write_keypage(info, keyinfo, *new_page_offs,
- page_link->write_lock,
- DFLT_INIT_HITS, new_page);
+ {
+ if (transactional &&
+ ( /* log change to split page */
+ _ma_log_rt_split(info, page_offs, page, key - nod_flag,
+ full_length, log_internal_copy,
+ log_internal_copy_ptr - log_internal_copy,
+ log_key_copy, org_length - new_length) ||
+ /* and to new page */
+ _ma_log_new(info, *new_page_offs, new_page,
+ share->keypage_header + n2 * full_length,
+ keyinfo->key_nr, 0)))
+ err_code= -1;
+ if ( _ma_write_keypage(info, keyinfo, *new_page_offs,
+ page_link->write_lock,
+ DFLT_INIT_HITS, new_page))
+ err_code= -1;
+ }
DBUG_PRINT("rtree", ("split new block: %lu", (ulong) *new_page_offs));
- my_afree((uchar*)new_page);
+ my_afree(new_page);
split_err:
- /**
- @todo the cast below is useless (coord_buf is uchar*); at the moment we
- changed all "byte" to "uchar", some casts became useless and should be
- removed.
- */
- my_afree((uchar*) coord_buf);
+ my_afree(coord_buf);
DBUG_RETURN(err_code);
}
diff --git a/storage/maria/ma_rt_test.c b/storage/maria/ma_rt_test.c
index 18dca4810b6..10d3fc6eeeb 100644
--- a/storage/maria/ma_rt_test.c
+++ b/storage/maria/ma_rt_test.c
@@ -17,7 +17,12 @@
/* Written by Alex Barkov who has a shared copyright to this code */
-#include "maria.h"
+#include "maria_def.h"
+#include "ma_control_file.h"
+#include "ma_loghandler.h"
+#include "ma_checkpoint.h"
+#include "trnman.h"
+#include <my_getopt.h>
#ifdef HAVE_RTREE_KEYS
@@ -27,11 +32,13 @@
#define ndims 2
#define KEYALG HA_KEY_ALG_RTREE
-static int read_with_pos(MARIA_HA * file, int silent);
+static int read_with_pos(MARIA_HA * file);
static void create_record(uchar *record,uint rownr);
static void create_record1(uchar *record,uint rownr);
static void print_record(uchar * record,my_off_t offs,const char * tail);
static int run_test(const char *filename);
+static void get_options(int argc, char *argv[]);
+static void usage();
static double rt_data[]=
{
@@ -79,10 +86,32 @@ static double rt_data[]=
-1
};
-int main(int argc __attribute__((unused)),char *argv[] __attribute__((unused)))
+static int silent= 0, testflag= 0, transactional= 0,
+ die_in_middle_of_transaction= 0, checkpoint= 0, create_flag= 0;
+static enum data_file_type record_type= DYNAMIC_RECORD;
+
+int main(int argc, char *argv[])
{
MY_INIT(argv[0]);
- maria_init();
+ get_options(argc, argv);
+ maria_data_root= (char *)".";
+ /* Maria requires that we always have a page cache */
+ if (maria_init() ||
+ (init_pagecache(maria_pagecache, maria_block_size * 16, 0, 0,
+ maria_block_size, MY_WME) == 0) ||
+ ma_control_file_open(TRUE) ||
+ (init_pagecache(maria_log_pagecache,
+ TRANSLOG_PAGECACHE_SIZE, 0, 0,
+ TRANSLOG_PAGE_SIZE, MY_WME) == 0) ||
+ translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
+ 0, 0, maria_log_pagecache,
+ TRANSLOG_DEFAULT_FLAGS, 0) ||
+ (transactional && (trnman_init(0) || ma_checkpoint_init(0))))
+ {
+ fprintf(stderr, "Error in initialization\n");
+ exit(1);
+ }
+
exit(run_test("rt_test"));
}
@@ -97,16 +126,14 @@ static int run_test(const char *filename)
HA_KEYSEG keyseg[20];
key_range range;
- int silent=0;
int opt_unique=0;
- int create_flag=0;
int key_type=HA_KEYTYPE_DOUBLE;
int key_length=8;
int null_fields=0;
int nrecords=sizeof(rt_data)/(sizeof(double)*4);/* 3000;*/
int rec_length=0;
int uniques=0;
- int i;
+ int i, max_i;
int error;
int row_count=0;
uchar record[MAX_REC_LENGTH];
@@ -152,9 +179,10 @@ static int run_test(const char *filename)
bzero((char*) &create_info,sizeof(create_info));
create_info.max_rows=10000000;
+ create_info.transactional= transactional;
if (maria_create(filename,
- DYNAMIC_RECORD,
+ record_type,
1, /* keys */
keyinfo,
1+2*ndims+opt_unique, /* columns */
@@ -166,7 +194,11 @@ static int run_test(const char *filename)
if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED)))
goto err;
-
+ maria_begin(file);
+ if (testflag == 1)
+ goto end;
+ if (checkpoint == 1 && ma_checkpoint_execute(CHECKPOINT_MEDIUM, FALSE))
+ goto err;
if (!silent)
printf("- Writing key:s\n");
@@ -186,7 +218,7 @@ static int run_test(const char *filename)
}
}
- if ((error=read_with_pos(file,silent)))
+ if ((error=read_with_pos(file)))
goto err;
if (!silent)
@@ -198,7 +230,7 @@ static int run_test(const char *filename)
create_record(record,i);
bzero((char*) read_record,MAX_REC_LENGTH);
- error=maria_rkey(file,read_record,0,record+1,0,HA_READ_MBR_EQUAL);
+ error=maria_rkey(file,read_record,0,record+1,HA_WHOLE_KEY,HA_READ_MBR_EQUAL);
if (error && error!=HA_ERR_KEY_NOT_FOUND)
{
@@ -213,13 +245,25 @@ static int run_test(const char *filename)
print_record(read_record,maria_position(file),"\n");
}
+ if (checkpoint == 2 && ma_checkpoint_execute(CHECKPOINT_MEDIUM, FALSE))
+ goto err;
+
+ if (testflag == 2)
+ goto end;
+
if (!silent)
printf("- Deleting rows\n");
+ if (maria_scan_init(file))
+ {
+ fprintf(stderr, "maria_scan_init failed\n");
+ goto err;
+ }
+
for (i=0; i < nrecords/4; i++)
{
my_errno=0;
bzero((char*) read_record,MAX_REC_LENGTH);
- error=maria_rrnd(file,read_record,i == 0 ? 0L : HA_OFFSET_ERROR);
+ error=maria_scan(file,read_record);
if (error)
{
printf("pos: %2d maria_rrnd: %3d errno: %3d\n",i,error,my_errno);
@@ -234,18 +278,39 @@ static int run_test(const char *filename)
goto err;
}
}
+ maria_scan_end(file);
+
+ if (testflag == 3)
+ goto end;
+ if (checkpoint == 3 && ma_checkpoint_execute(CHECKPOINT_MEDIUM, FALSE))
+ goto err;
if (!silent)
printf("- Updating rows with position\n");
- for (i=0; i < (nrecords - nrecords/4) ; i++)
+ if (maria_scan_init(file))
+ {
+ fprintf(stderr, "maria_scan_init failed\n");
+ goto err;
+ }
+
+ /* We are looking for nrecords-necords/2 non-deleted records */
+ for (i=0, max_i= nrecords - nrecords/2; i < max_i ; i++)
{
my_errno=0;
bzero((char*) read_record,MAX_REC_LENGTH);
- error=maria_rrnd(file,read_record,i == 0 ? 0L : HA_OFFSET_ERROR);
+ error=maria_scan(file,read_record);
if (error)
{
if (error==HA_ERR_RECORD_DELETED)
+ {
+ printf("found deleted record\n");
+ /*
+ In BLOCK_RECORD format, maria_scan() never returns deleted records,
+ while in DYNAMIC format it can. Don't count such record:
+ */
+ max_i++;
continue;
+ }
printf("pos: %2d maria_rrnd: %3d errno: %3d\n",i,error,my_errno);
goto err;
}
@@ -261,8 +326,19 @@ static int run_test(const char *filename)
}
}
- if ((error=read_with_pos(file,silent)))
+ if (testflag == 4)
+ goto end;
+ if (checkpoint == 4 && ma_checkpoint_execute(CHECKPOINT_MEDIUM, FALSE))
+ goto err;
+
+ if (maria_scan_init(file))
+ {
+ fprintf(stderr, "maria_scan_init failed\n");
+ goto err;
+ }
+ if ((error=read_with_pos(file)))
goto err;
+ maria_scan_end(file);
if (!silent)
printf("- Test maria_rkey then a sequence of maria_rnext_same\n");
@@ -270,7 +346,8 @@ static int run_test(const char *filename)
create_record(record, nrecords*4/5);
print_record(record,0," search for\n");
- if ((error=maria_rkey(file,read_record,0,record+1,0,HA_READ_MBR_INTERSECT)))
+ if ((error=maria_rkey(file,read_record,0,record+1,HA_WHOLE_KEY,
+ HA_READ_MBR_INTERSECT)))
{
printf("maria_rkey: %3d errno: %3d\n",error,my_errno);
goto err;
@@ -330,6 +407,34 @@ static int run_test(const char *filename)
hrows= maria_records_in_range(file,0, &range, (key_range*) 0);
printf(" %ld rows\n", (long) hrows);
+end:
+ maria_scan_end(file);
+ if (die_in_middle_of_transaction)
+ {
+ /* see similar code in ma_test2.c for comments */
+ switch (die_in_middle_of_transaction) {
+ case 1:
+ _ma_flush_table_files(file, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
+ FLUSH_RELEASE, FLUSH_RELEASE);
+ break;
+ case 2:
+ if (translog_flush(file->trn->undo_lsn))
+ goto err;
+ break;
+ case 3:
+ break;
+ case 4:
+ _ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
+ FLUSH_RELEASE);
+ if (translog_flush(file->trn->undo_lsn))
+ goto err;
+ break;
+ }
+ printf("Dying on request without maria_commit()/maria_close()\n");
+ exit(0);
+ }
+ if (maria_commit(file))
+ goto err;
if (maria_close(file)) goto err;
maria_end();
my_end(MY_CHECK_ERROR);
@@ -343,7 +448,7 @@ err:
-static int read_with_pos (MARIA_HA * file,int silent)
+static int read_with_pos (MARIA_HA * file)
{
int error;
int i;
@@ -355,7 +460,7 @@ static int read_with_pos (MARIA_HA * file,int silent)
{
my_errno=0;
bzero((char*) read_record,MAX_REC_LENGTH);
- error=maria_rrnd(file,read_record,i == 0 ? 0L : HA_OFFSET_ERROR);
+ error=maria_scan(file,read_record);
if (error)
{
if (error==HA_ERR_END_OF_FILE)
@@ -467,6 +572,86 @@ static void create_record(uchar *record, uint rownr)
}
}
+
+static struct my_option my_long_options[] =
+{
+ {"checkpoint", 'H', "Checkpoint at specified stage", (uchar**) &checkpoint,
+ (uchar**) &checkpoint, 0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"checksum", 'c', "Undocumented",
+ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
+#ifndef DBUG_OFF
+ {"debug", '#', "Undocumented",
+ 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+#endif
+ {"help", '?', "Display help and exit",
+ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"row-fixed-size", 'S', "Fixed size records",
+ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"rows-in-block", 'M', "Store rows in block format",
+ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"silent", 's', "Undocumented",
+ (uchar**) &silent, (uchar**) &silent, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0,
+ 0, 0},
+ {"testflag", 't', "Stop test at specified stage", (uchar**) &testflag,
+ (uchar**) &testflag, 0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"test-undo", 'A',
+ "Abort hard. Used for testing recovery with undo",
+ (uchar**) &die_in_middle_of_transaction,
+ (uchar**) &die_in_middle_of_transaction,
+ 0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"transactional", 'T',
+ "Test in transactional mode. (Only works with block format)",
+ (uchar**) &transactional, (uchar**) &transactional, 0, GET_BOOL, NO_ARG,
+ 0, 0, 0, 0, 0, 0},
+ { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
+};
+
+
+static my_bool
+get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
+ char *argument __attribute__((unused)))
+{
+ switch(optid) {
+ case 'c':
+ create_flag|= HA_CREATE_CHECKSUM | HA_CREATE_PAGE_CHECKSUM;
+ break;
+ case 'M':
+ record_type= BLOCK_RECORD;
+ break;
+ case 'S':
+ record_type= STATIC_RECORD;
+ break;
+ case '#':
+ DBUG_PUSH(argument);
+ break;
+ case '?':
+ usage();
+ exit(1);
+ }
+ return 0;
+}
+
+
+/* Read options */
+
+static void get_options(int argc, char *argv[])
+{
+ int ho_error;
+
+ if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))
+ exit(ho_error);
+
+ return;
+} /* get options */
+
+
+static void usage()
+{
+ printf("Usage: %s [options]\n\n", my_progname);
+ my_print_help(my_long_options);
+ my_print_variables(my_long_options);
+}
+
#else
int main(int argc __attribute__((unused)),char *argv[] __attribute__((unused)))
{
diff --git a/storage/maria/ma_test1.c b/storage/maria/ma_test1.c
index abbad116c9c..bd373c4ad4b 100644
--- a/storage/maria/ma_test1.c
+++ b/storage/maria/ma_test1.c
@@ -70,7 +70,6 @@ extern int _ma_flush_table_files(MARIA_HA *info, uint flush_data_or_index,
int main(int argc,char *argv[])
{
MY_INIT(argv[0]);
- my_init();
get_options(argc,argv);
maria_data_root= (char *)".";
/* Maria requires that we always have a page cache */
diff --git a/storage/maria/ma_write.c b/storage/maria/ma_write.c
index fe0182e9a19..c8dac8f407c 100644
--- a/storage/maria/ma_write.c
+++ b/storage/maria/ma_write.c
@@ -45,27 +45,25 @@ static int _ma_ck_write_btree(register MARIA_HA *info, uint keynr,uchar *key,
static int _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
uchar *key, uint key_length,
my_off_t *root, uint comp_flag);
-static my_bool _ma_log_new(MARIA_HA *info, my_off_t page, uchar *buff,
- uint page_length, uint key_nr, my_bool root_page);
-static my_bool _ma_log_change(MARIA_HA *info, my_off_t page, uchar *buff,
- uchar *key_pos, uint length);
-static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, uchar *buff,
+static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, const uchar *buff,
uint org_length, uint new_length,
- uchar *key_pos,
+ const uchar *key_pos,
uint key_length, int move_length,
enum en_key_op prefix_or_suffix,
- uchar *data, uint data_length,
+ const uchar *data, uint data_length,
uint changed_length);
-static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page, uchar *buff,
+static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page,
+ const uchar *buff,
uint org_length, uint new_length,
- uchar *key_pos, uint key_length,
+ const uchar *key_pos, uint key_length,
int move_length);
-static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page, uchar *buff,
+static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page,
+ const uchar *buff,
uint new_length,
uint data_added_first,
uint data_changed_first,
uint data_deleted_last,
- uchar *key_pos,
+ const uchar *key_pos,
uint key_length, int move_length);
/*
@@ -396,57 +394,9 @@ static int _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
error= _ma_ck_real_write_btree(info, keyinfo, key, key_length, &new_root,
comp_flag);
if (!error && share->now_transactional)
- {
- uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
- KEY_NR_STORE_SIZE];
- LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
- struct st_msg_to_write_hook_for_undo_key msg;
-
- /* Save if we need to write a clr record */
- info->key_write_undo_lsn[keyinfo->key_nr]= info->trn->undo_lsn;
- lsn_store(log_data, info->trn->undo_lsn);
- key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
- keyinfo->key_nr);
- key_length+= share->rec_reflength;
- log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
- log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
- log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_buff;
- log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
-
- msg.root= root;
- msg.value= new_root;
- msg.auto_increment= 0;
- if (share->base.auto_key == ((uint)keyinfo->key_nr + 1))
- {
- const HA_KEYSEG *keyseg= keyinfo->seg;
- uchar *to= key_buff;
- if (keyseg->flag & HA_SWAP_KEY)
- {
- /* We put key from log record to "data record" packing format... */
- uchar reversed[HA_MAX_KEY_BUFF];
- uchar *key_ptr= to;
- uchar *key_end= key_ptr + keyseg->length;
- to= reversed + keyseg->length;
- do
- {
- *--to= *key_ptr++;
- } while (key_ptr != key_end);
- }
- /* ... so that we can read it with: */
- msg.auto_increment=
- ma_retrieve_auto_increment(to, keyseg->type);
- /* and write_hook_for_undo_key_insert() will pick this. */
- }
-
- if (translog_write_record(&lsn, LOGREC_UNDO_KEY_INSERT,
- info->trn, info,
- (translog_size_t)
- log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
- key_length,
- TRANSLOG_INTERNAL_PARTS + 2, log_array,
- log_data + LSN_STORE_SIZE, &msg))
- error= -1;
- }
+ error=
+ _ma_write_undo_key_insert(info, keyinfo, key_buff, key_length,
+ root, new_root, &lsn);
else
{
*root= new_root;
@@ -778,6 +728,12 @@ int _ma_insert(register MARIA_HA *info, register MARIA_KEYDEF *keyinfo,
DBUG_ASSERT((*b & 128) == 0);
#if HA_FT_MAXLEN >= 127
blen= mi_uint2korr(b); b+=2;
+ When you enable this code, as part of the MyISAM->Maria merge of
+ChangeSet@1.2562, 2008-04-09 07:41:40+02:00, serg@janus.mylan +9 -0
+ restore ft2 functionality, fix bugs.
+ Then this will enable two-level fulltext index, which is not totally
+ recoverable yet.
+ So remove this text and inform Guilhem so that he fixes the issue.
#else
blen= *b++;
#endif
@@ -1636,6 +1592,63 @@ void maria_end_bulk_insert(MARIA_HA *info)
Dedicated functions that generate log entries
****************************************************************************/
+
+int _ma_write_undo_key_insert(MARIA_HA *info,
+ const MARIA_KEYDEF *keyinfo,
+ const uchar *key, uint key_length,
+ my_off_t *root, my_off_t new_root, LSN *res_lsn)
+{
+ MARIA_SHARE *share= info->s;
+ uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
+ KEY_NR_STORE_SIZE];
+ LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
+ struct st_msg_to_write_hook_for_undo_key msg;
+
+ /* Save if we need to write a clr record */
+ info->key_write_undo_lsn[keyinfo->key_nr]= info->trn->undo_lsn;
+ lsn_store(log_data, info->trn->undo_lsn);
+ key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE,
+ keyinfo->key_nr);
+ key_length+= share->rec_reflength;
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key;
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length;
+
+ msg.root= root;
+ msg.value= new_root;
+ msg.auto_increment= 0;
+ if (share->base.auto_key == ((uint)keyinfo->key_nr + 1))
+ {
+ const HA_KEYSEG *keyseg= keyinfo->seg;
+ if (keyseg->flag & HA_SWAP_KEY)
+ {
+ /* We put key from log record to "data record" packing format... */
+ uchar reversed[HA_MAX_KEY_BUFF];
+ const uchar *key_ptr= key, *key_end= key + keyseg->length;
+ uchar *to= reversed + keyseg->length;
+ do
+ {
+ *--to= *key_ptr++;
+ } while (key_ptr != key_end);
+ key= to;
+ }
+ /* ... so that we can read it with: */
+ msg.auto_increment=
+ ma_retrieve_auto_increment(key, keyseg->type);
+ /* and write_hook_for_undo_key_insert() will pick this. */
+ }
+
+ return translog_write_record(res_lsn, LOGREC_UNDO_KEY_INSERT,
+ info->trn, info,
+ (translog_size_t)
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
+ key_length,
+ TRANSLOG_INTERNAL_PARTS + 2, log_array,
+ log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
+}
+
+
/**
@brief Log creation of new page
@@ -1647,8 +1660,8 @@ void maria_end_bulk_insert(MARIA_HA *info)
@retval 0 ok
*/
-static my_bool _ma_log_new(MARIA_HA *info, my_off_t page, uchar *buff,
- uint page_length, uint key_nr, my_bool root_page)
+my_bool _ma_log_new(MARIA_HA *info, my_off_t page, const uchar *buff,
+ uint page_length, uint key_nr, my_bool root_page)
{
LSN lsn;
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE
@@ -1698,15 +1711,15 @@ static my_bool _ma_log_new(MARIA_HA *info, my_off_t page, uchar *buff,
Log when some part of the key page changes
*/
-static my_bool _ma_log_change(MARIA_HA *info, my_off_t page, uchar *buff,
- uchar *key_pos, uint length)
+my_bool _ma_log_change(MARIA_HA *info, my_off_t page, const uchar *buff,
+ const uchar *key_pos, uint length)
{
LSN lsn;
- uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 6], *log_pos;
- LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
- uint offset= (uint) (key_pos - buff);
+ uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 6 + 7], *log_pos;
+ LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
+ uint offset= (uint) (key_pos - buff), translog_parts, extra_length= 0;
DBUG_ENTER("_ma_log_change");
- DBUG_PRINT("enter", ("page: %lu", (ulong) page));
+ DBUG_PRINT("enter", ("page: %lu length: %u", (ulong) page, length));
DBUG_ASSERT(info->s->now_transactional);
@@ -1720,15 +1733,33 @@ static my_bool _ma_log_change(MARIA_HA *info, my_off_t page, uchar *buff,
int2store(log_pos+4, length);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
- log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
- log_array[TRANSLOG_INTERNAL_PARTS + 1].str= buff + offset;
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data) - 7;
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
+ translog_parts= 2;
+
+#ifdef EXTRA_DEBUG_KEY_CHANGES
+ {
+ int page_length= _ma_get_page_used(info->s, buff);
+ ha_checksum crc;
+ crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
+ log_pos+= 6;
+ log_pos[0]= KEY_OP_CHECK;
+ int2store(log_pos+1, page_length);
+ int4store(log_pos+3, crc);
+ log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= (char *) log_pos;
+ log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
+ extra_length+= 7;
+ translog_parts++;
+ }
+#endif
if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
info->trn, info,
- (translog_size_t) (sizeof(log_data) + length),
- TRANSLOG_INTERNAL_PARTS + 2, log_array,
- log_data, NULL))
+ (translog_size_t) (sizeof(log_data) - 7 + length +
+ extra_length),
+ TRANSLOG_INTERNAL_PARTS + translog_parts,
+ log_array, log_data, NULL))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
@@ -1750,11 +1781,11 @@ static my_bool _ma_log_change(MARIA_HA *info, my_off_t page, uchar *buff,
*/
-static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, uchar *buff,
+static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, const uchar *buff,
uint org_length, uint new_length,
- uchar *key_pos, uint key_length, int move_length,
- enum en_key_op prefix_or_suffix,
- uchar *data, uint data_length,
+ const uchar *key_pos, uint key_length,
+ int move_length, enum en_key_op prefix_or_suffix,
+ const uchar *data, uint data_length,
uint changed_length)
{
LSN lsn;
@@ -1885,9 +1916,10 @@ static my_bool _ma_log_split(MARIA_HA *info, my_off_t page, uchar *buff,
@retval 1 error
*/
-static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page, uchar *buff,
+static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page,
+ const uchar *buff,
uint org_length, uint new_length,
- uchar *key_pos, uint key_length,
+ const uchar *key_pos, uint key_length,
int move_length)
{
LSN lsn;
@@ -1973,12 +2005,13 @@ static my_bool _ma_log_del_prefix(MARIA_HA *info, my_off_t page, uchar *buff,
data deleted last. Old changed key may be part of page
*/
-static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page, uchar *buff,
+static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page,
+ const uchar *buff,
uint new_length,
uint data_added_first,
uint data_changed_first,
uint data_deleted_last,
- uchar *key_pos,
+ const uchar *key_pos,
uint key_length, int move_length)
{
LSN lsn;
@@ -2084,7 +2117,7 @@ static my_bool _ma_log_key_middle(MARIA_HA *info, my_off_t page, uchar *buff,
*/
static my_bool _ma_log_middle(MARIA_HA *info, my_off_t page,
- uchar *buff,
+ const uchar *buff,
uint data_added_first, uint data_changed_first,
uint data_deleted_last)
{
diff --git a/storage/maria/maria_read_log.c b/storage/maria/maria_read_log.c
index c2ed0c736dd..5c9a8e235a3 100644
--- a/storage/maria/maria_read_log.c
+++ b/storage/maria/maria_read_log.c
@@ -267,7 +267,7 @@ static void get_options(int *argc,char ***argv)
if (!opt_apply)
opt_apply_undo= FALSE;
- if ((opt_display_only + opt_apply) != 1)
+ if (((opt_display_only + opt_apply) != 1) || (*argc > 0))
{
usage();
exit(1);