summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysys/my_realloc.c11
-rw-r--r--sql/mysqld.cc17
-rw-r--r--storage/maria/ha_maria.cc6
-rw-r--r--storage/maria/ma_bitmap.c13
-rw-r--r--storage/maria/ma_blockrec.c13
-rw-r--r--storage/maria/ma_check.c7
-rw-r--r--storage/maria/ma_control_file.c13
-rw-r--r--storage/maria/ma_control_file.h2
-rw-r--r--storage/maria/ma_create.c36
-rw-r--r--storage/maria/ma_loghandler.c15
-rw-r--r--storage/maria/ma_loghandler.h3
-rw-r--r--storage/maria/ma_loghandler_lsn.h3
-rw-r--r--storage/maria/ma_open.c60
-rw-r--r--storage/maria/ma_recovery.c1007
-rw-r--r--storage/maria/ma_recovery.h3
-rw-r--r--storage/maria/maria_chk.c2
-rw-r--r--storage/maria/maria_read_log.c5
-rw-r--r--storage/maria/trnman.c27
-rw-r--r--storage/maria/trnman_public.h2
19 files changed, 808 insertions, 437 deletions
diff --git a/mysys/my_realloc.c b/mysys/my_realloc.c
index c7cf1323cd4..a55282e03a0 100644
--- a/mysys/my_realloc.c
+++ b/mysys/my_realloc.c
@@ -22,6 +22,16 @@
/* My memory re allocator */
+/**
+ @brief wrapper around realloc()
+
+ @param oldpoint pointer to currently allocated area
+ @param size new size requested, must be >0
+ @param my_flags flags
+
+ @note if size==0, realloc() may legitimately return NULL; my_realloc() would
+ treat that NULL as an error, which is not what realloc() intends
+*/
void* my_realloc(void* oldpoint, size_t size, myf my_flags)
{
void *point;
@@ -29,6 +39,7 @@ void* my_realloc(void* oldpoint, size_t size, myf my_flags)
DBUG_PRINT("my",("ptr: 0x%lx size: %lu my_flags: %d", (long) oldpoint,
(ulong) size, my_flags));
+ DBUG_ASSERT(size > 0);
if (!oldpoint && (my_flags & MY_ALLOW_ZERO_PTR))
DBUG_RETURN(my_malloc(size,my_flags));
#ifdef USE_HALLOC
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index dec25215acc..4b403fd85b6 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -3418,6 +3418,17 @@ server.");
using_update_log=1;
}
+ /* call ha_init_key_cache() on all key caches to init them */
+ process_key_caches(&ha_init_key_cache);
+ /*
+ Maria's pagecache needs to be ready before Maria engine (Recovery uses
+ pagecache, and Checkpoint may happen at startup). Maria engine is taken up
+ in plugin_init().
+ */
+#ifdef WITH_MARIA_STORAGE_ENGINE
+ process_pagecaches(&ha_init_pagecache);
+#endif /* WITH_MARIA_STORAGE_ENGINE */
+
/* Allow storage engine to give real error messages */
if (ha_init_errors())
DBUG_RETURN(1);
@@ -3588,12 +3599,6 @@ server.");
if (opt_myisam_log)
(void) mi_log(1);
- /* call ha_init_key_cache() on all key caches to init them */
- process_key_caches(&ha_init_key_cache);
-#ifdef WITH_MARIA_STORAGE_ENGINE
- process_pagecaches(&ha_init_pagecache);
-#endif /* WITH_MARIA_STORAGE_ENGINE */
-
#if defined(HAVE_MLOCKALL) && defined(MCL_CURRENT) && !defined(EMBEDDED_LIBRARY)
if (locked_in_memory && !getuid())
{
diff --git a/storage/maria/ha_maria.cc b/storage/maria/ha_maria.cc
index 59f97c8e1e5..c2fa0ec14b1 100644
--- a/storage/maria/ha_maria.cc
+++ b/storage/maria/ha_maria.cc
@@ -33,6 +33,10 @@ C_MODE_START
#include "ma_blockrec.h"
C_MODE_END
+/*
+ Note that in future versions, only *transactional* Maria tables can
+ roll back, so this flag should be up or down conditionally.
+*/
#define MARIA_CANNOT_ROLLBACK HA_NO_TRANSACTIONS
#ifdef MARIA_CANNOT_ROLLBACK
#define trans_register_ha(A, B, C) do { /* nothing */ } while(0)
@@ -2385,7 +2389,7 @@ static int ha_maria_init(void *p)
maria_hton->flags= HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES;
bzero(maria_log_pagecache, sizeof(*maria_log_pagecache));
maria_data_root= mysql_real_data_home;
- res= maria_init() || ma_control_file_create_or_open(TRUE) ||
+ res= maria_init() || ma_control_file_create_or_open() ||
(init_pagecache(maria_log_pagecache,
TRANSLOG_PAGECACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE) == 0) ||
diff --git a/storage/maria/ma_bitmap.c b/storage/maria/ma_bitmap.c
index ca9657128e4..66377172877 100644
--- a/storage/maria/ma_bitmap.c
+++ b/storage/maria/ma_bitmap.c
@@ -512,15 +512,19 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap,
ulonglong page)
{
- my_off_t position= page * bitmap->block_size;
+ my_off_t end_of_page= (page + 1) * bitmap->block_size;
my_bool res;
DBUG_ENTER("_ma_read_bitmap_page");
DBUG_ASSERT(page % bitmap->pages_covered == 0);
bitmap->page= page;
- if (position >= share->state.state.data_file_length)
+ if (end_of_page > share->state.state.data_file_length)
{
- share->state.state.data_file_length= position + bitmap->block_size;
+ /*
+ Nonexistent or half-created page (could be crash in the middle of
+ _ma_bitmap_create_first(), before appending maria_bitmap_marker).
+ */
+ share->state.state.data_file_length= end_of_page;
bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
@@ -2047,7 +2051,8 @@ int _ma_bitmap_create_first(MARIA_SHARE *share)
{
uint block_size= share->bitmap.block_size;
File file= share->bitmap.file.file;
- if (my_chsize(file, block_size, 0, MYF(MY_WME)) ||
+ if (my_chsize(file, block_size - sizeof(maria_bitmap_marker),
+ 0, MYF(MY_WME)) ||
my_pwrite(file, maria_bitmap_marker, sizeof(maria_bitmap_marker),
block_size - sizeof(maria_bitmap_marker),
MYF(MY_NABP | MY_WME)))
diff --git a/storage/maria/ma_blockrec.c b/storage/maria/ma_blockrec.c
index d8f65c7b367..02d8d8db276 100644
--- a/storage/maria/ma_blockrec.c
+++ b/storage/maria/ma_blockrec.c
@@ -4178,9 +4178,6 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
empty_space= (block_size - PAGE_OVERHEAD_SIZE);
rec_offset= PAGE_HEADER_SIZE;
dir= buff+ block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE;
-
- /* Update that file is extended */
- info->state->data_file_length= (page + 1) * info->s->block_size;
}
else
{
@@ -4304,6 +4301,16 @@ uint _ma_apply_redo_insert_row_head_or_tail(MARIA_HA *info, LSN lsn,
if (_ma_bitmap_set(info, page, page_type == HEAD_PAGE, empty_space))
DBUG_RETURN(my_errno);
+ /*
+ Data page and bitmap page are in place, we can update data_file_length in
+ case we extended the file. We could not do it earlier: bitmap code tests
+ data_file_length to know if it has to create a new page or not.
+ */
+ {
+ my_off_t end_of_page= (page + 1) * info->s->block_size;
+ set_if_bigger(info->state->data_file_length, end_of_page);
+ }
+
DBUG_RETURN(0);
err:
diff --git a/storage/maria/ma_check.c b/storage/maria/ma_check.c
index a68a21d0180..1ac1fb3454f 100644
--- a/storage/maria/ma_check.c
+++ b/storage/maria/ma_check.c
@@ -2046,15 +2046,8 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
goto err;
}
_ma_reset_status(sort_info.new_info);
-#ifdef ASK_MONTY /* cf maria_create() */
- /**
- @todo ASK_MONTY
- without this call, a REPAIR on an empty table leaves the data file of
- size 0, which sounds reasonable.
- */
if (_ma_initialize_data_file(sort_info.new_info->s, new_file))
goto err;
-#endif
block_record= 1;
}
}
diff --git a/storage/maria/ma_control_file.c b/storage/maria/ma_control_file.c
index 4174a0e797e..3816830d9e1 100644
--- a/storage/maria/ma_control_file.c
+++ b/storage/maria/ma_control_file.c
@@ -41,6 +41,10 @@
#define CONTROL_FILE_SIZE (CONTROL_FILE_FILENO_OFFSET + CONTROL_FILE_FILENO_SIZE)
/* This module owns these two vars. */
+/**
+ This LSN serves for the two-checkpoint rule, and also to find the
+ checkpoint record when doing a recovery.
+*/
LSN last_checkpoint_lsn= LSN_IMPOSSIBLE;
uint32 last_logno= FILENO_IMPOSSIBLE;
@@ -68,8 +72,6 @@ static int control_file_fd= -1;
the last_checkpoint_lsn and last_logno global variables.
Called at engine's start.
- @param create_if_missing
-
@note
The format of the control file is:
4 bytes: magic string
@@ -78,11 +80,13 @@ static int control_file_fd= -1;
4 bytes: offset in log where last checkpoint is
4 bytes: number of last log
+ @note If in recovery, file is not created
+
@return Operation status
@retval 0 OK
@retval 1 Error (in which case the file is left closed)
*/
-CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
+CONTROL_FILE_ERROR ma_control_file_create_or_open()
{
char buffer[CONTROL_FILE_SIZE];
char name[FN_REFLEN];
@@ -111,7 +115,8 @@ CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool create_if_missing)
if (create_file)
{
- if (!create_if_missing)
+ /* in a recovery, we expect to find a control file */
+ if (maria_in_recovery)
DBUG_RETURN(CONTROL_FILE_MISSING);
if ((control_file_fd= my_create(name, 0,
open_flags, MYF(MY_SYNC_DIR))) < 0)
diff --git a/storage/maria/ma_control_file.h b/storage/maria/ma_control_file.h
index d69f221abb8..88a1780543a 100644
--- a/storage/maria/ma_control_file.h
+++ b/storage/maria/ma_control_file.h
@@ -61,7 +61,7 @@ extern "C" {
If present, reads it to find out last checkpoint's LSN and last log.
Called at engine's start.
*/
-CONTROL_FILE_ERROR ma_control_file_create_or_open(my_bool);
+CONTROL_FILE_ERROR ma_control_file_create_or_open();
/*
Write information durably to the control file.
Called when we have created a new log (after syncing this log's creation)
diff --git a/storage/maria/ma_create.c b/storage/maria/ma_create.c
index f944b9d8bf7..ba66bdb8ffb 100644
--- a/storage/maria/ma_create.c
+++ b/storage/maria/ma_create.c
@@ -664,6 +664,14 @@ int maria_create(const char *name, enum data_file_type datafile_type,
share.base.keystart = share.state.state.key_file_length=
MY_ALIGN(info_length, maria_block_size);
+ if (share.data_file_type == BLOCK_RECORD)
+ {
+ /*
+ we are going to create a first bitmap page, set data_file_length
+ to reflect this, before the state goes to disk
+ */
+ share.state.state.data_file_length= maria_block_size;
+ }
share.base.max_key_block_length= maria_block_size;
share.base.max_key_length=ALIGN_SIZE(max_key_length+4);
share.base.records=ci->max_rows;
@@ -1041,36 +1049,8 @@ int maria_create(const char *name, enum data_file_type datafile_type,
goto err;
errpos=3;
- /**
- @todo ASK_MONTY
- QQ: this sets data_file_length from 0 to 8192, but we wrote the state
- already to the index file (because:
- - log record is built from index header so state must be written before
- log record
- - data file must be created after log record, so that "missing log
- record" implies "unusable table").
- When we wrote the state, we hadn't called ma_initialize_data_file(), so
- the data_file_length is 0!
- Thus, we below create a 8192-byte data file, but its recorded size is 0,
- so next time we read the bitmap (a maria_write() for example) we'll
- overwrite the bitmap we just created below.
- It's not very efficient.
- It also makes maria_chk_size() print
- Size of datafile is: 8192 Should be: 0
- on a freshly created table (run "check.test" with a Maria table).
-
- Why do we absolutely want to create a 8192-byte page for a freshly
- created, empty table? Why don't we leave the data file empty?
- Removing the call below at least removes the maria_chk_size() issue.
-
- Monty wrote on IRC, about a size of 0:
- "This basically ok; The first block is a bitmap that may or may not
- exists", but later he asked that the first block always exists.???
- */
-#ifdef ASK_MONTY
if (_ma_initialize_data_file(&share, dfile))
goto err;
-#endif
}
/* Enlarge files */
diff --git a/storage/maria/ma_loghandler.c b/storage/maria/ma_loghandler.c
index ed34629b263..273ed8b55d0 100644
--- a/storage/maria/ma_loghandler.c
+++ b/storage/maria/ma_loghandler.c
@@ -6158,7 +6158,7 @@ static my_bool write_hook_for_redo(enum translog_record_type type
non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not
call this hook; we trust them but verify ;)
*/
- DBUG_ASSERT(!(maria_multi_threaded && (trn->trid == 0)));
+ DBUG_ASSERT(trn->trid != 0);
/*
If the hook stays so simple, it would be faster to pass
!trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn
@@ -6187,7 +6187,7 @@ static my_bool write_hook_for_undo(enum translog_record_type type
struct st_translog_parts *parts
__attribute__ ((unused)))
{
- DBUG_ASSERT(!(maria_multi_threaded && (trn->trid == 0)));
+ DBUG_ASSERT(trn->trid != 0);
trn->undo_lsn= *lsn;
if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0))
trn->first_undo_lsn=
@@ -6300,6 +6300,17 @@ void translog_deassign_id_from_share(MARIA_SHARE *share)
}
+void translog_assign_id_to_share_from_recovery(MARIA_SHARE *share,
+ uint16 id)
+{
+ DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
+ DBUG_ASSERT(share->data_file_type == BLOCK_RECORD);
+ DBUG_ASSERT(share->id == 0);
+ DBUG_ASSERT(id_to_share[id] == NULL);
+ id_to_share[share->id= id]= share;
+}
+
+
/**
@brief check if such log file exists
diff --git a/storage/maria/ma_loghandler.h b/storage/maria/ma_loghandler.h
index 789057d7e1f..e5c1652373c 100644
--- a/storage/maria/ma_loghandler.h
+++ b/storage/maria/ma_loghandler.h
@@ -258,6 +258,9 @@ extern TRANSLOG_ADDRESS translog_get_horizon();
extern int translog_assign_id_to_share(struct st_maria_share *share,
struct st_transaction *trn);
extern void translog_deassign_id_from_share(struct st_maria_share *share);
+extern void
+translog_assign_id_to_share_from_recovery(struct st_maria_share *share,
+ uint16 id);
extern my_bool translog_inited;
/*
diff --git a/storage/maria/ma_loghandler_lsn.h b/storage/maria/ma_loghandler_lsn.h
index df41ceec7c8..b106a4ab30e 100644
--- a/storage/maria/ma_loghandler_lsn.h
+++ b/storage/maria/ma_loghandler_lsn.h
@@ -84,6 +84,9 @@ typedef LSN LSN_WITH_FLAGS;
/* following LSN also is impossible */
#define LSN_ERROR 1
+/** @brief some impossible LSNs serve as markers */
+#define LSN_REPAIRED_BY_MARIA_CHK ((LSN)1)
+
/**
@brief the maximum valid LSN.
Unlike ULONGLONG_MAX, it can be safely used in comparison with valid LSNs
diff --git a/storage/maria/ma_open.c b/storage/maria/ma_open.c
index b5560220b63..5ee6931f69f 100644
--- a/storage/maria/ma_open.c
+++ b/storage/maria/ma_open.c
@@ -171,7 +171,8 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
share->delay_key_write=1;
info.state= &share->state.state; /* Change global values by default */
- info.trn= &dummy_transaction_object;
+ if (!share->base.born_transactional) /* but for transactional ones ... */
+ info.trn= &dummy_transaction_object; /* ... force crash if no trn given */
pthread_mutex_unlock(&share->intern_lock);
/* Allocate buffer for one record */
@@ -601,15 +602,30 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
{
share->page_type= PAGECACHE_LSN_PAGE;
share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE;
- if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) &&
- (open_flags & HA_OPEN_FROM_SQL_LAYER)))
+ if (share->state.create_rename_lsn == LSN_REPAIRED_BY_MARIA_CHK)
{
/*
- This table was repaired with maria_chk. Past log records should be
- ignored, future log records should not: we define the present.
+ Was repaired with maria_chk, maybe later maria_pack-ed. Some sort of
+ import into the server. It starts its existence (from the point of
+ view of the server, including server's recovery) now.
*/
- share->state.create_rename_lsn= translog_get_horizon();
- _ma_update_create_rename_lsn_on_disk(share, TRUE);
+ if ((open_flags & HA_OPEN_FROM_SQL_LAYER) || maria_in_recovery)
+ {
+ share->state.create_rename_lsn= translog_get_horizon();
+ _ma_update_create_rename_lsn_on_disk(share, TRUE);
+ }
+ }
+ else if (!LSN_VALID(share->state.create_rename_lsn) &&
+ !(open_flags & HA_OPEN_FOR_REPAIR))
+ {
+ /*
+ If in Recovery, it will not work. If LSN is invalid and not
+ LSN_REPAIRED_BY_MARIA_CHK, header must be corrupted.
+ In both cases, must repair.
+ */
+ my_errno=((share->state.changed & STATE_CRASHED_ON_REPAIR) ?
+ HA_ERR_CRASHED_ON_REPAIR : HA_ERR_CRASHED_ON_USAGE);
+ goto err;
}
}
else
@@ -699,6 +715,14 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
{
share->lock.get_status=_ma_get_status;
share->lock.copy_status=_ma_copy_status;
+ /**
+ @todo RECOVERY
+ INSERT DELAYED and concurrent inserts are currently disabled for
+ transactional tables; when enabled again, we should re-evaluate
+ what problems the call to _ma_update_status() by
+ thr_reschedule_write_lock() can do (it may hurt Checkpoint as it
+ would be without intern_lock, and it modifies the state).
+ */
share->lock.update_status=_ma_update_status;
share->lock.restore_status=_ma_restore_status;
share->lock.check_status=_ma_check_status;
@@ -958,6 +982,7 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite)
uchar buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
uchar *ptr=buff;
uint i, keys= (uint) state->header.keys;
+ size_t res;
DBUG_ENTER("_ma_state_info_write");
memcpy_fixed(ptr,&state->header,sizeof(state->header));
@@ -1013,11 +1038,12 @@ uint _ma_state_info_write(File file, MARIA_STATE_INFO *state, uint pWrite)
}
}
- if (pWrite & 1)
- DBUG_RETURN(my_pwrite(file, buff, (size_t) (ptr-buff), 0L,
- MYF(MY_NABP | MY_THREADSAFE)) != 0);
- DBUG_RETURN(my_write(file, buff, (size_t) (ptr-buff),
- MYF(MY_NABP)) != 0);
+ res= (pWrite & 1) ?
+ my_pwrite(file, buff, (size_t) (ptr-buff), 0L,
+ MYF(MY_NABP | MY_THREADSAFE)) :
+ my_write(file, buff, (size_t) (ptr-buff),
+ MYF(MY_NABP));
+ DBUG_RETURN(res != 0);
}
@@ -1072,6 +1098,16 @@ uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state)
}
+/**
+ @brief Fills the state by reading its copy on disk.
+
+ @note Does nothing in single user mode.
+
+ @param file file to read from
+ @param state state which will be filled
+ @param pRead if true, use my_pread(), otherwise my_read()
+*/
+
uint _ma_state_info_read_dsk(File file, MARIA_STATE_INFO *state, my_bool pRead)
{
char buff[MARIA_STATE_INFO_SIZE + MARIA_STATE_EXTRA_SIZE];
diff --git a/storage/maria/ma_recovery.c b/storage/maria/ma_recovery.c
index c583a0cdd74..c6bb6306771 100644
--- a/storage/maria/ma_recovery.c
+++ b/storage/maria/ma_recovery.c
@@ -23,25 +23,39 @@
#include "maria_def.h"
#include "ma_recovery.h"
#include "ma_blockrec.h"
+#include "trnman.h"
-struct TRN_FOR_RECOVERY
+struct st_trn_for_recovery /* used only in the REDO phase */
{
- LSN group_start_lsn, undo_lsn;
+ LSN group_start_lsn, undo_lsn, first_undo_lsn;
TrID long_trid;
};
-
+struct st_dirty_page /* used only in the REDO phase */
+{
+ uint64 file_and_page_id;
+ LSN rec_lsn;
+};
+struct st_table_for_recovery /* used in the REDO and UNDO phase */
+{
+ MARIA_HA *info;
+ File org_kfile, org_dfile; /**< OS descriptors when Checkpoint saw table */
+};
/* Variables used by all functions of this module. Ok as single-threaded */
-static struct TRN_FOR_RECOVERY *all_active_trans;
-static MARIA_HA **all_tables;
-static LSN current_group_end_lsn;
-FILE *tracef; /**< trace file for debugging */
-
-#define prototype_exec_hook(R) \
-static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
+static struct st_trn_for_recovery *all_active_trans;
+static struct st_table_for_recovery *all_tables;
+static HASH all_dirty_pages;
+static struct st_dirty_page *dirty_pages_pool;
+static LSN current_group_end_lsn,
+ checkpoint_start= LSN_IMPOSSIBLE;
+static FILE *tracef; /**< trace file for debugging */
+
+#define prototype_exec_hook(R) \
+ static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec)
+#define prototype_exec_hook_dummy(R) \
+ static int exec_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec \
+ __attribute ((unused)))
prototype_exec_hook(LONG_TRANSACTION_ID);
-#ifdef MARIA_CHECKPOINT
-prototype_exec_hook(CHECKPOINT);
-#endif
+prototype_exec_hook_dummy(CHECKPOINT);
prototype_exec_hook(REDO_CREATE_TABLE);
prototype_exec_hook(REDO_DROP_TABLE);
prototype_exec_hook(FILE_ID);
@@ -53,10 +67,11 @@ prototype_exec_hook(REDO_PURGE_BLOCKS);
prototype_exec_hook(REDO_DELETE_ALL);
prototype_exec_hook(UNDO_ROW_INSERT);
prototype_exec_hook(UNDO_ROW_DELETE);
-prototype_exec_hook(UNDO_ROW_UPDATE);
prototype_exec_hook(UNDO_ROW_PURGE);
prototype_exec_hook(COMMIT);
-static int end_of_redo_phase();
+static int run_redo_phase(LSN lsn, my_bool apply);
+static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
+static int run_undo_phase(uint unfinished);
static void display_record_position(const LOG_DESC *log_desc,
const TRANSLOG_HEADER_BUFFER *rec,
uint number);
@@ -66,83 +81,57 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
TRANSLOG_HEADER_BUFFER *rec);
static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
TRANSLOG_HEADER_BUFFER *rec);
-static int close_recovered_table(MARIA_HA *info);
-
+static void prepare_table_for_close(MARIA_HA *info, LSN at_lsn);
+static int parse_checkpoint_record(LSN lsn);
+static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
+ LSN first_undo_lsn);
+static int new_table(uint16 sid, const char *name,
+ File org_kfile, File org_dfile, LSN lsn);
+static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
+ struct st_dirty_page *dirty_page);
+static int close_all_tables();
/** @brief global [out] buffer for translog_read_record(); never shrinks */
static LEX_STRING log_record_buffer;
#define enlarge_buffer(rec) \
- if (log_record_buffer.length < rec->record_length) \
+ if (log_record_buffer.length < (rec)->record_length) \
{ \
- log_record_buffer.length= rec->record_length; \
+ log_record_buffer.length= (rec)->record_length; \
log_record_buffer.str= my_realloc(log_record_buffer.str, \
- rec->record_length, MYF(MY_WME)); \
+ (rec)->record_length, MYF(MY_WME)); \
}
#define ALERT_USER() DBUG_ASSERT(0)
+#define LSN_IN_HEX(L) (ulong)LSN_FILE_NO(L),(ulong)LSN_OFFSET(L)
/**
- @brief Recovers from the last checkpoint
+ @brief Recovers from the last checkpoint.
+
+ Runs the REDO phase using special structures, then sets up the playground
+ of runtime: recreates transactions inside trnman, opens tables with their
+ two-byte-id mapping; takes a checkpoint and runs the UNDO phase. Closes all
+ tables.
*/
int maria_recover()
{
- my_bool res= TRUE;
- LSN from_lsn;
+ int res= 1;
FILE *trace_file;
DBUG_ENTER("maria_recover");
DBUG_ASSERT(!maria_in_recovery);
maria_in_recovery= TRUE;
- if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
- {
- from_lsn= translog_first_theoretical_lsn();
- /*
- as far as we have not yet any checkpoint then the very first
- log file should be present.
- */
- DBUG_ASSERT(from_lsn != LSN_IMPOSSIBLE);
- /*
- @todo process eroror of getting checkpoint
- if (from_lsn == ERROR_LSN)
- ...
- */
- }
- else
- {
- DBUG_ASSERT(0); /* not yet implemented */
- /**
- @todo read the checkpoint record, fill structures
- and use the minimum of checkpoint_start_lsn, rec_lsn of trns, rec_lsn
- of dirty pages.
- */
- //from_lsn= something;
- }
-
- /*
- mysqld has not yet initialized any page cache. Let's create a dedicated
- one for recovery.
- */
if ((trace_file= fopen("maria_recovery.trace", "w")))
{
fprintf(trace_file, "TRACE of the last MARIA recovery from mysqld\n");
- res= (init_pagecache(maria_pagecache,
- /** @todo what size? */
- 1024*1024,
- 0, 0,
- maria_block_size) == 0) ||
- maria_apply_log(from_lsn, TRUE, trace_file);
- end_pagecache(maria_pagecache, TRUE);
+ DBUG_ASSERT(maria_pagecache->inited);
+ res= maria_apply_log(LSN_IMPOSSIBLE, TRUE, trace_file, TRUE);
if (!res)
fprintf(trace_file, "SUCCESS\n");
fclose(trace_file);
}
- /**
- @todo take checkpoint if log applying did some work.
- Be sure to not checkpoint if no work.
- */
maria_in_recovery= FALSE;
DBUG_RETURN(res);
}
@@ -151,7 +140,8 @@ int maria_recover()
/**
@brief Displays and/or applies the log
- @param lsn LSN from which log reading/applying should start
+ @param from_lsn LSN from which log reading/applying should start;
+ LSN_IMPOSSIBLE means "use last checkpoint"
@param apply if log records should be applied or not
@param trace_file trace file where progress/debug messages will go
@@ -164,190 +154,81 @@ int maria_recover()
@retval !=0 Error
*/
-int maria_apply_log(LSN lsn, my_bool apply, FILE *trace_file)
+int maria_apply_log(LSN from_lsn, my_bool apply, FILE *trace_file,
+ my_bool should_run_undo_phase)
{
int error= 0;
DBUG_ENTER("maria_apply_log");
+ DBUG_ASSERT(apply || !should_run_undo_phase);
DBUG_ASSERT(!maria_multi_threaded);
- all_active_trans= (struct TRN_FOR_RECOVERY *)
- my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct TRN_FOR_RECOVERY),
+ all_active_trans= (struct st_trn_for_recovery *)
+ my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
+ MYF(MY_ZEROFILL));
+ all_tables= (struct st_table_for_recovery *)
+ my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
MYF(MY_ZEROFILL));
- all_tables= (MARIA_HA **)my_malloc((SHARE_ID_MAX + 1) * sizeof(MARIA_HA *),
- MYF(MY_ZEROFILL));
if (!all_active_trans || !all_tables)
goto err;
tracef= trace_file;
- /* install hooks for execution */
-#define install_exec_hook(R) \
- log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
- exec_LOGREC_ ## R;
- install_exec_hook(LONG_TRANSACTION_ID);
-#ifdef MARIA_CHECKPOINT
- install_exec_hook(CHECKPOINT);
-#endif
- install_exec_hook(REDO_CREATE_TABLE);
- install_exec_hook(REDO_DROP_TABLE);
- install_exec_hook(FILE_ID);
- install_exec_hook(REDO_INSERT_ROW_HEAD);
- install_exec_hook(REDO_INSERT_ROW_TAIL);
- install_exec_hook(REDO_PURGE_ROW_HEAD);
- install_exec_hook(REDO_PURGE_ROW_TAIL);
- install_exec_hook(REDO_PURGE_BLOCKS);
- install_exec_hook(REDO_DELETE_ALL);
- install_exec_hook(UNDO_ROW_INSERT);
- install_exec_hook(UNDO_ROW_DELETE);
- install_exec_hook(UNDO_ROW_UPDATE);
- install_exec_hook(UNDO_ROW_PURGE);
- install_exec_hook(COMMIT);
- current_group_end_lsn= LSN_IMPOSSIBLE;
-
- TRANSLOG_HEADER_BUFFER rec;
- struct st_translog_scanner_data scanner;
- uint i= 1;
-
- int len= translog_read_record_header(lsn, &rec);
-
- /** @todo EOF should be detected */
- if (len == RECHEADER_READ_ERROR)
+ if (from_lsn == LSN_IMPOSSIBLE)
{
- fprintf(tracef, "Cannot find a first record\n");
- goto err;
+ if (last_checkpoint_lsn == LSN_IMPOSSIBLE)
+ from_lsn= first_lsn_in_log();
+ else
+ {
+ DBUG_ASSERT(0); /* not yet implemented */
+ from_lsn= parse_checkpoint_record(last_checkpoint_lsn);
+ if (from_lsn == LSN_IMPOSSIBLE)
+ goto err;
+ }
}
- if (translog_init_scanner(lsn, 1, &scanner))
- {
- fprintf(tracef, "Scanner init failed\n");
+ if (run_redo_phase(from_lsn, apply))
goto err;
- }
- for (;;i++)
- {
- uint16 sid= rec.short_trid;
- const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
- display_record_position(log_desc, &rec, i);
- /*
- A complete group is a set of log records with an "end mark" record
- (e.g. a set of REDOs for an operation, terminated by an UNDO for this
- operation); if there is no "end mark" record the group is incomplete
- and won't be executed.
- */
- if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
- (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
- {
- if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
- {
- if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
- {
- /*
- can happen if the transaction got a table write error, then
- unlocked tables thus wrote a COMMIT record.
- */
- fprintf(tracef, "\nDiscarding unfinished group before this record\n");
- ALERT_USER();
- all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
- }
- else
- {
- /*
- There is a complete group for this transaction, containing more
- than this event.
- */
- fprintf(tracef, " ends a group:\n");
- struct st_translog_scanner_data scanner2;
- TRANSLOG_HEADER_BUFFER rec2;
- len=
- translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
- if (len < 0) /* EOF or error */
- {
- fprintf(tracef, "Cannot find record where it should be\n");
- goto err;
- }
- if (translog_init_scanner(rec2.lsn, 1, &scanner2))
- {
- fprintf(tracef, "Scanner2 init failed\n");
- goto err;
- }
- current_group_end_lsn= rec.lsn;
- do
- {
- if (rec2.short_trid == sid) /* it's in our group */
- {
- const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
- display_record_position(log_desc2, &rec2, 0);
- if (apply && display_and_apply_record(log_desc2, &rec2))
- goto err;
- }
- len= translog_read_next_record_header(&scanner2, &rec2);
- if (len < 0) /* EOF or error */
- {
- fprintf(tracef, "Cannot find record where it should be\n");
- goto err;
- }
- }
- while (rec2.lsn < rec.lsn);
- translog_free_record_header(&rec2);
- /* group finished */
- all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
- current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
- display_record_position(log_desc, &rec, 0);
- }
- }
- if (apply && display_and_apply_record(log_desc, &rec))
- goto err;
- }
- else /* record does not end group */
- {
- /* just record the fact, can't know if can execute yet */
- if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
- {
- /* group not yet started */
- all_active_trans[sid].group_start_lsn= rec.lsn;
- }
- }
- len= translog_read_next_record_header(&scanner, &rec);
- if (len < 0)
- {
- switch (len)
- {
- case RECHEADER_READ_EOF:
- fprintf(tracef, "EOF on the log\n");
- break;
- case RECHEADER_READ_ERROR:
- fprintf(stderr, "Error reading log\n");
- goto err;
- }
- break;
- }
+ uint unfinished_trans= end_of_redo_phase(should_run_undo_phase);
+ if (unfinished_trans == (uint)-1)
+ goto err;
+ if (should_run_undo_phase)
+ {
+ if (run_undo_phase(unfinished_trans))
+ goto err; /* not "return 1": must reach end: cleanup and DBUG_RETURN */
}
- translog_free_record_header(&rec);
+ else if (unfinished_trans > 0)
+ fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be"
+ " left inconsistent!\n", unfinished_trans);
/*
- So we have applied all REDOs.
- We may now have unfinished transactions.
- I don't think it's this program's job to roll them back:
- to roll back and at the same time stay idempotent, it needs to write log
- records (without CLRs, 2nd rollback would hit the effects of first
- rollback and fail). But this standalone tool is not allowed to write to
- the server's transaction log. So we do not roll back anything.
- In the real Recovery code, or the code to do "recover after online
- backup", yes we will roll back.
+ we don't use maria_panic() because it would maria_end(), and Recovery does
+ not want that (we want to keep modules initialized for runtime).
*/
- if (end_of_redo_phase())
+ if (close_all_tables())
goto err;
+ /*
+ At this stage, end of recovery, trnman is left initialized. This is for
+ the future, when we have an online UNDO phase or prepared transactions.
+ */
goto end;
err:
error= 1;
fprintf(tracef, "Recovery of tables with transaction logs FAILED\n");
end:
+ hash_free(&all_dirty_pages);
+ bzero(&all_dirty_pages, sizeof(all_dirty_pages));
+ my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
+ dirty_pages_pool= NULL;
my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
+ all_tables= NULL;
my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
+ all_active_trans= NULL;
my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR));
log_record_buffer.str= NULL;
log_record_buffer.length= 0;
+ /* we don't cleanly close tables if we hit some error (may corrupt them) */
DBUG_RETURN(error);
}
@@ -362,9 +243,8 @@ static void display_record_position(const LOG_DESC *log_desc,
form a group, so we indent below the group's end record
*/
fprintf(tracef, "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
- number ? "" : " ", number,
- (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn),
- rec->short_trid, log_desc->name, rec->type,
+ number ? "" : " ", number, LSN_IN_HEX(rec->lsn),
+ rec->short_trid, log_desc->name, rec->type,
(ulong)rec->record_length);
}
@@ -391,11 +271,10 @@ prototype_exec_hook(LONG_TRANSACTION_ID)
TrID long_trid= all_active_trans[sid].long_trid;
/* abort group of this trn (must be of before a crash) */
LSN gslsn= all_active_trans[sid].group_start_lsn;
- char llbuf[22];
if (gslsn != LSN_IMPOSSIBLE)
{
fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
- (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
+ LSN_IN_HEX(gslsn), sid);
all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
}
if (long_trid != 0)
@@ -403,18 +282,17 @@ prototype_exec_hook(LONG_TRANSACTION_ID)
LSN ulsn= all_active_trans[sid].undo_lsn;
if (ulsn != LSN_IMPOSSIBLE)
{
+ char llbuf[22];
llstr(long_trid, llbuf);
fprintf(tracef, "Found an old transaction long_trid %s short_trid %u"
" with same short id as this new transaction, and has neither"
" committed nor rollback (undo_lsn: (%lu,0x%lx))\n", llbuf,
- sid, (ulong) LSN_FILE_NO(ulsn), (ulong) LSN_OFFSET(ulsn));
+ sid, LSN_IN_HEX(ulsn));
goto err;
}
}
long_trid= uint6korr(rec->header);
- all_active_trans[sid].long_trid= long_trid;
- llstr(long_trid, llbuf);
- fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n", llbuf, sid);
+ new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE);
goto end;
err:
ALERT_USER();
@@ -424,13 +302,24 @@ end:
}
-#ifdef MARIA_CHECKPOINT
-prototype_exec_hook(CHECKPOINT)
+static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
+ LSN first_undo_lsn)
+{
+ char llbuf[22];
+ all_active_trans[sid].long_trid= long_id;
+ llstr(long_id, llbuf);
+ fprintf(tracef, "Transaction long_trid %s short_trid %u starts\n",
+ llbuf, sid);
+ all_active_trans[sid].undo_lsn= undo_lsn;
+ all_active_trans[sid].first_undo_lsn= first_undo_lsn;
+}
+
+
+prototype_exec_hook_dummy(CHECKPOINT)
{
/* the only checkpoint we care about was found via control file, ignore */
return 0;
}
-#endif
prototype_exec_hook(REDO_CREATE_TABLE)
@@ -475,9 +364,9 @@ prototype_exec_hook(REDO_CREATE_TABLE)
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
- fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record",
- (ulong) LSN_FILE_NO(rec->lsn),
- (ulong) LSN_OFFSET(rec->lsn));
+ fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
+ " record, ignoring",
+ LSN_IN_HEX(share->state.create_rename_lsn));
error= 0;
goto end;
}
@@ -490,7 +379,7 @@ prototype_exec_hook(REDO_CREATE_TABLE)
info= NULL;
}
/* if does not exist, is older, or its header is corrupted, overwrite it */
- // TODO symlinks
+ /** @todo symlinks */
ptr= name + strlen(name) + 1;
if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
fprintf(tracef, ", we will only touch index file");
@@ -592,9 +481,9 @@ prototype_exec_hook(REDO_DROP_TABLE)
}
if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{
- fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than record",
- (ulong) LSN_FILE_NO(rec->lsn),
- (ulong) LSN_OFFSET(rec->lsn));
+ fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
+ " record, ignoring",
+ LSN_IN_HEX(share->state.create_rename_lsn));
error= 0;
goto end;
}
@@ -633,9 +522,15 @@ prototype_exec_hook(FILE_ID)
{
uint16 sid;
int error= 1;
- char *name, *buff;
- MARIA_HA *info= NULL;
- MARIA_SHARE *share;
+ const char *name;
+ MARIA_HA *info;
+
+ if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
+ {
+ fprintf(tracef, "ignoring because before checkpoint\n");
+ return 0;
+ }
+
enlarge_buffer(rec);
if (log_record_buffer.str == NULL ||
translog_read_record(rec->lsn, 0, rec->record_length,
@@ -645,21 +540,40 @@ prototype_exec_hook(FILE_ID)
fprintf(tracef, "Failed to read record\n");
goto end;
}
- buff= log_record_buffer.str;
- sid= fileid_korr(buff);
- name= buff + FILEID_STORE_SIZE;
- info= all_tables[sid];
+ sid= fileid_korr(log_record_buffer.str);
+ info= all_tables[sid].info;
if (info != NULL)
{
- all_tables[sid]= NULL;
- if (close_recovered_table(info))
+ fprintf(tracef, " Closing table '%s'\n", info->s->open_file_name);
+ prepare_table_for_close(info, rec->lsn);
+ if (maria_close(info))
{
fprintf(tracef, "Failed to close table\n");
goto end;
}
+ all_tables[sid].info= NULL;
}
+ name= log_record_buffer.str + FILEID_STORE_SIZE;
+ if (new_table(sid, name, -1, -1, rec->lsn))
+ goto end;
+ error= 0;
+end:
+ return error;
+}
+
+
+static int new_table(uint16 sid, const char *name,
+ File org_kfile, File org_dfile, LSN lsn)
+{
+ /*
+ -1 (skip table): close table and return 0;
+ 1 (error): close table and return 1;
+ 0 (success): leave table open and return 0.
+ */
+ int error= 1;
+
fprintf(tracef, "Table '%s', id %u", name, sid);
- info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
+ MARIA_HA *info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
if (info == NULL)
{
fprintf(tracef, ", is absent (must have been dropped later?)"
@@ -677,7 +591,7 @@ prototype_exec_hook(FILE_ID)
execute them, we should not reject the crashed table here.
*/
}
- share= info->s;
+ MARIA_SHARE *share= info->s;
/* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
@@ -685,10 +599,17 @@ prototype_exec_hook(FILE_ID)
{
fprintf(tracef, ", is not transactional\n");
ALERT_USER();
- error= 0;
+ error= -1;
+ goto end;
+ }
+ if (cmp_translog_addr(lsn, share->state.create_rename_lsn) <= 0)
+ {
+ fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
+ " record, ignoring\n", /* terminate the trace line like the other exits */
+ LSN_IN_HEX(share->state.create_rename_lsn));
+ error= -1; /* skip this table: it is closed at end: and 0 is returned */
goto end;
}
- all_tables[sid]= info;
/* don't log any records for this work */
_ma_tmp_disable_logging_for_table(share);
/* execution of some REDO records relies on data_file_length */
@@ -702,17 +623,25 @@ prototype_exec_hook(FILE_ID)
}
share->state.state.data_file_length= dfile_len;
share->state.state.key_file_length= kfile_len;
- if ((dfile_len == 0) || ((dfile_len % share->block_size) > 0))
+ if ((dfile_len % share->block_size) > 0)
{
fprintf(tracef, ", has too short last page\n");
/* Recovery will fix this, no error */
ALERT_USER();
}
+ all_tables[sid].info= info;
+ all_tables[sid].org_kfile= org_kfile;
+ all_tables[sid].org_dfile= org_dfile;
fprintf(tracef, ", opened\n");
error= 0;
end:
- if (error && info != NULL)
- error|= maria_close(info);
+ if (error)
+ {
+ if (info != NULL)
+ maria_close(info);
+ if (error == -1)
+ error= 0;
+ }
return error;
}
@@ -765,7 +694,7 @@ end:
prototype_exec_hook(REDO_INSERT_ROW_TAIL)
{
int error= 1;
- uchar *buff;
+ uchar *buff= NULL;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
@@ -833,24 +762,11 @@ end:
prototype_exec_hook(REDO_PURGE_BLOCKS)
{
int error= 1;
- uchar *buff;
MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
if (info == NULL)
goto end;
- enlarge_buffer(rec);
-
- if (log_record_buffer.str == NULL ||
- translog_read_record(rec->lsn, 0, rec->record_length,
- log_record_buffer.str, NULL) !=
- rec->record_length)
- {
- fprintf(tracef, "Failed to read record\n");
- goto end;
- }
-
- buff= log_record_buffer.str;
if (_ma_apply_redo_purge_blocks(info, current_group_end_lsn,
- buff + FILEID_STORE_SIZE))
+ rec->header + FILEID_STORE_SIZE))
goto end;
error= 0;
end:
@@ -874,17 +790,18 @@ end:
}
+#define set_undo_lsn_for_active_trans(I, L) do { /* I, L evaluated twice+ */ \
+ all_active_trans[(I)].undo_lsn= (L); /* always the latest UNDO */ \
+ if (all_active_trans[(I)].first_undo_lsn == LSN_IMPOSSIBLE) \
+ all_active_trans[(I)].first_undo_lsn= (L); } while (0)
+
prototype_exec_hook(UNDO_ROW_INSERT)
{
int error= 1;
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
- all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
- /*
- todo: instead of above, call write_hook_for_undo, it will also set
- first_undo_lsn
- */
+ set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
/*
in an upcoming patch ("recovery of the state"), we introduce
state.is_of_lsn. For now, we just assume the state is old (true when we
@@ -893,6 +810,7 @@ prototype_exec_hook(UNDO_ROW_INSERT)
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records++;
+ /** @todo RECOVERY BUG Also update the table's checksum */
}
fprintf(tracef, " rows' count %lu\n", (ulong)info->s->state.state.records);
error= 0;
@@ -907,11 +825,7 @@ prototype_exec_hook(UNDO_ROW_DELETE)
MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
if (info == NULL)
goto end;
- all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
- /*
- todo: instead of above, call write_hook_for_undo, it will also set
- first_undo_lsn
- */
+ set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
@@ -923,23 +837,6 @@ end:
}
-prototype_exec_hook(UNDO_ROW_UPDATE)
-{
- int error= 1;
- MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
- if (info == NULL)
- goto end;
- all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
- /*
- todo: instead of above, call write_hook_for_undo, it will also set
- first_undo_lsn
- */
- error= 0;
-end:
- return error;
-}
-
-
prototype_exec_hook(UNDO_ROW_PURGE)
{
int error= 1;
@@ -947,11 +844,7 @@ prototype_exec_hook(UNDO_ROW_PURGE)
if (info == NULL)
goto end;
/* this a bit broken, but this log record type will be deleted soon */
- all_active_trans[rec->short_trid].undo_lsn= rec->lsn;
- /*
- todo: instead of above, call write_hook_for_undo, it will also set
- first_undo_lsn
- */
+ set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
{
fprintf(tracef, " state older than record, updating rows' count\n");
info->s->state.state.records--;
@@ -1002,77 +895,284 @@ prototype_exec_hook(COMMIT)
}
-/* Just to inform about any aborted groups or unfinished transactions */
-static int end_of_redo_phase()
+static int run_redo_phase(LSN lsn, my_bool apply)
+{
+ /* install hooks for execution */
+#define install_exec_hook(R) \
+ log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
+ exec_LOGREC_ ## R;
+ install_exec_hook(LONG_TRANSACTION_ID);
+ install_exec_hook(CHECKPOINT);
+ install_exec_hook(REDO_CREATE_TABLE);
+ install_exec_hook(REDO_DROP_TABLE);
+ install_exec_hook(FILE_ID);
+ install_exec_hook(REDO_INSERT_ROW_HEAD);
+ install_exec_hook(REDO_INSERT_ROW_TAIL);
+ install_exec_hook(REDO_PURGE_ROW_HEAD);
+ install_exec_hook(REDO_PURGE_ROW_TAIL);
+ install_exec_hook(REDO_PURGE_BLOCKS);
+ install_exec_hook(REDO_DELETE_ALL);
+ install_exec_hook(UNDO_ROW_INSERT);
+ install_exec_hook(UNDO_ROW_DELETE);
+ install_exec_hook(UNDO_ROW_PURGE);
+ install_exec_hook(COMMIT);
+
+ current_group_end_lsn= LSN_IMPOSSIBLE;
+
+ TRANSLOG_HEADER_BUFFER rec;
+ /*
+ instead of this block below we will soon use
+ translog_first_lsn_in_log()...
+ */
+ int len= translog_read_record_header(lsn, &rec);
+
+ /** @todo EOF should be detected */
+ if (len == RECHEADER_READ_ERROR)
+ {
+ fprintf(tracef, "Cannot find a first record\n");
+ return 1;
+ }
+ struct st_translog_scanner_data scanner;
+ if (translog_init_scanner(lsn, 1, &scanner))
+ {
+ fprintf(tracef, "Scanner init failed\n");
+ return 1;
+ }
+ uint i;
+ for (i= 1;;i++)
+ {
+ uint16 sid= rec.short_trid;
+ const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
+ display_record_position(log_desc, &rec, i);
+
+ /*
+ A complete group is a set of log records with an "end mark" record
+ (e.g. a set of REDOs for an operation, terminated by an UNDO for this
+ operation); if there is no "end mark" record the group is incomplete
+ and won't be executed.
+ */
+ if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
+ (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
+ {
+ if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE)
+ {
+ if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
+ {
+ /*
+ can happen if the transaction got a table write error, then
+ unlocked tables thus wrote a COMMIT record.
+ */
+ fprintf(tracef, "\nDiscarding unfinished group before this record\n");
+ ALERT_USER();
+ all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
+ }
+ else
+ {
+ /*
+ There is a complete group for this transaction, containing more
+ than this event.
+ */
+ fprintf(tracef, " ends a group:\n");
+ struct st_translog_scanner_data scanner2;
+ TRANSLOG_HEADER_BUFFER rec2;
+ len=
+ translog_read_record_header(all_active_trans[sid].group_start_lsn, &rec2);
+ if (len < 0) /* EOF or error */
+ {
+ fprintf(tracef, "Cannot find record where it should be\n");
+ return 1;
+ }
+ if (translog_init_scanner(rec2.lsn, 1, &scanner2))
+ {
+ fprintf(tracef, "Scanner2 init failed\n");
+ return 1;
+ }
+ current_group_end_lsn= rec.lsn;
+ do
+ {
+ if (rec2.short_trid == sid) /* it's in our group */
+ {
+ const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
+ display_record_position(log_desc2, &rec2, 0);
+ if (apply && display_and_apply_record(log_desc2, &rec2))
+ return 1;
+ }
+ len= translog_read_next_record_header(&scanner2, &rec2);
+ if (len < 0) /* EOF or error */
+ {
+ fprintf(tracef, "Cannot find record where it should be\n");
+ return 1;
+ }
+ }
+ while (rec2.lsn < rec.lsn);
+ translog_free_record_header(&rec2);
+ /* group finished */
+ all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
+ current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
+ display_record_position(log_desc, &rec, 0);
+ }
+ }
+ if (apply && display_and_apply_record(log_desc, &rec))
+ return 1;
+ }
+ else /* record does not end group */
+ {
+ /* just record the fact, can't know if can execute yet */
+ if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
+ {
+ /* group not yet started */
+ all_active_trans[sid].group_start_lsn= rec.lsn;
+ }
+ }
+ len= translog_read_next_record_header(&scanner, &rec);
+ if (len < 0)
+ {
+ switch (len)
+ {
+ case RECHEADER_READ_EOF:
+ fprintf(tracef, "EOF on the log\n");
+ break;
+ case RECHEADER_READ_ERROR:
+ fprintf(stderr, "Error reading log\n");
+ return 1;
+ }
+ break;
+ }
+ }
+ translog_free_record_header(&rec);
+ return 0;
+}
+
+
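+/**
+ @brief Informs about any aborted groups or unfinished transactions,
+ prepares for the UNDO phase if needed.
+
+ @param prepare_for_undo_phase if true, init trnman and recreate the TRNs
+ @return number of unfinished transactions, or (uint)-1 on error
+ @note Observe that it may init trnman.
+*/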
+static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
{
- uint sid, unfinished= 0, error= 0;
+ uint sid, unfinished= 0;
+
+ hash_free(&all_dirty_pages);
+ /*
+ hash_free() can probably be called multiple times, but be safe if that
+ changes
+ */
+ bzero(&all_dirty_pages, sizeof(all_dirty_pages));
+ my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
+ dirty_pages_pool= NULL;
+
+ if (prepare_for_undo_phase && trnman_init())
+ return -1;
+
for (sid= 0; sid <= SHORT_TRID_MAX; sid++)
{
TrID long_trid= all_active_trans[sid].long_trid;
LSN gslsn= all_active_trans[sid].group_start_lsn;
+ TRN *trn;
+ if (gslsn != LSN_IMPOSSIBLE)
+ {
+ fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
+ (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
+ ALERT_USER();
+ }
if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE)
{
char llbuf[22];
llstr(long_trid, llbuf);
fprintf(tracef, "Transaction long_trid %s short_trid %u unfinished\n",
llbuf, sid);
+ /* dummy_transaction_object serves only for DDLs */
+ DBUG_ASSERT(long_trid != 0);
+ if (prepare_for_undo_phase)
+ {
+ if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
+ return -1;
+ trn->undo_lsn= all_active_trans[sid].undo_lsn;
+ }
+ /* otherwise we will just warn about it */
unfinished++;
}
- if (gslsn != LSN_IMPOSSIBLE)
- {
- fprintf(tracef, "Group at LSN (%lu,0x%lx) short_trid %u aborted\n",
- (ulong) LSN_FILE_NO(gslsn), (ulong) LSN_OFFSET(gslsn), sid);
- ALERT_USER();
- }
- /* If real recovery: roll back unfinished transaction */
#ifdef MARIA_VERSIONING
/*
- If real recovery: transaction was committed, move it to some separate
- list for soon purging. Create TRNs.
+ If real recovery: if transaction was committed, move it to some separate
+ list for soon purging.
*/
#endif
}
- /*
- We don't close tables if there are some unfinished transactions, because
- closing tables normally requires that all unfinished transactions on them
- be rolled back. Unfinished transactions are symptom of a crash, we
- reproduce the crash.
- For example, closing will soon write the state to disk and when doing that
- it will think this is a committed state, but it may not be.
+
+ my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
+ all_active_trans= NULL;
+
+ /*
+ The UNDO phase uses some normal run-time code of ROLLBACK: generates log
+ records, etc; prepare tables for that
*/
- if (unfinished > 0)
- fprintf(tracef, "WARNING: %u unfinished transactions; some tables may be"
- " left inconsistent!\n", unfinished);
+ LSN addr= translog_get_horizon();
for (sid= 0; sid <= SHARE_ID_MAX; sid++)
{
- MARIA_HA *info= all_tables[sid];
+ MARIA_HA *info= all_tables[sid].info;
if (info != NULL)
{
- /* if error, still close other tables */
- error|= close_recovered_table(info);
+ prepare_table_for_close(info, addr);
+ /*
+ But we don't close it; we leave it available for the UNDO phase;
+ it's likely that the UNDO phase will need it.
+ */
+ if (prepare_for_undo_phase)
+ translog_assign_id_to_share_from_recovery(info->s, sid);
}
}
- return error;
+
+ /* we don't need all_tables anymore, maria_open_list is enough */
+ my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
+ all_tables= NULL;
+
+ /*
+ We could take a checkpoint here, in case of a crash during the UNDO
+ phase. The drawback is that a page which got a REDO (thus, flushed
+ by this would-be checkpoint) is likely to have an UNDO executed on it
+ soon. And so, the flush was probably lost time.
+ So for now we prefer to do recovery with maximum speed and take a
+ checkpoint only at the end of the UNDO phase.
+ */
+
+ return unfinished;
}
-static int close_recovered_table(MARIA_HA *info)
+static int run_undo_phase(uint unfinished)
+{
+ if (unfinished > 0)
+ {
+ fprintf(tracef, "%u transactions will be rolled back\n", unfinished);
+ for( ; unfinished-- ; )
+ {
+ char llbuf[22];
+ TRN *trn= trnman_get_any_trn();
+ DBUG_ASSERT(trn != NULL);
+ llstr(trn->trid, llbuf);
+ fprintf(tracef, "Rolling back transaction of long id %s\n", llbuf);
+ /* of course we miss execution of UNDOs here */
+ if (trnman_rollback_trn(trn))
+ return 1;
+ /* We may want to spawn a few threads (4?) instead of 1 */
+ /* In the future, we want to have this phase *online* */
+ }
+ }
+ return 0;
+}
+
+
+static void prepare_table_for_close(MARIA_HA *info,
+ LSN at_lsn __attribute__ ((unused)))
{
- int error;
MARIA_SHARE *share= info->s;
- fprintf(tracef, " Closing table '%s'\n", share->open_file_name);
+ /* we will soon use at_lsn here */
_ma_reenable_logging_for_table(share);
- /*
- Recovery normally corrected problems, don't scare user with "table was not
- closed properly" in CHECK TABLE and don't automatically check table at
- next open (when we have --maria-recover).
- */
- share->state.open_count= share->global_changed ? 1 : 0;
- /* this var is set only by non-recovery operations (mi_write() etc) */
- DBUG_ASSERT(!share->global_changed);
- if ((error= maria_close(info)))
- fprintf(tracef, "Failed to close table\n");
- return error;
}
@@ -1080,16 +1180,22 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
TRANSLOG_HEADER_BUFFER *rec)
{
uint16 sid;
- ulonglong page;
+ pgcache_page_no_t page;
MARIA_HA *info;
char llbuf[22];
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
- /* BUG not correct for REDO_PURGE_BLOCKS, page is not at this pos */
+ /**
+ @todo RECOVERY BUG
+ - for REDO_PURGE_BLOCKS, page is not at this pos
+ - for DELETE_ALL, record ends here! buffer overrun!
+ Solution: caller should pass a param enum { i_am_about_data_file,
+ i_am_about_index_file, none }.
+ */
llstr(page, llbuf);
fprintf(tracef, " For page %s of table of short id %u", llbuf, sid);
- info= all_tables[sid];
+ info= all_tables[sid].info;
if (info == NULL)
{
fprintf(tracef, ", table skipped, so skipping record\n");
@@ -1098,23 +1204,38 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
fprintf(tracef, ", '%s'", info->s->open_file_name);
/* detect if an open instance of a dropped table (internal bug) */
DBUG_ASSERT(info->s->last_version != 0);
- if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
+ if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
{
- fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log"
- " record\n",
- (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
- return NULL;
+ /**
+ @todo RECOVERY BUG always assuming this is REDO for data file, but it
+ could soon be index file
+ */
+ uint64 file_and_page_id=
+ (((uint64)all_tables[sid].org_dfile) << 32) | page;
+ struct st_dirty_page *dirty_page= (struct st_dirty_page *)
+ hash_search(&all_dirty_pages,
+ (uchar *)&file_and_page_id, sizeof(file_and_page_id));
+ if ((dirty_page == NULL) ||
+ cmp_translog_addr(rec->lsn, dirty_page->rec_lsn) < 0)
+ {
+ fprintf(tracef, ", ignoring because of dirty_pages list\n");
+ return NULL;
+ }
}
- fprintf(tracef, ", applying record\n");
- return info;
+
/*
- Soon we will also skip the page depending on the rec_lsn for this page in
- the checkpoint record, but this is not absolutely needed for now (just
- assume we have made no checkpoint). Btw rec_lsn and bitmap's recovery is a
- an unsolved problem (rec_lsn is to ignore a REDO without reading the data
- page and to do so we need to be sure the corresponding bitmap page does
- not need a _ma_bitmap_set()).
+ So we are going to read the page, and if its LSN is older than the
+ record's we will modify the page
*/
+ fprintf(tracef, ", applying record\n");
+ /* A future CHECK/OPTIMIZE/REPAIR should not be fooled: */
+ /**
+ @todo but the ones about keys should be set only if REDO for keys. Same
+ in ..._from_UNDO_record
+ */
+ info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
+ STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
+ return info;
}
@@ -1126,7 +1247,7 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
sid= fileid_korr(rec->header + LSN_STORE_SIZE);
fprintf(tracef, " For table of short id %u", sid);
- info= all_tables[sid];
+ info= all_tables[sid].info;
if (info == NULL)
{
fprintf(tracef, ", table skipped, so skipping record\n");
@@ -1134,24 +1255,180 @@ static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
}
fprintf(tracef, ", '%s'", info->s->open_file_name);
DBUG_ASSERT(info->s->last_version != 0);
- if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0)
- {
- fprintf(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than log"
- " record\n",
- (ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_OFFSET(rec->lsn));
- return NULL;
- }
fprintf(tracef, ", applying record\n");
+ /* execution of UNDOs may increment the records' count: */
+ info->s->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED |
+ STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES;
return info;
+}
+
+
+/** @brief Loads active transactions, open tables and dirty pages from the
+ checkpoint record at lsn and sets checkpoint_start; @return 0 ok, 1 error */
+static int parse_checkpoint_record(LSN lsn)
+{
+ uint i;
+ TRANSLOG_HEADER_BUFFER rec;
+
+ fprintf(tracef, "Loading data from checkpoint record\n");
+ int len= translog_read_record_header(lsn, &rec);
+ /** @todo EOF should be detected */
+ if (len == RECHEADER_READ_ERROR)
+ {
+ fprintf(tracef, "Cannot find checkpoint record where it should be\n");
+ return 1;
+ }
+
+ enlarge_buffer(&rec);
+ if (log_record_buffer.str == NULL ||
+ translog_read_record(rec.lsn, 0, rec.record_length,
+ log_record_buffer.str, NULL) !=
+ rec.record_length)
+ {
+ fprintf(tracef, "Failed to read record\n");
+ return 1;
+ }
+
+ char *ptr= log_record_buffer.str;
+ checkpoint_start= lsn_korr(ptr);
+ ptr+= LSN_STORE_SIZE;
+
+ /* transactions */
+ uint nb_active_transactions= uint2korr(ptr);
+ ptr+= 2;
+ fprintf(tracef, "%u active transactions\n", nb_active_transactions);
+ LSN minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
+ ptr+= LSN_STORE_SIZE;
+
/*
- Soon we will also skip the page depending on the rec_lsn for this page in
- the checkpoint record, but this is not absolutely needed for now (just
- assume we have made no checkpoint).
+ how much brain juice and discussions there was to come to writing this
+ line
*/
+ set_if_smaller(checkpoint_start, minimum_rec_lsn_of_active_transactions);
+
+ for (i= 0; i < nb_active_transactions; i++)
+ {
+ uint16 sid= uint2korr(ptr);
+ ptr+= 2;
+ TrID long_id= uint6korr(ptr);
+ ptr+= 6;
+ DBUG_ASSERT(sid > 0 && long_id > 0);
+ LSN undo_lsn= lsn_korr(ptr);
+ ptr+= LSN_STORE_SIZE;
+ LSN first_undo_lsn= lsn_korr(ptr);
+ ptr+= LSN_STORE_SIZE;
+ new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
+ }
+ uint nb_committed_transactions= uint4korr(ptr);
+ ptr+= 4;
+ fprintf(tracef, "%lu committed transactions\n",
+ (ulong)nb_committed_transactions);
+ /* no purging => committed transactions are not important */
+ ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;
+
+ /* tables */
+ uint nb_tables= uint4korr(ptr);
+ ptr+= 4; /* step over the table count, as done for the other counters */
+ fprintf(tracef, "%u open tables\n", nb_tables);
+ for (i= 0; i< nb_tables; i++)
+ {
+ char name[FN_REFLEN];
+ uint16 sid= uint2korr(ptr);
+ ptr+= 2;
+ DBUG_ASSERT(sid > 0);
+ File kfile= uint4korr(ptr);
+ ptr+= 4;
+ File dfile= uint4korr(ptr);
+ ptr+= 4;
+ LSN first_log_write_lsn= lsn_korr(ptr);
+ ptr+= LSN_STORE_SIZE;
+ uint name_len= strlen(ptr) + 1;
+ strmake(name, ptr, sizeof(name) - 1); /* copy the name before skipping it */
+ ptr+= name_len;
+ if (new_table(sid, name, kfile, dfile, first_log_write_lsn))
+ return 1;
+ }
+
+ /* dirty pages */
+ uint nb_dirty_pages= uint4korr(ptr);
+ ptr+= 4;
+ if (hash_init(&all_dirty_pages, &my_charset_bin, nb_dirty_pages,
+ offsetof(struct st_dirty_page, file_and_page_id),
+ sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
+ NULL, NULL, 0))
+ return 1;
+ dirty_pages_pool= (struct st_dirty_page *)
+ my_malloc(nb_dirty_pages * sizeof(struct st_dirty_page), MYF(MY_WME));
+ if (unlikely(dirty_pages_pool == NULL))
+ return 1;
+ struct st_dirty_page *next_dirty_page_in_pool= dirty_pages_pool;
+ LSN minimum_rec_lsn_of_dirty_pages= LSN_MAX;
+ for (i= 0; i < nb_dirty_pages ; i++)
+ {
+ File fileid= uint4korr(ptr);
+ ptr+= 4;
+ pgcache_page_no_t pageid= uint4korr(ptr);
+ ptr+= 4;
+ LSN rec_lsn= lsn_korr(ptr);
+ ptr+= LSN_STORE_SIZE;
+ if (new_page(fileid, pageid, rec_lsn, next_dirty_page_in_pool++))
+ return 1;
+ set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
+ }
+ /* after that, there will be no insert/delete into the hash */
+ /*
+ sanity check on record (did we screw up with all those "ptr+=", did the
+ checkpoint write code and checkpoint read code go out of sync?).
+ */
+ /**
+ @todo This probably presently and hopefully detects that
+ first_log_write_lsn is not written by the checkpoint record; we need
+ to add MARIA_SHARE::first_log_write_lsn, fill it with an inwrite-hook of
+ LOGREC_FILE_ID (note that when we write this record we hold intern_lock,
+ so Checkpoint will read the LSN correctly), and store it in the
+ checkpoint record.
+ */
+ if (ptr != (log_record_buffer.str + rec.record_length)) /* bytes read above */
+ {
+ fprintf(tracef, "checkpoint record corrupted\n");
+ return 1;
+ }
+ set_if_smaller(checkpoint_start, minimum_rec_lsn_of_dirty_pages);
+
+ return 0;
+}
+static int new_page(File fileid, pgcache_page_no_t pageid, LSN rec_lsn,
+ struct st_dirty_page *dirty_page)
+{
+ /* serves as hash key */
+ dirty_page->file_and_page_id= (((uint64)fileid) << 32) | pageid;
+ dirty_page->rec_lsn= rec_lsn;
+ return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page);
+}
+static int close_all_tables()
+{
+ int error= 0;
+ LIST *list_element, *next_open;
+ MARIA_HA *info;
+ pthread_mutex_lock(&THR_LOCK_maria);
+ if (maria_open_list == NULL)
+ goto end;
+ fprintf(tracef, "Closing all tables\n");
+ for (list_element= maria_open_list ; list_element ; list_element= next_open)
+ {
+ next_open= list_element->next;
+ info= (MARIA_HA*)list_element->data;
+ pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet */
+ error|= maria_close(info);
+ pthread_mutex_lock(&THR_LOCK_maria);
+ }
+end:
+ pthread_mutex_unlock(&THR_LOCK_maria);
+ return error;
+}
/* some comments and pseudo-code which we keep for later */
#if 0
diff --git a/storage/maria/ma_recovery.h b/storage/maria/ma_recovery.h
index 0b576efc95f..9a5a2b3099e 100644
--- a/storage/maria/ma_recovery.h
+++ b/storage/maria/ma_recovery.h
@@ -25,5 +25,6 @@
C_MODE_START
int maria_recover();
-int maria_apply_log(LSN lsn, my_bool applyn, FILE *trace_file);
+int maria_apply_log(LSN lsn, my_bool apply, FILE *trace_file,
+ my_bool execute_undo_phase);
C_MODE_END
diff --git a/storage/maria/maria_chk.c b/storage/maria/maria_chk.c
index 43da7698f87..bea9e487314 100644
--- a/storage/maria/maria_chk.c
+++ b/storage/maria/maria_chk.c
@@ -1035,7 +1035,7 @@ static int maria_chk(HA_CHECK *param, char *filename)
that it will have to find and store it.
*/
if (share->base.born_transactional)
- share->state.create_rename_lsn= (LSN)ULONGLONG_MAX;
+ share->state.create_rename_lsn= LSN_REPAIRED_BY_MARIA_CHK;
if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(maria_is_any_key_active(share->state.key_map) ||
(rep_quick && !param->keys_in_use && !recreate)) &&
diff --git a/storage/maria/maria_read_log.c b/storage/maria/maria_read_log.c
index b6bcb2040d6..e487847b486 100644
--- a/storage/maria/maria_read_log.c
+++ b/storage/maria/maria_read_log.c
@@ -51,7 +51,7 @@ int main(int argc, char **argv)
goto err;
}
/* we don't want to create a control file, it MUST exist */
- if (ma_control_file_create_or_open(FALSE))
+ if (ma_control_file_create_or_open())
{
fprintf(stderr, "Can't open control file (%d)\n", errno);
goto err;
@@ -93,7 +93,8 @@ int main(int argc, char **argv)
*/
fprintf(stdout, "TRACE of the last maria_read_log\n");
- if (maria_apply_log(lsn, opt_display_and_apply, stdout))
+ /* Until we have UNDO records, no UNDO phase */
+ if (maria_apply_log(lsn, opt_display_and_apply, stdout, FALSE))
goto err;
fprintf(stdout, "%s: SUCCESS\n", my_progname);
diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c
index 177ee2a7a70..b0550085863 100644
--- a/storage/maria/trnman.c
+++ b/storage/maria/trnman.c
@@ -18,6 +18,7 @@
#include <my_sys.h>
#include <m_string.h>
#include "trnman.h"
+#include "ma_control_file.h"
/*
status variables:
@@ -708,3 +709,29 @@ end:
pthread_mutex_unlock(&LOCK_trn_list);
DBUG_RETURN(error);
}
+
+
+TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid)
+{
+ TrID old_trid_generator= global_trid_generator;
+ TRN *trn;
+ DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded);
+ if (unlikely((trn= trnman_new_trn(NULL, NULL, NULL)) == NULL))
+ return NULL;
+ /* deallocate excessive allocations of trnman_new_trn() */
+ global_trid_generator= old_trid_generator;
+ set_if_bigger(global_trid_generator, longid);
+ short_trid_to_active_trn[trn->short_id]= 0;
+ DBUG_ASSERT(short_trid_to_active_trn[shortid] == NULL);
+ short_trid_to_active_trn[shortid]= trn;
+ trn->trid= longid;
+ trn->short_id= shortid;
+ return trn;
+}
+
+
+TRN *trnman_get_any_trn()
+{
+ TRN *trn= active_list_min.next;
+ return (trn != &active_list_max) ? trn : NULL;
+}
diff --git a/storage/maria/trnman_public.h b/storage/maria/trnman_public.h
index e1891466c4d..10dcb479530 100644
--- a/storage/maria/trnman_public.h
+++ b/storage/maria/trnman_public.h
@@ -53,6 +53,8 @@ uint trnman_increment_locked_tables(TRN *trn);
uint trnman_decrement_locked_tables(TRN *trn);
my_bool trnman_has_locked_tables(TRN *trn);
void trnman_reset_locked_tables(TRN *trn);
+TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid);
+TRN *trnman_get_any_trn();
C_MODE_END
#endif