diff options
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/logger/recover.cc')
-rw-r--r-- | storage/tokudb/PerconaFT/ft/logger/recover.cc | 1657 |
1 files changed, 1657 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/ft/logger/recover.cc b/storage/tokudb/PerconaFT/ft/logger/recover.cc new file mode 100644 index 00000000000..38f29773bd6 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/logger/recover.cc @@ -0,0 +1,1657 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "ft/cachetable/cachetable.h" +#include "ft/cachetable/checkpoint.h" +#include "ft/ft.h" +#include "ft/log_header.h" +#include "ft/logger/log-internal.h" +#include "ft/logger/logcursor.h" +#include "ft/txn/txn_manager.h" +#include "util/omt.h" + +int tokuft_recovery_trace = 0; // turn on recovery tracing, default off. + +//#define DO_VERIFY_COUNTS +#ifdef DO_VERIFY_COUNTS +#define VERIFY_COUNTS(n) toku_verify_or_set_counts(n, false) +#else +#define VERIFY_COUNTS(n) ((void)0) +#endif + +// time in seconds between recovery progress reports +#define TOKUFT_RECOVERY_PROGRESS_TIME 15 +time_t tokuft_recovery_progress_time = TOKUFT_RECOVERY_PROGRESS_TIME; + +enum ss { + BACKWARD_NEWER_CHECKPOINT_END = 1, + BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END, + FORWARD_BETWEEN_CHECKPOINT_BEGIN_END, + FORWARD_NEWER_CHECKPOINT_END, +}; + +struct scan_state { + enum ss ss; + LSN checkpoint_begin_lsn; + LSN checkpoint_end_lsn; + uint64_t checkpoint_end_timestamp; + uint64_t checkpoint_begin_timestamp; + uint32_t checkpoint_num_fassociate; + uint32_t checkpoint_num_xstillopen; + TXNID last_xid; +}; + +static const char *scan_state_strings[] = { + "?", "bw_newer", "bw_between", "fw_between", "fw_newer", +}; + +static void scan_state_init(struct scan_state *ss) { + ss->ss = BACKWARD_NEWER_CHECKPOINT_END; + ss->checkpoint_begin_lsn = ZERO_LSN; + ss->checkpoint_end_lsn = ZERO_LSN; + ss->checkpoint_num_fassociate = 0; + ss->checkpoint_num_xstillopen = 0; + ss->last_xid = 0; +} + +static const char *scan_state_string(struct scan_state *ss) { + assert(BACKWARD_NEWER_CHECKPOINT_END <= ss->ss && ss->ss <= FORWARD_NEWER_CHECKPOINT_END); + return scan_state_strings[ss->ss]; +} + +// File map tuple +struct file_map_tuple { + FILENUM filenum; + FT_HANDLE ft_handle; // NULL ft_handle means it's a rollback file. + char *iname; + struct __toku_db fake_db; +}; + +static void file_map_tuple_init(struct file_map_tuple *tuple, FILENUM filenum, FT_HANDLE ft_handle, char *iname) { + tuple->filenum = filenum; + tuple->ft_handle = ft_handle; + tuple->iname = iname; + // use a fake DB for comparisons, using the ft's cmp descriptor + memset(&tuple->fake_db, 0, sizeof(tuple->fake_db)); + tuple->fake_db.cmp_descriptor = &tuple->ft_handle->ft->cmp_descriptor; + tuple->fake_db.descriptor = &tuple->ft_handle->ft->descriptor; +} + +static void file_map_tuple_destroy(struct file_map_tuple *tuple) { + if (tuple->iname) { + toku_free(tuple->iname); + tuple->iname = NULL; + } +} + +// Map filenum to ft_handle +struct file_map { + toku::omt<struct file_map_tuple *> *filenums; +}; + +// The recovery environment +struct recover_env { + DB_ENV *env; + prepared_txn_callback_t prepared_txn_callback; // at the end of recovery, all the prepared txns are passed back to the ydb layer to make them into valid transactions. + keep_cachetable_callback_t keep_cachetable_callback; // after recovery, store the cachetable into the environment. + CACHETABLE ct; + TOKULOGGER logger; + CHECKPOINTER cp; + ft_compare_func bt_compare; + ft_update_func update_function; + generate_row_for_put_func generate_row_for_put; + generate_row_for_del_func generate_row_for_del; + DBT_ARRAY dest_keys; + DBT_ARRAY dest_vals; + struct scan_state ss; + struct file_map fmap; + bool goforward; + bool destroy_logger_at_end; // If true then destroy the logger when we are done. If false then set the logger into write-files mode when we are done with recovery.*/ +}; +typedef struct recover_env *RECOVER_ENV; + + +static void file_map_init(struct file_map *fmap) { + XMALLOC(fmap->filenums); + fmap->filenums->create(); +} + +static void file_map_destroy(struct file_map *fmap) { + fmap->filenums->destroy(); + toku_free(fmap->filenums); + fmap->filenums = nullptr; +} + +static uint32_t file_map_get_num_dictionaries(struct file_map *fmap) { + return fmap->filenums->size(); +} + +static void file_map_close_dictionaries(struct file_map *fmap, LSN oplsn) { + int r; + + while (1) { + uint32_t n = fmap->filenums->size(); + if (n == 0) { + break; + } + struct file_map_tuple *tuple; + r = fmap->filenums->fetch(n - 1, &tuple); + assert(r == 0); + r = fmap->filenums->delete_at(n - 1); + assert(r == 0); + assert(tuple->ft_handle); + // Logging is on again, but we must pass the right LSN into close. + if (tuple->ft_handle) { // it's a DB, not a rollback file + toku_ft_handle_close_recovery(tuple->ft_handle, oplsn); + } + file_map_tuple_destroy(tuple); + toku_free(tuple); + } +} + +static int file_map_h(struct file_map_tuple *const &a, const FILENUM &b) { + if (a->filenum.fileid < b.fileid) { + return -1; + } else if (a->filenum.fileid > b.fileid) { + return 1; + } else { + return 0; + } +} + +static int file_map_insert (struct file_map *fmap, FILENUM fnum, FT_HANDLE ft_handle, char *iname) { + struct file_map_tuple *XMALLOC(tuple); + file_map_tuple_init(tuple, fnum, ft_handle, iname); + int r = fmap->filenums->insert<FILENUM, file_map_h>(tuple, fnum, nullptr); + return r; +} + +static void file_map_remove(struct file_map *fmap, FILENUM fnum) { + uint32_t idx; + struct file_map_tuple *tuple; + int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx); + if (r == 0) { + r = fmap->filenums->delete_at(idx); + file_map_tuple_destroy(tuple); + toku_free(tuple); + } +} + +// Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND) +static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) { + uint32_t idx; + struct file_map_tuple *tuple; + int r = fmap->filenums->find_zero<FILENUM, file_map_h>(fnum, &tuple, &idx); + if (r == 0) { + assert(tuple->filenum.fileid == fnum.fileid); + *file_map_tuple = tuple; + } else { + assert(r == DB_NOTFOUND); + } + return r; +} + +static int recover_env_init (RECOVER_ENV renv, + const char *env_dir, + DB_ENV *env, + prepared_txn_callback_t prepared_txn_callback, + keep_cachetable_callback_t keep_cachetable_callback, + TOKULOGGER logger, + ft_compare_func bt_compare, + ft_update_func update_function, + generate_row_for_put_func generate_row_for_put, + generate_row_for_del_func generate_row_for_del, + size_t cachetable_size) { + int r = 0; + + // If we are passed a logger use it, otherwise create one. + renv->destroy_logger_at_end = logger==NULL; + if (logger) { + renv->logger = logger; + } else { + r = toku_logger_create(&renv->logger); + assert(r == 0); + } + toku_logger_write_log_files(renv->logger, false); + toku_cachetable_create(&renv->ct, cachetable_size ? cachetable_size : 1<<25, (LSN){0}, renv->logger); + toku_cachetable_set_env_dir(renv->ct, env_dir); + if (keep_cachetable_callback) keep_cachetable_callback(env, renv->ct); + toku_logger_set_cachetable(renv->logger, renv->ct); + renv->env = env; + renv->prepared_txn_callback = prepared_txn_callback; + renv->keep_cachetable_callback = keep_cachetable_callback; + renv->bt_compare = bt_compare; + renv->update_function = update_function; + renv->generate_row_for_put = generate_row_for_put; + renv->generate_row_for_del = generate_row_for_del; + file_map_init(&renv->fmap); + renv->goforward = false; + renv->cp = toku_cachetable_get_checkpointer(renv->ct); + toku_dbt_array_init(&renv->dest_keys, 1); + toku_dbt_array_init(&renv->dest_vals, 1); + if (tokuft_recovery_trace) + fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__); + return r; +} + +static void recover_env_cleanup (RECOVER_ENV renv) { + invariant_zero(renv->fmap.filenums->size()); + file_map_destroy(&renv->fmap); + + if (renv->destroy_logger_at_end) { + toku_logger_close_rollback(renv->logger); + int r = toku_logger_close(&renv->logger); + assert(r == 0); + } else { + toku_logger_write_log_files(renv->logger, true); + } + + if (renv->keep_cachetable_callback) { + renv->ct = NULL; + } else { + toku_cachetable_close(&renv->ct); + } + toku_dbt_array_destroy(&renv->dest_keys); + toku_dbt_array_destroy(&renv->dest_vals); + + if (tokuft_recovery_trace) + fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__); +} + +static const char *recover_state(RECOVER_ENV renv) { + return scan_state_string(&renv->ss); +} + +// Open the file if it is not already open. If it is already open, then do nothing. +static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, bool must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, uint32_t treeflags, + TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) { + int r = 0; + FT_HANDLE ft_handle = NULL; + char *iname = fixup_fname(bs_iname); + + toku_ft_handle_create(&ft_handle); + toku_ft_set_flags(ft_handle, treeflags); + + if (nodesize != 0) { + toku_ft_handle_set_nodesize(ft_handle, nodesize); + } + + if (basementnodesize != 0) { + toku_ft_handle_set_basementnodesize(ft_handle, basementnodesize); + } + + if (compression_method != TOKU_DEFAULT_COMPRESSION_METHOD) { + toku_ft_handle_set_compression_method(ft_handle, compression_method); + } + + // set the key compare functions + if (!(treeflags & TOKU_DB_KEYCMP_BUILTIN) && renv->bt_compare) { + toku_ft_set_bt_compare(ft_handle, renv->bt_compare); + } + + if (renv->update_function) { + toku_ft_set_update(ft_handle, renv->update_function); + } + + // TODO mode (FUTURE FEATURE) + //mode = mode; + + r = toku_ft_handle_open_recovery(ft_handle, iname, must_create, must_create, renv->ct, txn, filenum, max_acceptable_lsn); + if (r != 0) { + //Note: If ft_handle_open fails, then close_ft will NOT write a header to disk. + //No need to provide lsn, so use the regular toku_ft_handle_close function + toku_ft_handle_close(ft_handle); + toku_free(iname); + if (r == ENOENT) //Not an error to simply be missing. + r = 0; + return r; + } + + file_map_insert(&renv->fmap, filenum, ft_handle, iname); + return 0; +} + +static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) { + int r; + TXN_MANAGER mgr = toku_logger_get_txn_manager(renv->logger); + switch (renv->ss.ss) { + case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: + assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn); + invariant(renv->ss.last_xid == TXNID_NONE); + renv->ss.last_xid = l->last_xid; + toku_txn_manager_set_last_xid_from_recovered_checkpoint(mgr, l->last_xid); + + r = 0; + break; + case FORWARD_NEWER_CHECKPOINT_END: + assert(l->lsn.lsn > renv->ss.checkpoint_end_lsn.lsn); + // Verify last_xid is no older than the previous begin + invariant(l->last_xid >= renv->ss.last_xid); + // Verify last_xid is no older than the newest txn + invariant(l->last_xid >= toku_txn_manager_get_last_xid(mgr)); + + r = 0; // ignore it (log only has a begin checkpoint) + break; + default: + fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss); + abort(); + break; + } + return r; +} + +static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) { + int r; + time_t tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery bw_begin_checkpoint at %" PRIu64 " timestamp %" PRIu64 " (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, recover_state(renv)); + switch (renv->ss.ss) { + case BACKWARD_NEWER_CHECKPOINT_END: + // incomplete checkpoint, nothing to do + r = 0; + break; + case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END: + assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn); + renv->ss.ss = FORWARD_BETWEEN_CHECKPOINT_BEGIN_END; + renv->ss.checkpoint_begin_timestamp = l->timestamp; + renv->goforward = true; + tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery turning around at begin checkpoint %" PRIu64 " time %" PRIu64 "\n", + ctime(&tnow), l->lsn.lsn, + renv->ss.checkpoint_end_timestamp - renv->ss.checkpoint_begin_timestamp); + r = 0; + break; + default: + fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss); + abort(); + break; + } + return r; +} + +static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) { + int r; + switch (renv->ss.ss) { + case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: + assert(l->lsn_begin_checkpoint.lsn == renv->ss.checkpoint_begin_lsn.lsn); + assert(l->lsn.lsn == renv->ss.checkpoint_end_lsn.lsn); + assert(l->num_fassociate_entries == renv->ss.checkpoint_num_fassociate); + assert(l->num_xstillopen_entries == renv->ss.checkpoint_num_xstillopen); + renv->ss.ss = FORWARD_NEWER_CHECKPOINT_END; + r = 0; + break; + case FORWARD_NEWER_CHECKPOINT_END: + assert(0); + return 0; + default: + assert(0); + return 0; + } + return r; +} + +static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) { + time_t tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery bw_end_checkpoint at %" PRIu64 " timestamp %" PRIu64 " xid %" PRIu64 " (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, l->lsn_begin_checkpoint.lsn, recover_state(renv)); + switch (renv->ss.ss) { + case BACKWARD_NEWER_CHECKPOINT_END: + renv->ss.ss = BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END; + renv->ss.checkpoint_begin_lsn.lsn = l->lsn_begin_checkpoint.lsn; + renv->ss.checkpoint_end_lsn.lsn = l->lsn.lsn; + renv->ss.checkpoint_end_timestamp = l->timestamp; + return 0; + case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END: + fprintf(stderr, "PerconaFT recovery %s:%d Should not see two end_checkpoint log entries without an intervening begin_checkpoint\n", __FILE__, __LINE__); + abort(); + default: + break; + } + fprintf(stderr, "PerconaFT recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss); + abort(); +} + +static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) { + struct file_map_tuple *tuple = NULL; + int r = file_map_find(&renv->fmap, l->filenum, &tuple); + char *fname = fixup_fname(&l->iname); + switch (renv->ss.ss) { + case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: + renv->ss.checkpoint_num_fassociate++; + assert(r==DB_NOTFOUND); //Not open + // Open it if it exists. + // If rollback file, specify which checkpointed version of file we need (not just the latest) + // because we cannot use a rollback log that is later than the last complete checkpoint. See #3113. + { + bool rollback_file = (0==strcmp(fname, toku_product_name_strings.rollback_cachefile)); + LSN max_acceptable_lsn = MAX_LSN; + if (rollback_file) { + max_acceptable_lsn = renv->ss.checkpoint_begin_lsn; + FT_HANDLE t; + toku_ft_handle_create(&t); + r = toku_ft_handle_open_recovery(t, toku_product_name_strings.rollback_cachefile, false, false, renv->ct, (TOKUTXN)NULL, l->filenum, max_acceptable_lsn); + renv->logger->rollback_cachefile = t->ft->cf; + toku_logger_initialize_rollback_cache(renv->logger, t->ft); + } else { + r = internal_recover_fopen_or_fcreate(renv, false, 0, &l->iname, l->filenum, l->treeflags, NULL, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, max_acceptable_lsn); + assert(r==0); + } + } + // try to open the file again and if we get it, restore + // the unlink on close bit. + int ret; + ret = file_map_find(&renv->fmap, l->filenum, &tuple); + if (ret == 0 && l->unlink_on_close) { + toku_cachefile_unlink_on_close(tuple->ft_handle->ft->cf); + } + break; + case FORWARD_NEWER_CHECKPOINT_END: + if (r == 0) { //IF it is open + // assert that the filenum maps to the correct iname + assert(strcmp(fname, tuple->iname) == 0); + } + r = 0; + break; + default: + assert(0); + return 0; + } + toku_free(fname); + + return r; +} + +static int toku_recover_backward_fassociate (struct logtype_fassociate *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int +recover_transaction(TOKUTXN *txnp, TXNID_PAIR xid, TXNID_PAIR parentxid, TOKULOGGER logger) { + int r; + + // lookup the parent + TOKUTXN parent = NULL; + if (!txn_pair_is_none(parentxid)) { + toku_txnid2txn(logger, parentxid, &parent); + assert(parent!=NULL); + } + else { + invariant(xid.child_id64 == TXNID_NONE); + } + + // create a transaction and bind it to the transaction id + TOKUTXN txn = NULL; + { + //Verify it does not yet exist. + toku_txnid2txn(logger, xid, &txn); + assert(txn==NULL); + } + r = toku_txn_begin_with_xid( + parent, + &txn, + logger, + xid, + TXN_SNAPSHOT_NONE, + NULL, + true, // for_recovery + false // read_only + ); + assert(r == 0); + // We only know about it because it was logged. Restore the log bit. + // Logging is 'off' but it will still set the bit. + toku_maybe_log_begin_txn_for_write_operation(txn); + if (txnp) *txnp = txn; + return 0; +} + +static int recover_xstillopen_internal (TOKUTXN *txnp, + LSN UU(lsn), + TXNID_PAIR xid, + TXNID_PAIR parentxid, + uint64_t rollentry_raw_count, + FILENUMS open_filenums, + bool force_fsync_on_commit, + uint64_t num_rollback_nodes, + uint64_t num_rollentries, + BLOCKNUM spilled_rollback_head, + BLOCKNUM spilled_rollback_tail, + BLOCKNUM current_rollback, + uint32_t UU(crc), + uint32_t UU(len), + RECOVER_ENV renv) { + int r; + *txnp = NULL; + switch (renv->ss.ss) { + case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: { + renv->ss.checkpoint_num_xstillopen++; + invariant(renv->ss.last_xid != TXNID_NONE); + invariant(xid.parent_id64 <= renv->ss.last_xid); + TOKUTXN txn = NULL; + { //Create the transaction. + r = recover_transaction(&txn, xid, parentxid, renv->logger); + assert(r==0); + assert(txn!=NULL); + *txnp = txn; + } + { //Recover rest of transaction. +#define COPY_TO_INFO(field) .field = field + struct txninfo info = { + COPY_TO_INFO(rollentry_raw_count), + .num_fts = 0, //Set afterwards + .open_fts = NULL, //Set afterwards + COPY_TO_INFO(force_fsync_on_commit), + COPY_TO_INFO(num_rollback_nodes), + COPY_TO_INFO(num_rollentries), + COPY_TO_INFO(spilled_rollback_head), + COPY_TO_INFO(spilled_rollback_tail), + COPY_TO_INFO(current_rollback) + }; +#undef COPY_TO_INFO + //Generate open_fts + FT array[open_filenums.num]; //Allocate maximum possible requirement + info.open_fts = array; + uint32_t i; + for (i = 0; i < open_filenums.num; i++) { + //open_filenums.filenums[] + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, open_filenums.filenums[i], &tuple); + if (r==0) { + info.open_fts[info.num_fts++] = tuple->ft_handle->ft; + } + else { + assert(r==DB_NOTFOUND); + } + } + r = toku_txn_load_txninfo(txn, &info); + assert(r==0); + } + break; + } + case FORWARD_NEWER_CHECKPOINT_END: { + // assert that the transaction exists + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, xid, &txn); + r = 0; + *txnp = txn; + break; + } + default: + assert(0); + return 0; + } + return r; +} + +static int toku_recover_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV renv) { + TOKUTXN txn; + return recover_xstillopen_internal (&txn, + l->lsn, + l->xid, + l->parentxid, + l->rollentry_raw_count, + l->open_filenums, + l->force_fsync_on_commit, + l->num_rollback_nodes, + l->num_rollentries, + l->spilled_rollback_head, + l->spilled_rollback_tail, + l->current_rollback, + l->crc, + l->len, + renv); +} + +static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l, RECOVER_ENV renv) { + TOKUTXN txn; + int r = recover_xstillopen_internal (&txn, + l->lsn, + l->xid, + TXNID_PAIR_NONE, + l->rollentry_raw_count, + l->open_filenums, + l->force_fsync_on_commit, + l->num_rollback_nodes, + l->num_rollentries, + l->spilled_rollback_head, + l->spilled_rollback_tail, + l->current_rollback, + l->crc, + l->len, + renv); + if (r != 0) { + goto exit; + } + switch (renv->ss.ss) { + case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: { + toku_txn_prepare_txn(txn, l->xa_xid, 0); + break; + } + case FORWARD_NEWER_CHECKPOINT_END: { + assert(txn->state == TOKUTXN_PREPARING); + break; + } + default: { + assert(0); + } + } +exit: + return r; +} + +static int toku_recover_backward_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} +static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenprepared *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) { + int r; + r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger); + return r; +} + +static int toku_recover_backward_xbegin (struct logtype_xbegin *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +struct toku_txn_progress_extra { + time_t tlast; + LSN lsn; + const char *type; + TXNID_PAIR xid; + uint64_t last_total; +}; + +static void toku_recover_txn_progress(TOKU_TXN_PROGRESS txn_progress, void *extra) { + toku_txn_progress_extra *txn_progress_extra = static_cast<toku_txn_progress_extra *>(extra); + if (txn_progress_extra->last_total == 0) + txn_progress_extra->last_total = txn_progress->entries_total; + else + assert(txn_progress_extra->last_total == txn_progress->entries_total); + time_t tnow = time(NULL); + if (tnow - txn_progress_extra->tlast >= tokuft_recovery_progress_time) { + txn_progress_extra->tlast = tnow; + fprintf(stderr, "%.24s PerconaFT ", ctime(&tnow)); + if (txn_progress_extra->lsn.lsn != 0) + fprintf(stderr, "lsn %" PRIu64 " ", txn_progress_extra->lsn.lsn); + fprintf(stderr, "%s xid %" PRIu64 ":%" PRIu64 " ", + txn_progress_extra->type, txn_progress_extra->xid.parent_id64, txn_progress_extra->xid.child_id64); + fprintf(stderr, "%" PRIu64 "/%" PRIu64 " ", + txn_progress->entries_processed, txn_progress->entries_total); + if (txn_progress->entries_total > 0) + fprintf(stderr, "%.0f%% ", ((double) txn_progress->entries_processed / (double) txn_progress->entries_total) * 100.0); + fprintf(stderr, "\n"); + } +} + +static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) { + // find the transaction by transaction id + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + + // commit the transaction + toku_txn_progress_extra extra = { time(NULL), l->lsn, "commit", l->xid, 0 }; + int r = toku_txn_commit_with_lsn(txn, true, l->lsn, toku_recover_txn_progress, &extra); + assert(r == 0); + + // close the transaction + toku_txn_close_txn(txn); + + return 0; +} + +static int toku_recover_backward_xcommit (struct logtype_xcommit *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv) { + // find the transaction by transaction id + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + + // Save the transaction + toku_txn_prepare_txn(txn, l->xa_xid, 0); + + return 0; +} + +static int toku_recover_backward_xprepare (struct logtype_xprepare *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + + + +static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) { + int r; + + // find the transaction by transaction id + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + + // abort the transaction + toku_txn_progress_extra extra = { time(NULL), l->lsn, "abort", l->xid, 0 }; + r = toku_txn_abort_with_lsn(txn, l->lsn, toku_recover_txn_progress, &extra); + assert(r == 0); + + // close the transaction + toku_txn_close_txn(txn); + + return 0; +} + +static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +// fcreate is like fopen except that the file must be created. +static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) { + int r; + + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + + // assert that filenum is closed + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->filenum, &tuple); + assert(r==DB_NOTFOUND); + + assert(txn!=NULL); + + //unlink if it exists (recreate from scratch). + char *iname = fixup_fname(&l->iname); + char *iname_in_cwd = toku_cachetable_get_fname_in_cwd(renv->ct, iname); + r = unlink(iname_in_cwd); + if (r != 0) { + int er = get_error_errno(); + if (er != ENOENT) { + fprintf(stderr, "PerconaFT recovery %s:%d unlink %s %d\n", __FUNCTION__, __LINE__, iname, er); + toku_free(iname); + return r; + } + } + assert(0!=strcmp(iname, toku_product_name_strings.rollback_cachefile)); //Creation of rollback cachefile never gets logged. + toku_free(iname_in_cwd); + toku_free(iname); + + bool must_create = true; + r = internal_recover_fopen_or_fcreate(renv, must_create, l->mode, &l->iname, l->filenum, l->treeflags, txn, l->nodesize, l->basementnodesize, (enum toku_compression_method) l->compression_method, MAX_LSN); + return r; +} + +static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + + + +static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) { + int r; + + // assert that filenum is closed + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->filenum, &tuple); + assert(r==DB_NOTFOUND); + + bool must_create = false; + TOKUTXN txn = NULL; + char *fname = fixup_fname(&l->iname); + + assert(0!=strcmp(fname, toku_product_name_strings.rollback_cachefile)); //Rollback cachefile can be opened only via fassociate. + r = internal_recover_fopen_or_fcreate(renv, must_create, 0, &l->iname, l->filenum, l->treeflags, txn, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, MAX_LSN); + + toku_free(fname); + return r; +} + +static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_change_fdescriptor (struct logtype_change_fdescriptor *l, RECOVER_ENV renv) { + int r; + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->filenum, &tuple); + if (r==0) { + TOKUTXN txn = NULL; + //Maybe do the descriptor (lsn filter) + toku_txnid2txn(renv->logger, l->xid, &txn); + DBT old_descriptor, new_descriptor; + toku_fill_dbt( + &old_descriptor, + l->old_descriptor.data, + l->old_descriptor.len + ); + toku_fill_dbt( + &new_descriptor, + l->new_descriptor.data, + l->new_descriptor.len + ); + toku_ft_change_descriptor( + tuple->ft_handle, + &old_descriptor, + &new_descriptor, + false, + txn, + l->update_cmp_descriptor + ); + } + return 0; +} + +static int toku_recover_backward_change_fdescriptor (struct logtype_change_fdescriptor *UU(l), RECOVER_ENV UU(renv)) { + return 0; +} + + +// if file referred to in l is open, close it +static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV renv) { + struct file_map_tuple *tuple = NULL; + int r = file_map_find(&renv->fmap, l->filenum, &tuple); + if (r == 0) { // if file is open + char *iname = fixup_fname(&l->iname); + assert(strcmp(tuple->iname, iname) == 0); // verify that file_map has same iname as log entry + + if (0!=strcmp(iname, toku_product_name_strings.rollback_cachefile)) { + //Rollback cachefile is closed manually at end of recovery, not here + toku_ft_handle_close_recovery(tuple->ft_handle, l->lsn); + } + file_map_remove(&renv->fmap, l->filenum); + toku_free(iname); + } + return 0; +} + +static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +// fdelete is a transactional file delete. +static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) { + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn != NULL); + + // if the forward scan in recovery found this file and opened it, we + // need to mark the txn to remove the ft on commit. if the file was + // not found and not opened, we don't need to do anything - the ft + // is already gone, so we're happy. + struct file_map_tuple *tuple; + int r = file_map_find(&renv->fmap, l->filenum, &tuple); + if (r == 0) { + toku_ft_unlink_on_commit(tuple->ft_handle, txn); + } + return 0; +} + +static int toku_recover_backward_fdelete (struct logtype_fdelete *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV renv) { + int r; + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->filenum, &tuple); + if (r==0) { + //Maybe do the insertion if we found the cachefile. + DBT keydbt, valdbt; + toku_fill_dbt(&keydbt, l->key.data, l->key.len); + toku_fill_dbt(&valdbt, l->value.data, l->value.len); + toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT); + toku_txn_maybe_note_ft(txn, tuple->ft_handle->ft); + } + return 0; +} + +static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *l, RECOVER_ENV renv) { + int r; + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->filenum, &tuple); + if (r==0) { + //Maybe do the insertion if we found the cachefile. + DBT keydbt, valdbt; + toku_fill_dbt(&keydbt, l->key.data, l->key.len); + toku_fill_dbt(&valdbt, l->value.data, l->value.len); + toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT_NO_OVERWRITE); + } + return 0; +} + +static int toku_recover_backward_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVER_ENV renv) { + int r; + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->filenum, &tuple); + if (r==0) { + //Maybe do the deletion if we found the cachefile. + DBT keydbt; + toku_fill_dbt(&keydbt, l->key.data, l->key.len); + toku_ft_maybe_delete(tuple->ft_handle, &keydbt, txn, true, l->lsn, false); + } + return 0; +} + +static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple *l, RECOVER_ENV renv) { + int r; + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + DB *src_db = NULL; + bool do_inserts = true; + { + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->src_filenum, &tuple); + if (l->src_filenum.fileid == FILENUM_NONE.fileid) + assert(r==DB_NOTFOUND); + else { + if (r == 0) + src_db = &tuple->fake_db; + else + do_inserts = false; // src file was probably deleted, #3129 + } + } + + if (do_inserts) { + DBT src_key, src_val; + + toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len); + toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len); + + for (uint32_t file = 0; file < l->dest_filenums.num; file++) { + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple); + if (r==0) { + // We found the cachefile. (maybe) Do the insert. + DB *db = &tuple->fake_db; + + DBT_ARRAY key_array; + DBT_ARRAY val_array; + if (db != src_db) { + r = renv->generate_row_for_put(db, src_db, &renv->dest_keys, &renv->dest_vals, &src_key, &src_val); + assert(r==0); + invariant(renv->dest_keys.size <= renv->dest_keys.capacity); + invariant(renv->dest_vals.size <= renv->dest_vals.capacity); + invariant(renv->dest_keys.size == renv->dest_vals.size); + key_array = renv->dest_keys; + val_array = renv->dest_vals; + } else { + key_array.size = key_array.capacity = 1; + key_array.dbts = &src_key; + + val_array.size = val_array.capacity = 1; + val_array.dbts = &src_val; + } + for (uint32_t i = 0; i < key_array.size; i++) { + toku_ft_maybe_insert(tuple->ft_handle, &key_array.dbts[i], &val_array.dbts[i], txn, true, l->lsn, false, FT_INSERT); + } + } + } + } + + return 0; +} + +static int toku_recover_backward_enq_insert_multiple (struct logtype_enq_insert_multiple *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple *l, RECOVER_ENV renv) { + int r; + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + DB *src_db = NULL; + bool do_deletes = true; + { + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->src_filenum, &tuple); + if (l->src_filenum.fileid == FILENUM_NONE.fileid) + assert(r==DB_NOTFOUND); + else { + if (r == 0) { + src_db = &tuple->fake_db; + } else { + do_deletes = false; // src file was probably deleted, #3129 + } + } + } + + if (do_deletes) { + DBT src_key, src_val; + toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len); + toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len); + + for (uint32_t file = 0; file < l->dest_filenums.num; file++) { + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple); + if (r==0) { + // We found the cachefile. (maybe) Do the delete. + DB *db = &tuple->fake_db; + + DBT_ARRAY key_array; + if (db != src_db) { + r = renv->generate_row_for_del(db, src_db, &renv->dest_keys, &src_key, &src_val); + assert(r==0); + invariant(renv->dest_keys.size <= renv->dest_keys.capacity); + key_array = renv->dest_keys; + } else { + key_array.size = key_array.capacity = 1; + key_array.dbts = &src_key; + } + for (uint32_t i = 0; i < key_array.size; i++) { + toku_ft_maybe_delete(tuple->ft_handle, &key_array.dbts[i], txn, true, l->lsn, false); + } + } + } + } + + return 0; +} + +static int toku_recover_backward_enq_delete_multiple (struct logtype_enq_delete_multiple *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_enq_update(struct logtype_enq_update *l, RECOVER_ENV renv) { + int r; + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn != NULL); + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->filenum, &tuple); + if (r == 0) { + // Maybe do the update if we found the cachefile. + DBT key, extra; + toku_fill_dbt(&key, l->key.data, l->key.len); + toku_fill_dbt(&extra, l->extra.data, l->extra.len); + toku_ft_maybe_update(tuple->ft_handle, &key, &extra, txn, true, l->lsn, false); + } + return 0; +} + +static int toku_recover_enq_updatebroadcast(struct logtype_enq_updatebroadcast *l, RECOVER_ENV renv) { + int r; + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn != NULL); + struct file_map_tuple *tuple = NULL; + r = file_map_find(&renv->fmap, l->filenum, &tuple); + if (r == 0) { + // Maybe do the update broadcast if we found the cachefile. + DBT extra; + toku_fill_dbt(&extra, l->extra.data, l->extra.len); + toku_ft_maybe_update_broadcast(tuple->ft_handle, &extra, txn, true, + l->lsn, false, l->is_resetting_op); + } + return 0; +} + +static int toku_recover_backward_enq_update(struct logtype_enq_update *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_backward_enq_updatebroadcast(struct logtype_enq_updatebroadcast *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_backward_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_backward_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) { + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + char *new_iname = fixup_fname(&l->new_iname); + + toku_ft_load_recovery(txn, l->old_filenum, new_iname, 0, 0, (LSN*)NULL); + + toku_free(new_iname); + return 0; +} + +static int toku_recover_backward_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +// #2954 +static int toku_recover_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) { + TOKUTXN txn = NULL; + toku_txnid2txn(renv->logger, l->xid, &txn); + assert(txn!=NULL); + // just make an entry in the rollback log + // - set do_log = 0 -> don't write to recovery log + toku_ft_hot_index_recovery(txn, l->hot_index_filenums, 0, 0, (LSN*)NULL); + return 0; +} + +// #2954 +static int toku_recover_backward_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) { + // nothing + return 0; +} + +// Effects: If there are no log files, or if there is a clean "shutdown" at +// the end of the log, then we don't need recovery to run. +// Returns: true if we need recovery, otherwise false. +int tokuft_needs_recovery(const char *log_dir, bool ignore_log_empty) { + int needs_recovery; + int r; + TOKULOGCURSOR logcursor = NULL; + + r = toku_logcursor_create(&logcursor, log_dir); + if (r != 0) { + needs_recovery = true; goto exit; + } + + struct log_entry *le; + le = NULL; + r = toku_logcursor_last(logcursor, &le); + if (r == 0) { + needs_recovery = le->cmd != LT_shutdown; + } + else { + needs_recovery = !(r == DB_NOTFOUND && ignore_log_empty); + } + exit: + if (logcursor) { + r = toku_logcursor_destroy(&logcursor); + assert(r == 0); + } + return needs_recovery; +} + +static uint32_t recover_get_num_live_txns(RECOVER_ENV renv) { + return toku_txn_manager_num_live_root_txns(renv->logger->txn_manager); +} + +static int is_txn_unprepared(TOKUTXN txn, void* extra) { + TOKUTXN* ptxn = (TOKUTXN *)extra; + if (txn->state != TOKUTXN_PREPARING) { + *ptxn = txn; + return -1; // return -1 to get iterator to return + } + return 0; +} + +static int find_an_unprepared_txn (RECOVER_ENV renv, TOKUTXN *txnp) { + TOKUTXN txn = nullptr; + int r = toku_txn_manager_iter_over_live_root_txns( + renv->logger->txn_manager, + is_txn_unprepared, + &txn + ); + assert(r == 0 || r == -1); + if (txn != nullptr) { + *txnp = txn; + return 0; + } + return DB_NOTFOUND; +} + +static int call_prepare_txn_callback_iter(TOKUTXN txn, void* extra) { + RECOVER_ENV* renv = (RECOVER_ENV *)extra; + invariant(txn->state == TOKUTXN_PREPARING); + invariant(txn->child == NULL); + (*renv)->prepared_txn_callback((*renv)->env, txn); + return 0; +} + +static void recover_abort_live_txn(TOKUTXN txn) { + fprintf(stderr, "%s %" PRIu64 "\n", __FUNCTION__, txn->txnid.parent_id64); + // recursively abort all children first + if (txn->child != NULL) { + recover_abort_live_txn(txn->child); + } + // sanity check that the recursive call successfully NULLs out txn->child + invariant(txn->child == NULL); + // abort the transaction + toku_txn_progress_extra extra = { time(NULL), ZERO_LSN, "abort live", txn->txnid, 0 }; + int r = toku_txn_abort_txn(txn, toku_recover_txn_progress, &extra); + assert(r == 0); + + // close the transaction + toku_txn_close_txn(txn); +} + +// abort all of the remaining live transactions in descending transaction id order +static void recover_abort_all_live_txns(RECOVER_ENV renv) { + while (1) { + TOKUTXN txn; + int r = find_an_unprepared_txn(renv, &txn); + if (r==0) { + recover_abort_live_txn(txn); + } else if (r==DB_NOTFOUND) { + break; + } else { + abort(); + } + } + + // Now we have only prepared txns. These prepared txns don't have full DB_TXNs in them, so we need to make some. + int r = toku_txn_manager_iter_over_live_root_txns( + renv->logger->txn_manager, + call_prepare_txn_callback_iter, + &renv + ); + assert_zero(r); +} + +static void recover_trace_le(const char *f, int l, int r, struct log_entry *le) { + if (le) { + LSN thislsn = toku_log_entry_get_lsn(le); + fprintf(stderr, "%s:%d r=%d cmd=%c lsn=%" PRIu64 "\n", f, l, r, le->cmd, thislsn.lsn); + } else + fprintf(stderr, "%s:%d r=%d cmd=?\n", f, l, r); +} + +// For test purposes only. +static void (*recover_callback_fx)(void*) = NULL; +static void * recover_callback_args = NULL; +static void (*recover_callback2_fx)(void*) = NULL; +static void * recover_callback2_args = NULL; + + +static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_dir) { + int r; + int rr = 0; + TOKULOGCURSOR logcursor = NULL; + struct log_entry *le = NULL; + + time_t tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery starting in env %s\n", ctime(&tnow), env_dir); + + char org_wd[1000]; + { + char *wd=getcwd(org_wd, sizeof(org_wd)); + assert(wd!=0); + } + + r = toku_logger_open(log_dir, renv->logger); + assert(r == 0); + + // grab the last LSN so that it can be restored when the log is restarted + LSN lastlsn = toku_logger_last_lsn(renv->logger); + LSN thislsn; + + // there must be at least one log entry + r = toku_logcursor_create(&logcursor, log_dir); + assert(r == 0); + + r = toku_logcursor_last(logcursor, &le); + if (r != 0) { + if (tokuft_recovery_trace) + fprintf(stderr, "RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r); + rr = DB_RUNRECOVERY; goto errorexit; + } + + r = toku_logcursor_destroy(&logcursor); + assert(r == 0); + + r = toku_logcursor_create(&logcursor, log_dir); + assert(r == 0); + + { + toku_struct_stat buf; + if (toku_stat(env_dir, &buf)!=0) { + rr = get_error_errno(); + fprintf(stderr, "%.24s PerconaFT recovery error: directory does not exist: %s\n", ctime(&tnow), env_dir); + goto errorexit; + } else if (!S_ISDIR(buf.st_mode)) { + fprintf(stderr, "%.24s PerconaFT recovery error: this file is supposed to be a directory, but is not: %s\n", ctime(&tnow), env_dir); + rr = ENOTDIR; goto errorexit; + } + } + // scan backwards + scan_state_init(&renv->ss); + tnow = time(NULL); + time_t tlast; + tlast = tnow; + fprintf(stderr, "%.24s PerconaFT recovery scanning backward from %" PRIu64 "\n", ctime(&tnow), lastlsn.lsn); + for (unsigned i=0; 1; i++) { + + // get the previous log entry (first time gets the last one) + le = NULL; + r = toku_logcursor_prev(logcursor, &le); + if (tokuft_recovery_trace) + recover_trace_le(__FUNCTION__, __LINE__, r, le); + if (r != 0) { + if (r == DB_NOTFOUND) + break; + rr = DB_RUNRECOVERY; + goto errorexit; + } + + // trace progress + if ((i % 1000) == 0) { + tnow = time(NULL); + if (tnow - tlast >= tokuft_recovery_progress_time) { + thislsn = toku_log_entry_get_lsn(le); + fprintf(stderr, "%.24s PerconaFT recovery scanning backward from %" PRIu64 " at %" PRIu64 " (%s)\n", + ctime(&tnow), lastlsn.lsn, thislsn.lsn, recover_state(renv)); + tlast = tnow; + } + } + + // dispatch the log entry handler + assert(renv->ss.ss == BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END || + renv->ss.ss == BACKWARD_NEWER_CHECKPOINT_END); + logtype_dispatch_assign(le, toku_recover_backward_, r, renv); + if (tokuft_recovery_trace) + recover_trace_le(__FUNCTION__, __LINE__, r, le); + if (r != 0) { + if (tokuft_recovery_trace) + fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r); + rr = DB_RUNRECOVERY; + goto errorexit; + } + if (renv->goforward) + break; + } + + // run first callback + if (recover_callback_fx) + recover_callback_fx(recover_callback_args); + + // scan forwards + assert(le); + thislsn = toku_log_entry_get_lsn(le); + tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery starts scanning forward to %" PRIu64 " from %" PRIu64 " left %" PRIu64 " (%s)\n", + ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv)); + + for (unsigned i=0; 1; i++) { + + // trace progress + if ((i % 1000) == 0) { + tnow = time(NULL); + if (tnow - tlast >= tokuft_recovery_progress_time) { + thislsn = toku_log_entry_get_lsn(le); + fprintf(stderr, "%.24s PerconaFT recovery scanning forward to %" PRIu64 " at %" PRIu64 " left %" PRIu64 " (%s)\n", + ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv)); + tlast = tnow; + } + } + + // dispatch the log entry handler (first time calls the forward handler for the log entry at the turnaround + assert(renv->ss.ss == FORWARD_BETWEEN_CHECKPOINT_BEGIN_END || + renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END); + logtype_dispatch_assign(le, toku_recover_, r, renv); + if (tokuft_recovery_trace) + recover_trace_le(__FUNCTION__, __LINE__, r, le); + if (r != 0) { + if (tokuft_recovery_trace) + fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r); + rr = DB_RUNRECOVERY; + goto errorexit; + } + + // get the next log entry + le = NULL; + r = toku_logcursor_next(logcursor, &le); + if (tokuft_recovery_trace) + recover_trace_le(__FUNCTION__, __LINE__, r, le); + if (r != 0) { + if (r == DB_NOTFOUND) + break; + rr = DB_RUNRECOVERY; + goto errorexit; + } + } + + // verify the final recovery state + assert(renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END); + + r = toku_logcursor_destroy(&logcursor); + assert(r == 0); + + // run second callback + if (recover_callback2_fx) + recover_callback2_fx(recover_callback2_args); + + // restart logging + toku_logger_restart(renv->logger, lastlsn); + + // abort the live transactions + { + uint32_t n = recover_get_num_live_txns(renv); + if (n > 0) { + tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery has %" PRIu32 " live transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : ""); + } + } + recover_abort_all_live_txns(renv); + { + uint32_t n = recover_get_num_live_txns(renv); + if (n > 0) { + tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery has %" PRIu32 " prepared transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : ""); + } + } + + // close the open dictionaries + uint32_t n; + n = file_map_get_num_dictionaries(&renv->fmap); + if (n > 0) { + tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery closing %" PRIu32 " dictionar%s\n", ctime(&tnow), n, n > 1 ? "ies" : "y"); + } + file_map_close_dictionaries(&renv->fmap, lastlsn); + + { + // write a recovery log entry + BYTESTRING recover_comment = { static_cast<uint32_t>(strlen("recover")), (char *) "recover" }; + toku_log_comment(renv->logger, NULL, true, 0, recover_comment); + } + + // checkpoint + tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery making a checkpoint\n", ctime(&tnow)); + r = toku_checkpoint(renv->cp, renv->logger, NULL, NULL, NULL, NULL, RECOVERY_CHECKPOINT); + assert(r == 0); + tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery done\n", ctime(&tnow)); + + return 0; + + errorexit: + tnow = time(NULL); + fprintf(stderr, "%.24s PerconaFT recovery failed %d\n", ctime(&tnow), rr); + + if (logcursor) { + r = toku_logcursor_destroy(&logcursor); + assert(r == 0); + } + + return rr; +} + +int +toku_recover_lock(const char *lock_dir, int *lockfd) { + int e = toku_single_process_lock(lock_dir, "recovery", lockfd); + if (e != 0 && e != ENOENT) { + fprintf(stderr, "Couldn't run recovery because some other process holds the recovery lock\n"); + } + return e; +} + +int +toku_recover_unlock(int lockfd) { + int lockfd_copy = lockfd; + return toku_single_process_unlock(&lockfd_copy); +} + +int tokuft_recover(DB_ENV *env, + prepared_txn_callback_t prepared_txn_callback, + keep_cachetable_callback_t keep_cachetable_callback, + TOKULOGGER logger, + const char *env_dir, const char *log_dir, + ft_compare_func bt_compare, + ft_update_func update_function, + generate_row_for_put_func generate_row_for_put, + generate_row_for_del_func generate_row_for_del, + size_t cachetable_size) { + int r; + int lockfd = -1; + + r = toku_recover_lock(log_dir, &lockfd); + if (r != 0) + return r; + + int rr = 0; + if (tokuft_needs_recovery(log_dir, false)) { + struct recover_env renv; + r = recover_env_init(&renv, + env_dir, + env, + prepared_txn_callback, + keep_cachetable_callback, + logger, + bt_compare, + update_function, + generate_row_for_put, + generate_row_for_del, + cachetable_size); + assert(r == 0); + + rr = do_recovery(&renv, env_dir, log_dir); + + recover_env_cleanup(&renv); + } + + r = toku_recover_unlock(lockfd); + if (r != 0) + return r; + + return rr; +} + +// Return 0 if recovery log exists, ENOENT if log is missing +int +tokuft_recover_log_exists(const char * log_dir) { + int r; + TOKULOGCURSOR logcursor; + + r = toku_logcursor_create(&logcursor, log_dir); + if (r == 0) { + int rclose; + r = toku_logcursor_log_exists(logcursor); // return ENOENT if no log + rclose = toku_logcursor_destroy(&logcursor); + assert(rclose == 0); + } + else + r = ENOENT; + + return r; +} + +void toku_recover_set_callback (void (*callback_fx)(void*), void* callback_args) { + recover_callback_fx = callback_fx; + recover_callback_args = callback_args; +} + +void toku_recover_set_callback2 (void (*callback_fx)(void*), void* callback_args) { + recover_callback2_fx = callback_fx; + recover_callback2_args = callback_args; +} |