diff options
author | Sergei Golubchik <serg@mariadb.org> | 2016-06-28 22:01:55 +0200 |
---|---|---|
committer | Sergei Golubchik <serg@mariadb.org> | 2016-06-28 22:01:55 +0200 |
commit | 3361aee591b1eb8c676f60887ffc535cd509890a (patch) | |
tree | 54a65f83ba7d9293e6f8e8281ad920fbae6eb823 /storage/tokudb/PerconaFT | |
parent | 6ce20fb2b9fe57330c797694b9dbea4028f40d7c (diff) | |
parent | 0fdb17e6c3f50ae22eb97b6363bcbd8b0cd9e040 (diff) | |
download | mariadb-git-3361aee591b1eb8c676f60887ffc535cd509890a.tar.gz |
Merge branch '10.0' into 10.1
Diffstat (limited to 'storage/tokudb/PerconaFT')
68 files changed, 2764 insertions, 843 deletions
diff --git a/storage/tokudb/PerconaFT/CMakeLists.txt b/storage/tokudb/PerconaFT/CMakeLists.txt index 843b4c9d0e8..0e283c13c61 100644 --- a/storage/tokudb/PerconaFT/CMakeLists.txt +++ b/storage/tokudb/PerconaFT/CMakeLists.txt @@ -1,3 +1,6 @@ +if (CMAKE_PROJECT_NAME STREQUAL TokuDB) + cmake_minimum_required(VERSION 2.8.8 FATAL_ERROR) +endif() set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake_modules") project(TokuDB) diff --git a/storage/tokudb/PerconaFT/CTestCustom.cmake b/storage/tokudb/PerconaFT/CTestCustom.cmake.in index 54170b2b903..54170b2b903 100644 --- a/storage/tokudb/PerconaFT/CTestCustom.cmake +++ b/storage/tokudb/PerconaFT/CTestCustom.cmake.in diff --git a/storage/tokudb/PerconaFT/README.md b/storage/tokudb/PerconaFT/README.md index 7e30a558bc7..d53caf00190 100644 --- a/storage/tokudb/PerconaFT/README.md +++ b/storage/tokudb/PerconaFT/README.md @@ -113,11 +113,11 @@ All source code and test contributions must be provided under a [BSD 2-Clause][b License ------- -PerconaFT is available under the GPL version 2, and AGPL version 3, with slight modifications. +PerconaFT is available under the GPL version 2, and AGPL version 3. See [COPYING.AGPLv3][agpllicense], [COPYING.GPLv2][gpllicense], and [PATENTS][patents]. -[agpllicense]: http://github.com/Perona/PerconaFT/blob/master/COPYING.AGPLv3 -[gpllicense]: http://github.com/Perona/PerconaFT/blob/master/COPYING.GPLv2 -[patents]: http://github.com/Perona/PerconaFT/blob/master/PATENTS +[agpllicense]: http://github.com/Percona/PerconaFT/blob/master/COPYING.AGPLv3 +[gpllicense]: http://github.com/Percona/PerconaFT/blob/master/COPYING.GPLv2 +[patents]: http://github.com/Percona/PerconaFT/blob/master/PATENTS diff --git a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc index 958f00a8706..5c29209e19d 100644 --- a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc +++ b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc @@ -510,8 +510,9 @@ static void print_db_struct (void) { "int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, uint32_t flags)", "int (*get_fractal_tree_info64)(DB*,uint64_t*,uint64_t*,uint64_t*,uint64_t*)", "int (*iterate_fractal_tree_block_map)(DB*,int(*)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*),void*)", - "const char *(*get_dname)(DB *db)", - "int (*get_last_key)(DB *db, YDB_CALLBACK_FUNCTION func, void* extra)", + "const char *(*get_dname)(DB *db)", + "int (*get_last_key)(DB *db, YDB_CALLBACK_FUNCTION func, void* extra)", + "int (*recount_rows)(DB* db, int (*progress_callback)(uint64_t count, uint64_t deleted, void* progress_extra), void* progress_extra)", NULL}; sort_and_dump_fields("db", true, extra); } diff --git a/storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake b/storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake index 15066906831..e1da095fc00 100644 --- a/storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake +++ b/storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake @@ -48,7 +48,8 @@ MACRO(TOKU_MERGE_STATIC_LIBS TARGET OUTPUT_NAME LIBS_TO_MERGE) ENDIF() ENDFOREACH() IF(OSLIBS) - #LIST(REMOVE_DUPLICATES OSLIBS) + # REMOVE_DUPLICATES destroys the order of the libs so disabled + # LIST(REMOVE_DUPLICATES OSLIBS) TARGET_LINK_LIBRARIES(${TARGET} LINK_PUBLIC ${OSLIBS}) ENDIF() diff --git a/storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake b/storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake index 40ccbcc0aed..77f6d8f67b7 100644 --- a/storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake +++ b/storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake @@ -24,12 +24,12 @@ endif () ## add TOKU_PTHREAD_DEBUG for debug builds if (CMAKE_VERSION VERSION_LESS 3.0) - set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DEBUG TOKU_PTHREAD_DEBUG=1) - set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DRD TOKU_PTHREAD_DEBUG=1) + set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DEBUG TOKU_PTHREAD_DEBUG=1 TOKU_DEBUG_TXN_SYNC=1) + set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DRD TOKU_PTHREAD_DEBUG=1 TOKU_DEBUG_TXN_SYNC=1) set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DRD _FORTIFY_SOURCE=2) else () set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS - $<$<OR:$<CONFIG:DEBUG>,$<CONFIG:DRD>>:TOKU_PTHREAD_DEBUG=1> + $<$<OR:$<CONFIG:DEBUG>,$<CONFIG:DRD>>:TOKU_PTHREAD_DEBUG=1 TOKU_DEBUG_TXN_SYNC=1> $<$<CONFIG:DRD>:_FORTIFY_SOURCE=2> ) endif () @@ -65,8 +65,10 @@ set_cflags_if_supported( -Wno-error=missing-format-attribute -Wno-error=address-of-array-temporary -Wno-error=tautological-constant-out-of-range-compare + -Wno-error=maybe-uninitialized -Wno-ignored-attributes -Wno-error=extern-c-compat + -Wno-pointer-bool-conversion -fno-rtti -fno-exceptions ) @@ -119,13 +121,18 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL Clang) set(CMAKE_C_FLAGS_RELEASE "-g -O3 ${CMAKE_C_FLAGS_RELEASE} -UNDEBUG") set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 ${CMAKE_CXX_FLAGS_RELEASE} -UNDEBUG") else () + if (APPLE) + set(FLTO_OPTS "-fwhole-program") + else () + set(FLTO_OPTS "-fuse-linker-plugin") + endif() # we overwrite this because the default passes -DNDEBUG and we don't want that - set(CMAKE_C_FLAGS_RELWITHDEBINFO "-flto -fuse-linker-plugin ${CMAKE_C_FLAGS_RELWITHDEBINFO} -g -O3 -UNDEBUG") - set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-flto -fuse-linker-plugin ${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3 -UNDEBUG") - set(CMAKE_C_FLAGS_RELEASE "-g -O3 -flto -fuse-linker-plugin ${CMAKE_C_FLAGS_RELEASE} -UNDEBUG") - set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 -flto -fuse-linker-plugin ${CMAKE_CXX_FLAGS_RELEASE} -UNDEBUG") - set(CMAKE_EXE_LINKER_FLAGS "-g -fuse-linker-plugin ${CMAKE_EXE_LINKER_FLAGS}") - set(CMAKE_SHARED_LINKER_FLAGS "-g -fuse-linker-plugin ${CMAKE_SHARED_LINKER_FLAGS}") + set(CMAKE_C_FLAGS_RELWITHDEBINFO "-flto ${FLTO_OPTS} ${CMAKE_C_FLAGS_RELWITHDEBINFO} -g -O3 -UNDEBUG") + set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-flto ${FLTO_OPTS} ${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3 -UNDEBUG") + set(CMAKE_C_FLAGS_RELEASE "-g -O3 -flto ${FLTO_OPTS} ${CMAKE_C_FLAGS_RELEASE} -UNDEBUG") + set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 -flto ${FLTO_OPTS} ${CMAKE_CXX_FLAGS_RELEASE} -UNDEBUG") + set(CMAKE_EXE_LINKER_FLAGS "-g ${FLTO_OPTS} ${CMAKE_EXE_LINKER_FLAGS}") + set(CMAKE_SHARED_LINKER_FLAGS "-g ${FLTO_OPTS} ${CMAKE_SHARED_LINKER_FLAGS}") endif () ## set warnings @@ -159,15 +166,6 @@ endif () set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}") set(CMAKE_CXX_FLAGS "-Wall -Werror ${CMAKE_CXX_FLAGS}") -## need to set -stdlib=libc++ to get real c++11 support on darwin -if (APPLE) - if (CMAKE_GENERATOR STREQUAL Xcode) - set(CMAKE_XCODE_ATTRIBUTE_CLANG_CXX_LIBRARY "libc++") - else () - add_definitions(-stdlib=libc++) - endif () -endif () - # pick language dialect set(CMAKE_C_FLAGS "-std=c99 ${CMAKE_C_FLAGS}") check_cxx_compiler_flag(-std=c++11 HAVE_STDCXX11) diff --git a/storage/tokudb/PerconaFT/ft/CMakeLists.txt b/storage/tokudb/PerconaFT/ft/CMakeLists.txt index 744b9c9a9e1..11091073ac2 100644 --- a/storage/tokudb/PerconaFT/ft/CMakeLists.txt +++ b/storage/tokudb/PerconaFT/ft/CMakeLists.txt @@ -36,6 +36,7 @@ set(FT_SOURCES ft-flusher ft-hot-flusher ft-ops + ft-recount-rows ft-status ft-test-helpers ft-verify diff --git a/storage/tokudb/PerconaFT/ft/ft-flusher.cc b/storage/tokudb/PerconaFT/ft/ft-flusher.cc index 530947fe868..fb456ea6a18 100644 --- a/storage/tokudb/PerconaFT/ft/ft-flusher.cc +++ b/storage/tokudb/PerconaFT/ft/ft-flusher.cc @@ -1572,6 +1572,7 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p txn_gc_info *gc_info; STAT64INFO_S stats_delta; + int64_t logical_rows_delta = 0; size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use(); flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) : @@ -1599,8 +1600,8 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p is_fresh, gc_info, flow_deltas, - &stats_delta - ); + &stats_delta, + &logical_rows_delta); remaining_memsize -= memsize_in_buffer; return 0; } @@ -1613,6 +1614,7 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) { toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta); } + toku_ft_adjust_logical_row_count(ft, flush_fn.logical_rows_delta); if (do_garbage_collection) { size_t buffsize = bnc->msg_buffer.buffer_size_in_use(); // may be misleading if there's a broadcast message in there diff --git a/storage/tokudb/PerconaFT/ft/ft-internal.h b/storage/tokudb/PerconaFT/ft/ft-internal.h index 6bf7029245b..eec591d1744 100644 --- a/storage/tokudb/PerconaFT/ft/ft-internal.h +++ b/storage/tokudb/PerconaFT/ft/ft-internal.h @@ -143,6 +143,10 @@ struct ft_header { MSN msn_at_start_of_last_completed_optimize; STAT64INFO_S on_disk_stats; + + // This represents the balance of inserts - deletes and should be + // closer to a logical representation of the number of records in an index + uint64_t on_disk_logical_rows; }; typedef struct ft_header *FT_HEADER; @@ -176,6 +180,7 @@ struct ft { // protected by atomic builtins STAT64INFO_S in_memory_stats; + uint64_t in_memory_logical_rows; // transient, not serialized to disk. updated when we do write to // disk. tells us whether we can do partial eviction (we can't if diff --git a/storage/tokudb/PerconaFT/ft/ft-ops.cc b/storage/tokudb/PerconaFT/ft/ft-ops.cc index f5da82ee000..8f61bc67339 100644 --- a/storage/tokudb/PerconaFT/ft/ft-ops.cc +++ b/storage/tokudb/PerconaFT/ft/ft-ops.cc @@ -1371,7 +1371,8 @@ static void inject_message_in_locked_node( ft_msg msg_with_msn(msg.kdbt(), msg.vdbt(), msg.type(), msg_msn, msg.xids()); paranoid_invariant(msg_with_msn.msn().msn > node->max_msn_applied_to_node_on_disk.msn); - STAT64INFO_S stats_delta = {0,0}; + STAT64INFO_S stats_delta = { 0,0 }; + int64_t logical_rows_delta = 0; toku_ftnode_put_msg( ft->cmp, ft->update_fun, @@ -1381,11 +1382,12 @@ static void inject_message_in_locked_node( true, gc_info, flow_deltas, - &stats_delta - ); + &stats_delta, + &logical_rows_delta); if (stats_delta.numbytes || stats_delta.numrows) { toku_ft_update_stats(&ft->in_memory_stats, stats_delta); } + toku_ft_adjust_logical_row_count(ft, logical_rows_delta); // // assumption is that toku_ftnode_put_msg will // mark the node as dirty. @@ -2169,6 +2171,7 @@ int toku_ft_insert_unique(FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool if (r == 0) { ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, FT_INSERT); + toku_ft_adjust_logical_row_count(ft_h->ft, 1); } return r; } @@ -2344,6 +2347,7 @@ void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool if (r != 0) { toku_ft_send_insert(ft_h, key, val, message_xids, type, &gc_info); } + toku_ft_adjust_logical_row_count(ft_h->ft, 1); } } @@ -2513,6 +2517,7 @@ void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_vali oldest_referenced_xid_estimate, txn != nullptr ? !txn->for_recovery : false); toku_ft_send_delete(ft_h, key, message_xids, &gc_info); + toku_ft_adjust_logical_row_count(ft_h->ft, -1); } } diff --git a/storage/tokudb/PerconaFT/ft/ft-ops.h b/storage/tokudb/PerconaFT/ft/ft-ops.h index 7d0b165b70c..313a74628ea 100644 --- a/storage/tokudb/PerconaFT/ft/ft-ops.h +++ b/storage/tokudb/PerconaFT/ft/ft-ops.h @@ -207,6 +207,15 @@ extern int toku_ft_debug_mode; int toku_verify_ft (FT_HANDLE ft_h) __attribute__ ((warn_unused_result)); int toku_verify_ft_with_progress (FT_HANDLE ft_h, int (*progress_callback)(void *extra, float progress), void *extra, int verbose, int keep_going) __attribute__ ((warn_unused_result)); +int toku_ft_recount_rows( + FT_HANDLE ft, + int (*progress_callback)( + uint64_t count, + uint64_t deleted, + void* progress_extra), + void* progress_extra); + + DICTIONARY_ID toku_ft_get_dictionary_id(FT_HANDLE); enum ft_flags { diff --git a/storage/tokudb/PerconaFT/ft/ft-recount-rows.cc b/storage/tokudb/PerconaFT/ft/ft-recount-rows.cc new file mode 100644 index 00000000000..adac96f4882 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/ft-recount-rows.cc @@ -0,0 +1,115 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "ft/serialize/block_table.h" +#include "ft/ft.h" +#include "ft/ft-internal.h" +#include "ft/cursor.h" + +struct recount_rows_extra_t { + int (*_progress_callback)( + uint64_t count, + uint64_t deleted, + void* progress_extra); + void* _progress_extra; + uint64_t _keys; + bool _cancelled; +}; + +static int recount_rows_found( + uint32_t UU(keylen), + const void* key, + uint32_t UU(vallen), + const void* UU(val), + void* extra, + bool UU(lock_only)) { + + recount_rows_extra_t* rre = (recount_rows_extra_t*)extra; + + if (FT_LIKELY(key != nullptr)) { + rre->_keys++; + } + return rre->_cancelled + = rre->_progress_callback(rre->_keys, 0, rre->_progress_extra); +} +static bool recount_rows_interrupt(void* extra, uint64_t deleted_rows) { + recount_rows_extra_t* rre = (recount_rows_extra_t*)extra; + + return rre->_cancelled = + rre->_progress_callback(rre->_keys, deleted_rows, rre->_progress_extra); +} +int toku_ft_recount_rows( + FT_HANDLE ft, + int (*progress_callback)( + uint64_t count, + uint64_t deleted, + void* progress_extra), + void* progress_extra) { + + int ret = 0; + recount_rows_extra_t rre = { + progress_callback, + progress_extra, + 0, + false + }; + + ft_cursor c; + ret = toku_ft_cursor_create(ft, &c, nullptr, C_READ_ANY, false, false); + if (ret) return ret; + + toku_ft_cursor_set_check_interrupt_cb( + &c, + recount_rows_interrupt, + &rre); + + ret = toku_ft_cursor_first(&c, recount_rows_found, &rre); + while (FT_LIKELY(ret == 0)) { + ret = toku_ft_cursor_next(&c, recount_rows_found, &rre); + } + + toku_ft_cursor_destroy(&c); + + if (rre._cancelled == false) { + // update ft count + toku_unsafe_set(&ft->ft->in_memory_logical_rows, rre._keys); + ret = 0; + } + + return ret; +} diff --git a/storage/tokudb/PerconaFT/ft/ft-status.cc b/storage/tokudb/PerconaFT/ft/ft-status.cc index 982df1822c4..19a378c22bf 100644 --- a/storage/tokudb/PerconaFT/ft/ft-status.cc +++ b/storage/tokudb/PerconaFT/ft/ft-status.cc @@ -128,24 +128,24 @@ void CACHETABLE_STATUS_S::init() { CT_STATUS_INIT(CT_LONG_WAIT_PRESSURE_COUNT, CACHETABLE_LONG_WAIT_PRESSURE_COUNT, UINT64, "number of long waits on cache pressure"); CT_STATUS_INIT(CT_LONG_WAIT_PRESSURE_TIME, CACHETABLE_LONG_WAIT_PRESSURE_TIME, UINT64, "long time waiting on cache pressure"); - CT_STATUS_INIT(CT_POOL_CLIENT_NUM_THREADS, CACHETABLE_POOL_CLIENT_NUM_THREADS, UINT64, "number of threads in pool"); - CT_STATUS_INIT(CT_POOL_CLIENT_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CLIENT_NUM_THREADS_ACTIVE, UINT64, "number of currently active threads in pool"); - CT_STATUS_INIT(CT_POOL_CLIENT_QUEUE_SIZE, CACHETABLE_POOL_CLIENT_QUEUE_SIZE, UINT64, "number of currently queued work items"); - CT_STATUS_INIT(CT_POOL_CLIENT_MAX_QUEUE_SIZE, CACHETABLE_POOL_CLIENT_MAX_QUEUE_SIZE, UINT64, "largest number of queued work items"); - CT_STATUS_INIT(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CLIENT_TOTAL_ITEMS_PROCESSED, UINT64, "total number of work items processed"); - CT_STATUS_INIT(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CLIENT_TOTAL_EXECUTION_TIME, UINT64, "total execution time of processing work items"); - CT_STATUS_INIT(CT_POOL_CACHETABLE_NUM_THREADS, CACHETABLE_POOL_CACHETABLE_NUM_THREADS, UINT64, "number of threads in pool"); - CT_STATUS_INIT(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CACHETABLE_NUM_THREADS_ACTIVE, UINT64, "number of currently active threads in pool"); - CT_STATUS_INIT(CT_POOL_CACHETABLE_QUEUE_SIZE, CACHETABLE_POOL_CACHETABLE_QUEUE_SIZE, UINT64, "number of currently queued work items"); - CT_STATUS_INIT(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE, CACHETABLE_POOL_CACHETABLE_MAX_QUEUE_SIZE, UINT64, "largest number of queued work items"); - CT_STATUS_INIT(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED, UINT64, "total number of work items processed"); - CT_STATUS_INIT(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CACHETABLE_TOTAL_EXECUTION_TIME, UINT64, "total execution time of processing work items"); - CT_STATUS_INIT(CT_POOL_CHECKPOINT_NUM_THREADS, CACHETABLE_POOL_CHECKPOINT_NUM_THREADS, UINT64, "number of threads in pool"); - CT_STATUS_INIT(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CHECKPOINT_NUM_THREADS_ACTIVE, UINT64, "number of currently active threads in pool"); - CT_STATUS_INIT(CT_POOL_CHECKPOINT_QUEUE_SIZE, CACHETABLE_POOL_CHECKPOINT_QUEUE_SIZE, UINT64, "number of currently queued work items"); - CT_STATUS_INIT(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE, CACHETABLE_POOL_CHECKPOINT_MAX_QUEUE_SIZE, UINT64, "largest number of queued work items"); - CT_STATUS_INIT(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED, UINT64, "total number of work items processed"); - CT_STATUS_INIT(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME, UINT64, "total execution time of processing work items"); + CT_STATUS_INIT(CT_POOL_CLIENT_NUM_THREADS, CACHETABLE_POOL_CLIENT_NUM_THREADS, UINT64, "client pool: number of threads in pool"); + CT_STATUS_INIT(CT_POOL_CLIENT_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CLIENT_NUM_THREADS_ACTIVE, UINT64, "client pool: number of currently active threads in pool"); + CT_STATUS_INIT(CT_POOL_CLIENT_QUEUE_SIZE, CACHETABLE_POOL_CLIENT_QUEUE_SIZE, UINT64, "client pool: number of currently queued work items"); + CT_STATUS_INIT(CT_POOL_CLIENT_MAX_QUEUE_SIZE, CACHETABLE_POOL_CLIENT_MAX_QUEUE_SIZE, UINT64, "client pool: largest number of queued work items"); + CT_STATUS_INIT(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CLIENT_TOTAL_ITEMS_PROCESSED, UINT64, "client pool: total number of work items processed"); + CT_STATUS_INIT(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CLIENT_TOTAL_EXECUTION_TIME, UINT64, "client pool: total execution time of processing work items"); + CT_STATUS_INIT(CT_POOL_CACHETABLE_NUM_THREADS, CACHETABLE_POOL_CACHETABLE_NUM_THREADS, UINT64, "cachetable pool: number of threads in pool"); + CT_STATUS_INIT(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CACHETABLE_NUM_THREADS_ACTIVE, UINT64, "cachetable pool: number of currently active threads in pool"); + CT_STATUS_INIT(CT_POOL_CACHETABLE_QUEUE_SIZE, CACHETABLE_POOL_CACHETABLE_QUEUE_SIZE, UINT64, "cachetable pool: number of currently queued work items"); + CT_STATUS_INIT(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE, CACHETABLE_POOL_CACHETABLE_MAX_QUEUE_SIZE, UINT64, "cachetable pool: largest number of queued work items"); + CT_STATUS_INIT(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED, UINT64, "cachetable pool: total number of work items processed"); + CT_STATUS_INIT(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CACHETABLE_TOTAL_EXECUTION_TIME, UINT64, "cachetable pool: total execution time of processing work items"); + CT_STATUS_INIT(CT_POOL_CHECKPOINT_NUM_THREADS, CACHETABLE_POOL_CHECKPOINT_NUM_THREADS, UINT64, "checkpoint pool: number of threads in pool"); + CT_STATUS_INIT(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CHECKPOINT_NUM_THREADS_ACTIVE, UINT64, "checkpoint pool: number of currently active threads in pool"); + CT_STATUS_INIT(CT_POOL_CHECKPOINT_QUEUE_SIZE, CACHETABLE_POOL_CHECKPOINT_QUEUE_SIZE, UINT64, "checkpoint pool: number of currently queued work items"); + CT_STATUS_INIT(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE, CACHETABLE_POOL_CHECKPOINT_MAX_QUEUE_SIZE, UINT64, "checkpoint pool: largest number of queued work items"); + CT_STATUS_INIT(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED, UINT64, "checkpoint pool: total number of work items processed"); + CT_STATUS_INIT(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME, UINT64, "checkpoint pool: total execution time of processing work items"); m_initialized = true; #undef CT_STATUS_INIT diff --git a/storage/tokudb/PerconaFT/ft/ft-test-helpers.cc b/storage/tokudb/PerconaFT/ft/ft-test-helpers.cc index 7ca36c23780..6fcdbbdc9e3 100644 --- a/storage/tokudb/PerconaFT/ft/ft-test-helpers.cc +++ b/storage/tokudb/PerconaFT/ft/ft-test-helpers.cc @@ -172,21 +172,26 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE ft_handle, BLOCKNUM blocknum, const assert(node->height==0); DBT kdbt, vdbt; - ft_msg msg(toku_fill_dbt(&kdbt, key, keylen), toku_fill_dbt(&vdbt, val, vallen), - FT_INSERT, next_dummymsn(), toku_xids_get_root_xids()); + ft_msg msg( + toku_fill_dbt(&kdbt, key, keylen), + toku_fill_dbt(&vdbt, val, vallen), + FT_INSERT, + next_dummymsn(), + toku_xids_get_root_xids()); static size_t zero_flow_deltas[] = { 0, 0 }; txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true); - toku_ftnode_put_msg(ft_handle->ft->cmp, - ft_handle->ft->update_fun, - node, - -1, - msg, - true, - &gc_info, - zero_flow_deltas, - NULL - ); + toku_ftnode_put_msg( + ft_handle->ft->cmp, + ft_handle->ft->update_fun, + node, + -1, + msg, + true, + &gc_info, + zero_flow_deltas, + NULL, + NULL); toku_verify_or_set_counts(node); diff --git a/storage/tokudb/PerconaFT/ft/ft.cc b/storage/tokudb/PerconaFT/ft/ft.cc index 2a0fb6f6800..93d21233bf7 100644 --- a/storage/tokudb/PerconaFT/ft/ft.cc +++ b/storage/tokudb/PerconaFT/ft/ft.cc @@ -198,6 +198,8 @@ static void ft_checkpoint (CACHEFILE cf, int fd, void *header_v) { ch->time_of_last_modification = now; ch->checkpoint_count++; ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft); + ch->on_disk_logical_rows = + ft->h->on_disk_logical_rows = ft->in_memory_logical_rows; // write translation and header to disk (or at least to OS internal buffer) toku_serialize_ft_to(fd, ch, &ft->blocktable, ft->cf); @@ -383,7 +385,8 @@ ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that .count_of_optimize_in_progress = 0, .count_of_optimize_in_progress_read_from_disk = 0, .msn_at_start_of_last_completed_optimize = ZERO_MSN, - .on_disk_stats = ZEROSTATS + .on_disk_stats = ZEROSTATS, + .on_disk_logical_rows = 0 }; return (FT_HEADER) toku_xmemdup(&h, sizeof h); } @@ -802,7 +805,14 @@ toku_ft_stat64 (FT ft, struct ftstat64_s *s) { s->fsize = toku_cachefile_size(ft->cf); // just use the in memory stats from the header // prevent appearance of negative numbers for numrows, numbytes - int64_t n = ft->in_memory_stats.numrows; + // if the logical count was never properly re-counted on an upgrade, + // return the existing physical count instead. + int64_t n; + if (ft->in_memory_logical_rows == (uint64_t)-1) { + n = ft->in_memory_stats.numrows; + } else { + n = ft->in_memory_logical_rows; + } if (n < 0) { n = 0; } @@ -871,20 +881,38 @@ DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) { return &ft_handle->ft->cmp_descriptor; } -void -toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) { +void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) { (void) toku_sync_fetch_and_add(&(headerstats->numrows), delta.numrows); (void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes); } -void -toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) { +void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) { (void) toku_sync_fetch_and_sub(&(headerstats->numrows), delta.numrows); (void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes); } -void -toku_ft_remove_reference(FT ft, bool oplsn_valid, LSN oplsn, remove_ft_ref_callback remove_ref, void *extra) { +void toku_ft_adjust_logical_row_count(FT ft, int64_t delta) { + // In order to make sure that the correct count is returned from + // toku_ft_stat64, the ft->(in_memory|on_disk)_logical_rows _MUST_NOT_ be + // modified from anywhere else from here with the exceptions of + // serializing in a header, initializing a new header and analyzing + // an index for a logical_row count. + // The gist is that on an index upgrade, all logical_rows values + // in the ft header are set to -1 until an analyze can reset it to an + // accurate value. Until then, the physical count from in_memory_stats + // must be returned in toku_ft_stat64. + if (delta != 0 && ft->in_memory_logical_rows != (uint64_t)-1) { + toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), delta); + } +} + +void toku_ft_remove_reference( + FT ft, + bool oplsn_valid, + LSN oplsn, + remove_ft_ref_callback remove_ref, + void *extra) { + toku_ft_grab_reflock(ft); if (toku_ft_has_one_reference_unlocked(ft)) { toku_ft_release_reflock(ft); diff --git a/storage/tokudb/PerconaFT/ft/ft.h b/storage/tokudb/PerconaFT/ft/ft.h index cc64bdfc6d3..d600e093bdc 100644 --- a/storage/tokudb/PerconaFT/ft/ft.h +++ b/storage/tokudb/PerconaFT/ft/ft.h @@ -127,13 +127,17 @@ DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle); typedef struct { // delta versions in basements could be negative + // These represent the physical leaf entries and do not account + // for pending deletes or other in-flight messages that have not been + // applied to a leaf entry. int64_t numrows; int64_t numbytes; } STAT64INFO_S, *STAT64INFO; -static const STAT64INFO_S ZEROSTATS = { .numrows = 0, .numbytes = 0}; +static const STAT64INFO_S ZEROSTATS = { .numrows = 0, .numbytes = 0 }; void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta); void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta); +void toku_ft_adjust_logical_row_count(FT ft, int64_t delta); typedef void (*remove_ft_ref_callback)(FT ft, void *extra); void toku_ft_remove_reference(FT ft, diff --git a/storage/tokudb/PerconaFT/ft/leafentry.h b/storage/tokudb/PerconaFT/ft/leafentry.h index 9cb81ef7cd6..7274a1480e2 100644 --- a/storage/tokudb/PerconaFT/ft/leafentry.h +++ b/storage/tokudb/PerconaFT/ft/leafentry.h @@ -180,43 +180,57 @@ uint64_t le_outermost_uncommitted_xid (LEAFENTRY le); // r|r!=0&&r!=TOKUDB_ACCEPT: Quit early, return r, because something unexpected went wrong (error case) typedef int(*LE_ITERATE_CALLBACK)(TXNID id, TOKUTXN context, bool is_provisional); -int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context); - -void le_extract_val(LEAFENTRY le, - // should we return the entire leafentry as the val? - bool is_leaf_mode, enum cursor_read_type read_type, - TOKUTXN ttxn, uint32_t *vallen, void **val); - -size_t -leafentry_disksize_13(LEAFENTRY_13 le); - -int -toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored data. - void** keyp, - uint32_t* keylen, - size_t *new_leafentry_memorysize, - LEAFENTRY *new_leafentry_p); +int le_iterate_val( + LEAFENTRY le, + LE_ITERATE_CALLBACK f, + void** valpp, + uint32_t* vallenp, + TOKUTXN context); + +void le_extract_val( + LEAFENTRY le, + // should we return the entire leafentry as the val? + bool is_leaf_mode, + enum cursor_read_type read_type, + TOKUTXN ttxn, + uint32_t* vallen, + void** val); + +size_t leafentry_disksize_13(LEAFENTRY_13 le); + +int toku_le_upgrade_13_14( + // NULL if there was no stored data. + LEAFENTRY_13 old_leafentry, + void** keyp, + uint32_t* keylen, + size_t* new_leafentry_memorysize, + LEAFENTRY *new_leafentry_p); class bn_data; -void -toku_le_apply_msg(const ft_msg &msg, - LEAFENTRY old_leafentry, // NULL if there was no stored data. - bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data - uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced - uint32_t old_keylen, - txn_gc_info *gc_info, - LEAFENTRY *new_leafentry_p, - int64_t * numbytes_delta_p); - -bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info); - -void -toku_le_garbage_collect(LEAFENTRY old_leaf_entry, - bn_data* data_buffer, - uint32_t idx, - void* keyp, - uint32_t keylen, - txn_gc_info *gc_info, - LEAFENTRY *new_leaf_entry, - int64_t * numbytes_delta_p); +int64_t toku_le_apply_msg( + const ft_msg &msg, + // NULL if there was no stored data. + LEAFENTRY old_leafentry, + // bn_data storing leafentry, if NULL, means there is no bn_data + bn_data* data_buffer, + // index in data_buffer where leafentry is stored (and should be replaced + uint32_t idx, + uint32_t old_keylen, + txn_gc_info* gc_info, + LEAFENTRY *new_leafentry_p, + int64_t* numbytes_delta_p); + +bool toku_le_worth_running_garbage_collection( + LEAFENTRY le, + txn_gc_info* gc_info); + +void toku_le_garbage_collect( + LEAFENTRY old_leaf_entry, + bn_data* data_buffer, + uint32_t idx, + void* keyp, + uint32_t keylen, + txn_gc_info* gc_info, + LEAFENTRY* new_leaf_entry, + int64_t* numbytes_delta_p); diff --git a/storage/tokudb/PerconaFT/ft/loader/loader.cc b/storage/tokudb/PerconaFT/ft/loader/loader.cc index 5ff0d69af46..20f9363da1e 100644 --- a/storage/tokudb/PerconaFT/ft/loader/loader.cc +++ b/storage/tokudb/PerconaFT/ft/loader/loader.cc @@ -2312,11 +2312,42 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc) return lbuf; } -static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, FTLOADER bl, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method); -static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method); -static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen, int this_leafentry_size, STAT64INFO stats_to_update); -static int write_translation_table (struct dbout *out, long long *off_of_translation_p); -static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk); +static void finish_leafnode( + struct dbout* out, + struct leaf_buf* lbuf, + int progress_allocation, + FTLOADER bl, + uint32_t target_basementnodesize, + enum toku_compression_method target_compression_method); + +static int write_nonleaves( + FTLOADER bl, + FIDX pivots_fidx, + struct dbout* out, + struct subtrees_info* sts, + const DESCRIPTOR descriptor, + uint32_t target_nodesize, + uint32_t target_basementnodesize, + enum toku_compression_method target_compression_method); + +static void add_pair_to_leafnode( + struct leaf_buf* lbuf, + unsigned char* key, + int keylen, + unsigned char* val, + int vallen, + int this_leafentry_size, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta); + +static int write_translation_table( + struct dbout* out, + long long* off_of_translation_p); + +static int write_header( + struct dbout* out, + long long translation_location_on_disk, + long long translation_size_on_disk); static void drain_writer_q(QUEUE q) { void *item; @@ -2448,6 +2479,12 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, DBT maxkey = make_dbt(0, 0); // keep track of the max key of the current node STAT64INFO_S deltas = ZEROSTATS; + // This is just a placeholder and not used in the loader, the real/accurate + // stats will come out of 'deltas' because this loader is not pushing + // messages down into the top of a fractal tree where the logical row count + // is done, it is directly creating leaf entries so it must also take on + // performing the logical row counting on its own + int64_t logical_rows_delta = 0; while (result == 0) { void *item; { @@ -2506,7 +2543,15 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize); } - add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size, this_leafentry_size, &deltas); + add_pair_to_leafnode( + lbuf, + (unsigned char*)key.data, + key.size, + (unsigned char*)val.data, + val.size, + this_leafentry_size, + &deltas, + &logical_rows_delta); n_rows_remaining--; update_maxkey(&maxkey, &key); // set the new maxkey to the current key @@ -2526,6 +2571,13 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, toku_ft_update_stats(&ft.in_memory_stats, deltas); } + // As noted above, the loader directly creates a tree structure without + // going through the higher level ft API and tus bypasses the logical row + // counting performed at that level. So, we must manually update the logical + // row count with the info we have from the physical delta that comes out of + // add_pair_to_leafnode. + toku_ft_adjust_logical_row_count(&ft, deltas.numrows); + cleanup_maxkey(&maxkey); if (lbuf) { @@ -2878,7 +2930,16 @@ int toku_ft_loader_get_error(FTLOADER bl, int *error) { return 0; } -static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen, int this_leafentry_size, STAT64INFO stats_to_update) { +static void add_pair_to_leafnode( + struct leaf_buf* lbuf, + unsigned char* key, + int keylen, + unsigned char* val, + int vallen, + int this_leafentry_size, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta) { + lbuf->nkeys++; lbuf->ndata++; lbuf->dsize += keylen + vallen; @@ -2890,11 +2951,25 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int FTNODE leafnode = lbuf->node; uint32_t idx = BLB_DATA(leafnode, 0)->num_klpairs(); DBT kdbt, vdbt; - ft_msg msg(toku_fill_dbt(&kdbt, key, keylen), toku_fill_dbt(&vdbt, val, vallen), FT_INSERT, ZERO_MSN, lbuf->xids); + ft_msg msg( + toku_fill_dbt(&kdbt, key, keylen), + toku_fill_dbt(&vdbt, val, vallen), + FT_INSERT, + ZERO_MSN, + lbuf->xids); uint64_t workdone = 0; // there's no mvcc garbage in a bulk-loaded FT, so there's no need to pass useful gc info txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true); - toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, &workdone, stats_to_update); + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + &workdone, + stats_to_update, + logical_rows_delta); } static int write_literal(struct dbout *out, void*data, size_t len) { @@ -2905,7 +2980,14 @@ static int write_literal(struct dbout *out, void*data, size_t len) { return result; } -static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, FTLOADER bl, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) { +static void finish_leafnode( + struct dbout* out, + struct leaf_buf* lbuf, + int progress_allocation, + FTLOADER bl, + uint32_t target_basementnodesize, + enum toku_compression_method target_compression_method) { + int result = 0; // serialize leaf to buffer @@ -2913,7 +2995,16 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr size_t uncompressed_serialized_leaf_size = 0; char *serialized_leaf = NULL; FTNODE_DISK_DATA ndd = NULL; - result = toku_serialize_ftnode_to_memory(lbuf->node, &ndd, target_basementnodesize, target_compression_method, true, true, &serialized_leaf_size, &uncompressed_serialized_leaf_size, &serialized_leaf); + result = toku_serialize_ftnode_to_memory( + lbuf->node, + &ndd, + target_basementnodesize, + target_compression_method, + true, + true, + &serialized_leaf_size, + &uncompressed_serialized_leaf_size, + &serialized_leaf); // write it out if (result == 0) { @@ -2979,8 +3070,11 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla return result; } -static int -write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk) { +static int write_header( + struct dbout* out, + long long translation_location_on_disk, + long long translation_size_on_disk) { + int result = 0; size_t size = toku_serialize_ft_size(out->ft->h); size_t alloced_size = roundup_to_multiple(512, size); @@ -2991,6 +3085,7 @@ write_header (struct dbout *out, long long translation_location_on_disk, long lo } else { wbuf_init(&wbuf, buf, size); out->ft->h->on_disk_stats = out->ft->in_memory_stats; + out->ft->h->on_disk_logical_rows = out->ft->in_memory_logical_rows; toku_serialize_ft_to_wbuf(&wbuf, out->ft->h, translation_location_on_disk, translation_size_on_disk); for (size_t i=size; i<alloced_size; i++) buf[i]=0; // initialize all those unused spots to zero if (wbuf.ndone != size) diff --git a/storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc b/storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc index 6dca8c25378..efaba49198d 100644 --- a/storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc +++ b/storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc @@ -265,8 +265,9 @@ toku_maybe_upgrade_log(const char *env_dir, const char *log_dir, LSN * lsn_of_cl TXNID last_xid = TXNID_NONE; r = verify_clean_shutdown_of_log_version(log_dir, version_of_logs_on_disk, &last_lsn, &last_xid); if (r != 0) { - if (TOKU_LOG_VERSION_25 <= version_of_logs_on_disk && version_of_logs_on_disk <= TOKU_LOG_VERSION_27 - && TOKU_LOG_VERSION_28 == TOKU_LOG_VERSION) { + if (version_of_logs_on_disk >= TOKU_LOG_VERSION_25 && + version_of_logs_on_disk <= TOKU_LOG_VERSION_29 && + TOKU_LOG_VERSION_29 == TOKU_LOG_VERSION) { r = 0; // can do recovery on dirty shutdown } else { fprintf(stderr, "Cannot upgrade PerconaFT version %d database.", version_of_logs_on_disk); diff --git a/storage/tokudb/PerconaFT/ft/logger/logger.h b/storage/tokudb/PerconaFT/ft/logger/logger.h index 1f15f59fb3f..d9595d71065 100644 --- a/storage/tokudb/PerconaFT/ft/logger/logger.h +++ b/storage/tokudb/PerconaFT/ft/logger/logger.h @@ -54,6 +54,7 @@ enum { TOKU_LOG_VERSION_26 = 26, // no change from 25 TOKU_LOG_VERSION_27 = 27, // no change from 26 TOKU_LOG_VERSION_28 = 28, // no change from 27 + TOKU_LOG_VERSION_29 = 29, // no change from 28 TOKU_LOG_VERSION = FT_LAYOUT_VERSION, TOKU_LOG_MIN_SUPPORTED_VERSION = FT_LAYOUT_MIN_SUPPORTED_VERSION, }; diff --git a/storage/tokudb/PerconaFT/ft/logger/recover.h b/storage/tokudb/PerconaFT/ft/logger/recover.h index 0d216c11a8b..bdd44d562cd 100644 --- a/storage/tokudb/PerconaFT/ft/logger/recover.h +++ b/storage/tokudb/PerconaFT/ft/logger/recover.h @@ -67,7 +67,7 @@ int tokuft_recover(DB_ENV *env, // Effect: Check the tokuft logs to determine whether or not we need to run recovery. // If the log is empty or if there is a clean shutdown at the end of the log, then we -// dont need to run recovery. +// don't need to run recovery. // Returns: true if we need recovery, otherwise false. int tokuft_needs_recovery(const char *logdir, bool ignore_empty_log); diff --git a/storage/tokudb/PerconaFT/ft/node.cc b/storage/tokudb/PerconaFT/ft/node.cc index 44dbc73ba2b..58ba675eb7c 100644 --- a/storage/tokudb/PerconaFT/ft/node.cc +++ b/storage/tokudb/PerconaFT/ft/node.cc @@ -206,12 +206,20 @@ int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, con } /** - * Given a message buffer and and offset, apply the message with toku_ft_bn_apply_msg, or discard it, + * Given a message buffer and and offset, apply the message with + * toku_ft_bn_apply_msg, or discard it, * based on its MSN and the MSN of the basement node. */ -static void -do_bn_apply_msg(FT_HANDLE ft_handle, BASEMENTNODE bn, message_buffer *msg_buffer, int32_t offset, - txn_gc_info *gc_info, uint64_t *workdone, STAT64INFO stats_to_update) { +static void do_bn_apply_msg( + FT_HANDLE ft_handle, + BASEMENTNODE bn, + message_buffer* msg_buffer, + int32_t offset, + txn_gc_info* gc_info, + uint64_t* workdone, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta) { + DBT k, v; ft_msg msg = msg_buffer->get_message(offset, &k, &v); @@ -227,16 +235,17 @@ do_bn_apply_msg(FT_HANDLE ft_handle, BASEMENTNODE bn, message_buffer *msg_buffer msg, gc_info, workdone, - stats_to_update - ); + stats_to_update, + logical_rows_delta); } else { toku_ft_status_note_msn_discard(); } // We must always mark message as stale since it has been marked // (using omt::iterate_and_mark_range) - // It is possible to call do_bn_apply_msg even when it won't apply the message because - // the node containing it could have been evicted and brought back in. + // It is possible to call do_bn_apply_msg even when it won't apply the + // message because the node containing it could have been evicted and + // brought back in. msg_buffer->set_freshness(offset, false); } @@ -248,12 +257,29 @@ struct iterate_do_bn_apply_msg_extra { txn_gc_info *gc_info; uint64_t *workdone; STAT64INFO stats_to_update; + int64_t *logical_rows_delta; }; -int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_msg_extra *const e) __attribute__((nonnull(3))); -int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_msg_extra *const e) +int iterate_do_bn_apply_msg( + const int32_t &offset, + const uint32_t UU(idx), + struct iterate_do_bn_apply_msg_extra* const e) + __attribute__((nonnull(3))); + +int iterate_do_bn_apply_msg( + const int32_t &offset, + const uint32_t UU(idx), + struct iterate_do_bn_apply_msg_extra* const e) { - do_bn_apply_msg(e->t, e->bn, &e->bnc->msg_buffer, offset, e->gc_info, e->workdone, e->stats_to_update); + do_bn_apply_msg( + e->t, + e->bn, + &e->bnc->msg_buffer, + offset, + e->gc_info, + e->workdone, + e->stats_to_update, + e->logical_rows_delta); return 0; } @@ -354,17 +380,15 @@ find_bounds_within_message_tree( * or plus infinity respectively if they are NULL. Do not mark the node * as dirty (preserve previous state of 'dirty' bit). */ -static void -bnc_apply_messages_to_basement_node( +static void bnc_apply_messages_to_basement_node( FT_HANDLE t, // used for comparison function BASEMENTNODE bn, // where to apply messages FTNODE ancestor, // the ancestor node where we can find messages to apply int childnum, // which child buffer of ancestor contains messages we want const pivot_bounds &bounds, // contains pivot key bounds of this basement node - txn_gc_info *gc_info, - bool* msgs_applied - ) -{ + txn_gc_info* gc_info, + bool* msgs_applied) { + int r; NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum); @@ -372,16 +396,29 @@ bnc_apply_messages_to_basement_node( // apply messages from this buffer STAT64INFO_S stats_delta = {0,0}; uint64_t workdone_this_ancestor = 0; + int64_t logical_rows_delta = 0; uint32_t stale_lbi, stale_ube; if (!bn->stale_ancestor_messages_applied) { - find_bounds_within_message_tree(t->ft->cmp, bnc->stale_message_tree, &bnc->msg_buffer, bounds, &stale_lbi, &stale_ube); + find_bounds_within_message_tree( + t->ft->cmp, + bnc->stale_message_tree, + &bnc->msg_buffer, + bounds, + &stale_lbi, + &stale_ube); } else { stale_lbi = 0; stale_ube = 0; } uint32_t fresh_lbi, fresh_ube; - find_bounds_within_message_tree(t->ft->cmp, bnc->fresh_message_tree, &bnc->msg_buffer, bounds, &fresh_lbi, &fresh_ube); + find_bounds_within_message_tree( + t->ft->cmp, + bnc->fresh_message_tree, + &bnc->msg_buffer, + bounds, + &fresh_lbi, + &fresh_ube); // We now know where all the messages we must apply are, so one of the // following 4 cases will do the application, depending on which of @@ -395,7 +432,9 @@ bnc_apply_messages_to_basement_node( // We have messages in multiple trees, so we grab all // the relevant messages' offsets and sort them by MSN, then apply // them in MSN order. - const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + bnc->broadcast_list.size()); + const int buffer_size = ((stale_ube - stale_lbi) + + (fresh_ube - fresh_lbi) + + bnc->broadcast_list.size()); toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t)); int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get()); struct store_msg_buffer_offset_extra sfo_extra = { .offsets = offsets, .i = 0 }; @@ -419,11 +458,27 @@ bnc_apply_messages_to_basement_node( // Apply the messages in MSN order. for (int i = 0; i < buffer_size; ++i) { *msgs_applied = true; - do_bn_apply_msg(t, bn, &bnc->msg_buffer, offsets[i], gc_info, &workdone_this_ancestor, &stats_delta); + do_bn_apply_msg( + t, + bn, + &bnc->msg_buffer, + offsets[i], + gc_info, + &workdone_this_ancestor, + &stats_delta, + &logical_rows_delta); } } else if (stale_lbi == stale_ube) { // No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later. - struct iterate_do_bn_apply_msg_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .gc_info = gc_info, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta }; + struct iterate_do_bn_apply_msg_extra iter_extra = { + .t = t, + .bn = bn, + .bnc = bnc, + .gc_info = gc_info, + .workdone = &workdone_this_ancestor, + .stats_to_update = &stats_delta, + .logical_rows_delta = &logical_rows_delta + }; if (fresh_ube - fresh_lbi > 0) *msgs_applied = true; r = bnc->fresh_message_tree.iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra, iterate_do_bn_apply_msg>(fresh_lbi, fresh_ube, &iter_extra); assert_zero(r); @@ -432,7 +487,15 @@ bnc_apply_messages_to_basement_node( // No fresh messages to apply, we just apply stale messages. if (stale_ube - stale_lbi > 0) *msgs_applied = true; - struct iterate_do_bn_apply_msg_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .gc_info = gc_info, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta }; + struct iterate_do_bn_apply_msg_extra iter_extra = { + .t = t, + .bn = bn, + .bnc = bnc, + .gc_info = gc_info, + .workdone = &workdone_this_ancestor, + .stats_to_update = &stats_delta, + .logical_rows_delta = &logical_rows_delta + }; r = bnc->stale_message_tree.iterate_on_range<struct iterate_do_bn_apply_msg_extra, iterate_do_bn_apply_msg>(stale_lbi, stale_ube, &iter_extra); assert_zero(r); @@ -446,6 +509,7 @@ bnc_apply_messages_to_basement_node( if (stats_delta.numbytes || stats_delta.numrows) { toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta); } + toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta); } static void @@ -1073,7 +1137,8 @@ toku_ft_bn_apply_msg_once ( LEAFENTRY le, txn_gc_info *gc_info, uint64_t *workdone, - STAT64INFO stats_to_update + STAT64INFO stats_to_update, + int64_t *logical_rows_delta ) // Effect: Apply msg to leafentry (msn is ignored) // Calculate work done by message on leafentry and add it to caller's workdone counter. @@ -1082,26 +1147,34 @@ toku_ft_bn_apply_msg_once ( { size_t newsize=0, oldsize=0, workdone_this_le=0; LEAFENTRY new_le=0; - int64_t numbytes_delta = 0; // how many bytes of user data (not including overhead) were added or deleted from this row - int64_t numrows_delta = 0; // will be +1 or -1 or 0 (if row was added or deleted or not) + // how many bytes of user data (not including overhead) were added or + // deleted from this row + int64_t numbytes_delta = 0; + // will be +1 or -1 or 0 (if row was added or deleted or not) + int64_t numrows_delta = 0; + // will be +1, -1 or 0 if a message that was accounted for logically has + // changed in meaning such as an insert changed to an update or a delete + // changed to a noop + int64_t logical_rows_delta_le = 0; uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t); if (le) { oldsize = leafentry_memsize(le) + key_storage_size; } - // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt() to allocate more space. - // That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is - // no longer in use. We'll have to release the old mempool later. - toku_le_apply_msg( - msg, + // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt() + // to allocate more space. That means le is guaranteed to not cause a + // sigsegv but it may point to a mempool that is no longer in use. + // We'll have to release the old mempool later. + logical_rows_delta_le = toku_le_apply_msg( + msg, le, &bn->data_buffer, idx, le_keylen, - gc_info, - &new_le, - &numbytes_delta - ); + gc_info, + &new_le, + &numbytes_delta); + // at this point, we cannot trust cmd->u.id.key to be valid. // The dmt may have realloced its mempool and freed the one containing key. @@ -1121,37 +1194,42 @@ toku_ft_bn_apply_msg_once ( numrows_delta = 1; } } - if (workdone) { // test programs may call with NULL + if (FT_LIKELY(workdone != NULL)) { // test programs may call with NULL *workdone += workdone_this_le; } + if (FT_LIKELY(logical_rows_delta != NULL)) { + *logical_rows_delta += logical_rows_delta_le; + } // now update stat64 statistics bn->stat64_delta.numrows += numrows_delta; bn->stat64_delta.numbytes += numbytes_delta; // the only reason stats_to_update may be null is for tests - if (stats_to_update) { + if (FT_LIKELY(stats_to_update != NULL)) { stats_to_update->numrows += numrows_delta; stats_to_update->numbytes += numbytes_delta; } - } static const uint32_t setval_tag = 0xee0ccb99; // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number. We want to make sure that the user actually passes us the setval_extra_s that we passed in. struct setval_extra_s { uint32_t tag; bool did_set_val; - int setval_r; // any error code that setval_fun wants to return goes here. + // any error code that setval_fun wants to return goes here. + int setval_r; // need arguments for toku_ft_bn_apply_msg_once BASEMENTNODE bn; - MSN msn; // captured from original message, not currently used + // captured from original message, not currently used + MSN msn; XIDS xids; - const DBT *key; + const DBT* key; uint32_t idx; uint32_t le_keylen; LEAFENTRY le; - txn_gc_info *gc_info; - uint64_t * workdone; // set by toku_ft_bn_apply_msg_once() + txn_gc_info* gc_info; + uint64_t* workdone; // set by toku_ft_bn_apply_msg_once() STAT64INFO stats_to_update; + int64_t* logical_rows_delta; }; /* @@ -1170,29 +1248,45 @@ static void setval_fun (const DBT *new_val, void *svextra_v) { // can't leave scope until toku_ft_bn_apply_msg_once if // this is a delete DBT val; - ft_msg msg(svextra->key, - new_val ? new_val : toku_init_dbt(&val), - new_val ? FT_INSERT : FT_DELETE_ANY, - svextra->msn, svextra->xids); - toku_ft_bn_apply_msg_once(svextra->bn, msg, - svextra->idx, svextra->le_keylen, svextra->le, - svextra->gc_info, - svextra->workdone, svextra->stats_to_update); + ft_msg msg( + svextra->key, + new_val ? new_val : toku_init_dbt(&val), + new_val ? FT_INSERT : FT_DELETE_ANY, + svextra->msn, + svextra->xids); + toku_ft_bn_apply_msg_once( + svextra->bn, + msg, + svextra->idx, + svextra->le_keylen, + svextra->le, + svextra->gc_info, + svextra->workdone, + svextra->stats_to_update, + svextra->logical_rows_delta); svextra->setval_r = 0; } } -// We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls do_update()), -// so capturing the msn in the setval_extra_s is not strictly required. The alternative -// would be to put a dummy msn in the messages created by setval_fun(), but preserving -// the original msn seems cleaner and it preserves accountability at a lower layer. -static int do_update(ft_update_func update_fun, const DESCRIPTOR_S *desc, BASEMENTNODE bn, const ft_msg &msg, uint32_t idx, - LEAFENTRY le, - void* keydata, - uint32_t keylen, - txn_gc_info *gc_info, - uint64_t * workdone, - STAT64INFO stats_to_update) { +// We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls +// do_update()), so capturing the msn in the setval_extra_s is not strictly +// required. The alternative would be to put a dummy msn in the messages +// created by setval_fun(), but preserving the original msn seems cleaner and +// it preserves accountability at a lower layer. +static int do_update( + ft_update_func update_fun, + const DESCRIPTOR_S* desc, + BASEMENTNODE bn, + const ft_msg &msg, + uint32_t idx, + LEAFENTRY le, + void* keydata, + uint32_t keylen, + txn_gc_info* gc_info, + uint64_t* workdone, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta) { + LEAFENTRY le_for_update; DBT key; const DBT *keyp; @@ -1232,39 +1326,52 @@ static int do_update(ft_update_func update_fun, const DESCRIPTOR_S *desc, BASEME } le_for_update = le; - struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, msg.msn(), msg.xids(), - keyp, idx, keylen, le_for_update, gc_info, - workdone, stats_to_update}; - // call handlerton's ft->update_fun(), which passes setval_extra to setval_fun() + struct setval_extra_s setval_extra = { + setval_tag, + false, + 0, + bn, + msg.msn(), + msg.xids(), + keyp, + idx, + keylen, + le_for_update, + gc_info, + workdone, + stats_to_update, + logical_rows_delta + }; + // call handlerton's ft->update_fun(), which passes setval_extra + // to setval_fun() FAKE_DB(db, desc); int r = update_fun( &db, keyp, vdbtp, update_function_extra, - setval_fun, &setval_extra - ); + setval_fun, + &setval_extra); if (r == 0) { r = setval_extra.setval_r; } return r; } // Should be renamed as something like "apply_msg_to_basement()." -void -toku_ft_bn_apply_msg ( - const toku::comparator &cmp, +void toku_ft_bn_apply_msg( + const toku::comparator& cmp, ft_update_func update_fun, BASEMENTNODE bn, - const ft_msg &msg, - txn_gc_info *gc_info, - uint64_t *workdone, - STAT64INFO stats_to_update - ) + const ft_msg& msg, + txn_gc_info* gc_info, + uint64_t* workdone, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta) { // Effect: // Put a msg into a leaf. -// Calculate work done by message on leafnode and add it to caller's workdone counter. +// Calculate work done by message on leafnode and add it to caller's +// workdone counter. // The leaf could end up "too big" or "too small". The caller must fix that up. -{ LEAFENTRY storeddata; void* key = NULL; uint32_t keylen = 0; @@ -1303,7 +1410,16 @@ toku_ft_bn_apply_msg ( } else { assert_zero(r); } - toku_ft_bn_apply_msg_once(bn, msg, idx, keylen, storeddata, gc_info, workdone, stats_to_update); + toku_ft_bn_apply_msg_once( + bn, + msg, + idx, + keylen, + storeddata, + gc_info, + workdone, + stats_to_update, + logical_rows_delta); // if the insertion point is within a window of the right edge of // the leaf then it is sequential @@ -1331,12 +1447,19 @@ toku_ft_bn_apply_msg ( &storeddata, &key, &keylen, - &idx - ); + &idx); if (r == DB_NOTFOUND) break; assert_zero(r); - toku_ft_bn_apply_msg_once(bn, msg, idx, keylen, storeddata, gc_info, workdone, stats_to_update); - + toku_ft_bn_apply_msg_once( + bn, + msg, + idx, + keylen, + storeddata, + gc_info, + workdone, + stats_to_update, + logical_rows_delta); break; } case FT_OPTIMIZE_FOR_UPGRADE: @@ -1352,13 +1475,27 @@ toku_ft_bn_apply_msg ( assert_zero(r); int deleted = 0; if (!le_is_clean(storeddata)) { //If already clean, nothing to do. - // message application code needs a key in order to determine how much - // work was done by this message. since this is a broadcast message, - // we have to create a new message whose key is the current le's key. + // message application code needs a key in order to determine + // how much work was done by this message. since this is a + // broadcast message, we have to create a new message whose + // key is the current le's key. DBT curr_keydbt; - ft_msg curr_msg(toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen), - msg.vdbt(), msg.type(), msg.msn(), msg.xids()); - toku_ft_bn_apply_msg_once(bn, curr_msg, idx, curr_keylen, storeddata, gc_info, workdone, stats_to_update); + ft_msg curr_msg( + toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen), + msg.vdbt(), + msg.type(), + msg.msn(), + msg.xids()); + toku_ft_bn_apply_msg_once( + bn, + curr_msg, + idx, + curr_keylen, + storeddata, + gc_info, + workdone, + stats_to_update, + logical_rows_delta); // at this point, we cannot trust msg.kdbt to be valid. uint32_t new_dmt_size = bn->data_buffer.num_klpairs(); if (new_dmt_size != num_klpairs) { @@ -1386,13 +1523,27 @@ toku_ft_bn_apply_msg ( assert_zero(r); int deleted = 0; if (le_has_xids(storeddata, msg.xids())) { - // message application code needs a key in order to determine how much - // work was done by this message. since this is a broadcast message, - // we have to create a new message whose key is the current le's key. + // message application code needs a key in order to determine + // how much work was done by this message. since this is a + // broadcast message, we have to create a new message whose key + // is the current le's key. DBT curr_keydbt; - ft_msg curr_msg(toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen), - msg.vdbt(), msg.type(), msg.msn(), msg.xids()); - toku_ft_bn_apply_msg_once(bn, curr_msg, idx, curr_keylen, storeddata, gc_info, workdone, stats_to_update); + ft_msg curr_msg( + toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen), + msg.vdbt(), + msg.type(), + msg.msn(), + msg.xids()); + toku_ft_bn_apply_msg_once( + bn, + curr_msg, + idx, + curr_keylen, + storeddata, + gc_info, + workdone, + stats_to_update, + logical_rows_delta); uint32_t new_dmt_size = bn->data_buffer.num_klpairs(); if (new_dmt_size != num_klpairs) { paranoid_invariant(new_dmt_size + 1 == num_klpairs); @@ -1424,9 +1575,33 @@ toku_ft_bn_apply_msg ( key = msg.kdbt()->data; keylen = msg.kdbt()->size; } - r = do_update(update_fun, cmp.get_descriptor(), bn, msg, idx, NULL, NULL, 0, gc_info, workdone, stats_to_update); + r = do_update( + update_fun, + cmp.get_descriptor(), + bn, + msg, + idx, + NULL, + NULL, + 0, + gc_info, + workdone, + stats_to_update, + logical_rows_delta); } else if (r==0) { - r = do_update(update_fun, cmp.get_descriptor(), bn, msg, idx, storeddata, key, keylen, gc_info, workdone, stats_to_update); + r = do_update( + update_fun, + cmp.get_descriptor(), + bn, + msg, + idx, + storeddata, + key, + keylen, + gc_info, + workdone, + stats_to_update, + logical_rows_delta); } // otherwise, a worse error, just return it break; } @@ -1434,6 +1609,12 @@ toku_ft_bn_apply_msg ( // apply to all leafentries. uint32_t idx = 0; uint32_t num_leafentries_before; + // This is used to avoid having the logical row count changed on apply + // of this message since it will return a negative number of the number + // of leaf entries visited and cause the ft header value to go to 0; + // This message will not change the number of rows, so just use the + // bogus value. + int64_t temp_logical_rows_delta = 0; while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) { void* curr_key = nullptr; uint32_t curr_keylen = 0; @@ -1449,7 +1630,19 @@ toku_ft_bn_apply_msg ( // This is broken below. Have a compilation error checked // in as a reminder - r = do_update(update_fun, cmp.get_descriptor(), bn, msg, idx, storeddata, curr_key, curr_keylen, gc_info, workdone, stats_to_update); + r = do_update( + update_fun, + cmp.get_descriptor(), + bn, + msg, + idx, + storeddata, + curr_key, + curr_keylen, + gc_info, + workdone, + stats_to_update, + &temp_logical_rows_delta); assert_zero(r); if (num_leafentries_before == bn->data_buffer.num_klpairs()) { @@ -1810,24 +2003,22 @@ void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) { } } -void -toku_ftnode_put_msg ( +void toku_ftnode_put_msg( const toku::comparator &cmp, ft_update_func update_fun, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, - txn_gc_info *gc_info, + txn_gc_info* gc_info, size_t flow_deltas[], - STAT64INFO stats_to_update - ) + STAT64INFO stats_to_update, + int64_t* logical_rows_delta) { // Effect: Push message into the subtree rooted at NODE. // If NODE is a leaf, then // put message into leaf, applying it to the leafentries // If NODE is a nonleaf, then push the message into the message buffer(s) of the relevent child(ren). // The node may become overfull. That's not our problem. -{ toku_ftnode_assert_fully_in_memory(node); // // see comments in toku_ft_leaf_apply_msg @@ -1836,26 +2027,40 @@ toku_ftnode_put_msg ( // and instead defer to these functions // if (node->height==0) { - toku_ft_leaf_apply_msg(cmp, update_fun, node, target_childnum, msg, gc_info, nullptr, stats_to_update); + toku_ft_leaf_apply_msg( + cmp, + update_fun, + node, + target_childnum, msg, + gc_info, + nullptr, + stats_to_update, + logical_rows_delta); } else { - ft_nonleaf_put_msg(cmp, node, target_childnum, msg, is_fresh, flow_deltas); + ft_nonleaf_put_msg( + cmp, + node, + target_childnum, + msg, + is_fresh, + flow_deltas); } } -// Effect: applies the message to the leaf if the appropriate basement node is in memory. -// This function is called during message injection and/or flushing, so the entire -// node MUST be in memory. +// Effect: applies the message to the leaf if the appropriate basement node is +// in memory. This function is called during message injection and/or +// flushing, so the entire node MUST be in memory. void toku_ft_leaf_apply_msg( - const toku::comparator &cmp, + const toku::comparator& cmp, ft_update_func update_fun, FTNODE node, int target_childnum, // which child to inject to, or -1 if unknown - const ft_msg &msg, - txn_gc_info *gc_info, - uint64_t *workdone, - STAT64INFO stats_to_update - ) -{ + const ft_msg& msg, + txn_gc_info* gc_info, + uint64_t* workdone, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta) { + VERIFY_NODE(t, node); toku_ftnode_assert_fully_in_memory(node); @@ -1891,34 +2096,36 @@ void toku_ft_leaf_apply_msg( BASEMENTNODE bn = BLB(node, childnum); if (msg.msn().msn > bn->max_msn_applied.msn) { bn->max_msn_applied = msg.msn(); - toku_ft_bn_apply_msg(cmp, - update_fun, - bn, - msg, - gc_info, - workdone, - stats_to_update); + toku_ft_bn_apply_msg( + cmp, + update_fun, + bn, + msg, + gc_info, + workdone, + stats_to_update, + logical_rows_delta); } else { toku_ft_status_note_msn_discard(); } - } - else if (ft_msg_type_applies_all(msg.type())) { + } else if (ft_msg_type_applies_all(msg.type())) { for (int childnum=0; childnum<node->n_children; childnum++) { if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) { BLB(node, childnum)->max_msn_applied = msg.msn(); - toku_ft_bn_apply_msg(cmp, - update_fun, - BLB(node, childnum), - msg, - gc_info, - workdone, - stats_to_update); + toku_ft_bn_apply_msg( + cmp, + update_fun, + BLB(node, childnum), + msg, + gc_info, + workdone, + stats_to_update, + logical_rows_delta); } else { toku_ft_status_note_msn_discard(); } } - } - else if (!ft_msg_type_does_nothing(msg.type())) { + } else if (!ft_msg_type_does_nothing(msg.type())) { invariant(ft_msg_type_does_nothing(msg.type())); } VERIFY_NODE(t, node); diff --git a/storage/tokudb/PerconaFT/ft/node.h b/storage/tokudb/PerconaFT/ft/node.h index 9d910491682..ad0298e81c5 100644 --- a/storage/tokudb/PerconaFT/ft/node.h +++ b/storage/tokudb/PerconaFT/ft/node.h @@ -382,25 +382,54 @@ enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize); * If k is equal to some pivot, then we return the next (to the right) * childnum. */ -int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp); - -void toku_ftnode_put_msg(const toku::comparator &cmp, ft_update_func update_fun, - FTNODE node, int target_childnum, - const ft_msg &msg, bool is_fresh, txn_gc_info *gc_info, - size_t flow_deltas[], STAT64INFO stats_to_update); - -void toku_ft_bn_apply_msg_once(BASEMENTNODE bn, const ft_msg &msg, uint32_t idx, - uint32_t le_keylen, LEAFENTRY le, txn_gc_info *gc_info, - uint64_t *workdonep, STAT64INFO stats_to_update); - -void toku_ft_bn_apply_msg(const toku::comparator &cmp, ft_update_func update_fun, - BASEMENTNODE bn, const ft_msg &msg, txn_gc_info *gc_info, - uint64_t *workdone, STAT64INFO stats_to_update); - -void toku_ft_leaf_apply_msg(const toku::comparator &cmp, ft_update_func update_fun, - FTNODE node, int target_childnum, - const ft_msg &msg, txn_gc_info *gc_info, - uint64_t *workdone, STAT64INFO stats_to_update); +int toku_ftnode_hot_next_child( + FTNODE node, + const DBT* k, + const toku::comparator &cmp); + +void toku_ftnode_put_msg( + const toku::comparator& cmp, + ft_update_func update_fun, + FTNODE node, + int target_childnum, + const ft_msg& msg, + bool is_fresh, + txn_gc_info* gc_info, + size_t flow_deltas[], + STAT64INFO stats_to_update, + int64_t* logical_rows_delta); + +void toku_ft_bn_apply_msg_once( + BASEMENTNODE bn, + const ft_msg& msg, + uint32_t idx, + uint32_t le_keylen, + LEAFENTRY le, + txn_gc_info* gc_info, + uint64_t* workdonep, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta); + +void toku_ft_bn_apply_msg( + const toku::comparator& cmp, + ft_update_func update_fun, + BASEMENTNODE bn, + const ft_msg& msg, + txn_gc_info* gc_info, + uint64_t* workdone, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta); + +void toku_ft_leaf_apply_msg( + const toku::comparator& cmp, + ft_update_func update_fun, + FTNODE node, + int target_childnum, + const ft_msg& msg, + txn_gc_info* gc_info, + uint64_t* workdone, + STAT64INFO stats_to_update, + int64_t* logical_rows_delta); // // Message management for orthopush diff --git a/storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc b/storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc index a7bc2949276..49d4368a3ab 100644 --- a/storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc +++ b/storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc @@ -323,6 +323,13 @@ int deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) fanout = rbuf_int(rb); } + uint64_t on_disk_logical_rows; + on_disk_logical_rows = (uint64_t)-1; + if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_29) { + on_disk_logical_rows = rbuf_ulonglong(rb); + } + ft->in_memory_logical_rows = on_disk_logical_rows; + (void) rbuf_int(rb); //Read in checksum and ignore (already verified). if (rb->ndone != rb->size) { fprintf(stderr, "Header size did not match contents.\n"); @@ -357,7 +364,8 @@ int deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) .count_of_optimize_in_progress = count_of_optimize_in_progress, .count_of_optimize_in_progress_read_from_disk = count_of_optimize_in_progress, .msn_at_start_of_last_completed_optimize = msn_at_start_of_last_completed_optimize, - .on_disk_stats = on_disk_stats + .on_disk_stats = on_disk_stats, + .on_disk_logical_rows = on_disk_logical_rows }; XMEMDUP(ft->h, &h); } @@ -408,6 +416,8 @@ serialize_ft_min_size (uint32_t version) { size_t size = 0; switch(version) { + case FT_LAYOUT_VERSION_29: + size += sizeof(uint64_t); // logrows in ft case FT_LAYOUT_VERSION_28: size += sizeof(uint32_t); // fanout in ft case FT_LAYOUT_VERSION_27: @@ -754,6 +764,7 @@ void toku_serialize_ft_to_wbuf ( wbuf_MSN(wbuf, h->highest_unused_msn_for_upgrade); wbuf_MSN(wbuf, h->max_msn_in_ft); wbuf_int(wbuf, h->fanout); + wbuf_ulonglong(wbuf, h->on_disk_logical_rows); uint32_t checksum = toku_x1764_finish(&wbuf->checksum); wbuf_int(wbuf, checksum); lazy_assert(wbuf->ndone == wbuf->size); diff --git a/storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h b/storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h index 72b6882bc06..9407a568337 100644 --- a/storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h +++ b/storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h @@ -68,6 +68,7 @@ enum ft_layout_version_e { FT_LAYOUT_VERSION_26 = 26, // Hojo: basements store key/vals separately on disk for fixed klpair length BNs FT_LAYOUT_VERSION_27 = 27, // serialize message trees with nonleaf buffers to avoid key, msn sort on deserialize FT_LAYOUT_VERSION_28 = 28, // Add fanout to ft_header + FT_LAYOUT_VERSION_29 = 29, // Add logrows to ft_header FT_NEXT_VERSION, // the version after the current version FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line. FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported diff --git a/storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt b/storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt index 0098b6091be..270ec97660a 100644 --- a/storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt +++ b/storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt @@ -112,11 +112,13 @@ if(BUILD_TESTING OR BUILD_FT_TESTS) declare_custom_tests(test-upgrade-recovery-logs) file(GLOB upgrade_tests "${TOKUDB_DATA}/upgrade-recovery-logs-??-clean") + file(GLOB upgrade_tests "${CMAKE_CURRENT_SOURCE_DIR}/upgrade.data/upgrade-recovery-logs-??-clean") foreach(test ${upgrade_tests}) get_filename_component(test_basename "${test}" NAME) add_ft_test_aux(test-${test_basename} test-upgrade-recovery-logs ${test}) endforeach(test) file(GLOB upgrade_tests "${TOKUDB_DATA}/upgrade-recovery-logs-??-dirty") + file(GLOB upgrade_tests "${CMAKE_CURRENT_SOURCE_DIR}/upgrade.data/upgrade-recovery-logs-??-dirty") foreach(test ${upgrade_tests}) get_filename_component(test_basename "${test}" NAME) add_ft_test_aux(test-${test_basename} test-upgrade-recovery-logs ${test}) diff --git a/storage/tokudb/PerconaFT/ft/tests/make-tree.cc b/storage/tokudb/PerconaFT/ft/tests/make-tree.cc index 663bbf3beb2..761d672539b 100644 --- a/storage/tokudb/PerconaFT/ft/tests/make-tree.cc +++ b/storage/tokudb/PerconaFT/ft/tests/make-tree.cc @@ -74,11 +74,20 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) // apply an insert to the leaf node txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); - toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, NULL, NULL); + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + NULL, + NULL, + NULL); leafnode->max_msn_applied_to_node_on_disk = msn; - // dont forget to dirty the node + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/tests/msnfilter.cc b/storage/tokudb/PerconaFT/ft/tests/msnfilter.cc index 737c3556ad6..c37dcd089f8 100644 --- a/storage/tokudb/PerconaFT/ft/tests/msnfilter.cc +++ b/storage/tokudb/PerconaFT/ft/tests/msnfilter.cc @@ -82,49 +82,85 @@ append_leaf(FT_HANDLE ft, FTNODE leafnode, void *key, uint32_t keylen, void *val ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); - toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg, &gc_info, nullptr, nullptr); + toku_ft_leaf_apply_msg( + ft->ft->cmp, + ft->ft->update_fun, + leafnode, + -1, + msg, + &gc_info, + nullptr, + nullptr, + nullptr); { - int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair); - assert(r==0); - assert(pair.call_count==1); + int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair); + assert(r==0); + assert(pair.call_count==1); } ft_msg badmsg(&thekey, &badval, FT_INSERT, msn, toku_xids_get_root_xids()); - toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, badmsg, &gc_info, nullptr, nullptr); + toku_ft_leaf_apply_msg( + ft->ft->cmp, + ft->ft->update_fun, + leafnode, + -1, + badmsg, + &gc_info, + nullptr, + nullptr, + nullptr); // message should be rejected for duplicate msn, row should still have original val { - int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair); - assert(r==0); - assert(pair.call_count==2); + int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair); + assert(r==0); + assert(pair.call_count==2); } // now verify that message with proper msn gets through msn = next_dummymsn(); ft->ft->h->max_msn_in_ft = msn; ft_msg msg2(&thekey, &val2, FT_INSERT, msn, toku_xids_get_root_xids()); - toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg2, &gc_info, nullptr, nullptr); + toku_ft_leaf_apply_msg( + ft->ft->cmp, + ft->ft->update_fun, + leafnode, + -1, + msg2, + &gc_info, + nullptr, + nullptr, + nullptr); // message should be accepted, val should have new value { - int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair2); - assert(r==0); - assert(pair2.call_count==1); + int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair2); + assert(r==0); + assert(pair2.call_count==1); } // now verify that message with lesser (older) msn is rejected msn.msn = msn.msn - 10; ft_msg msg3(&thekey, &badval, FT_INSERT, msn, toku_xids_get_root_xids()); - toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg3, &gc_info, nullptr, nullptr); + toku_ft_leaf_apply_msg( + ft->ft->cmp, + ft->ft->update_fun, + leafnode, + -1, + msg3, + &gc_info, + nullptr, + nullptr, + nullptr); // message should be rejected, val should still have value in pair2 { - int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair2); - assert(r==0); - assert(pair2.call_count==2); + int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair2); + assert(r==0); + assert(pair2.call_count==2); } - // dont forget to dirty the node + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc b/storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc index 055a38e5f6d..393fb88ac2e 100644 --- a/storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc +++ b/storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc @@ -137,8 +137,24 @@ insert_random_message_to_bn( *keyp = toku_xmemdup(keydbt->data, keydbt->size); ft_msg msg(keydbt, valdbt, FT_INSERT, msn, xids); int64_t numbytes; - toku_le_apply_msg(msg, NULL, NULL, 0, keydbt->size, &non_mvcc_gc_info, save, &numbytes); - toku_ft_bn_apply_msg(t->ft->cmp, t->ft->update_fun, blb, msg, &non_mvcc_gc_info, NULL, NULL); + toku_le_apply_msg( + msg, + NULL, + NULL, + 0, + keydbt->size, + &non_mvcc_gc_info, + save, + &numbytes); + toku_ft_bn_apply_msg( + t->ft->cmp, + t->ft->update_fun, + blb, + msg, + &non_mvcc_gc_info, + NULL, + NULL, + NULL); if (msn.msn > blb->max_msn_applied.msn) { blb->max_msn_applied = msn; } @@ -182,12 +198,36 @@ insert_same_message_to_bns( *keyp = toku_xmemdup(keydbt->data, keydbt->size); ft_msg msg(keydbt, valdbt, FT_INSERT, msn, xids); int64_t numbytes; - toku_le_apply_msg(msg, NULL, NULL, 0, keydbt->size, &non_mvcc_gc_info, save, &numbytes); - toku_ft_bn_apply_msg(t->ft->cmp, t->ft->update_fun, blb1, msg, &non_mvcc_gc_info, NULL, NULL); + toku_le_apply_msg( + msg, + NULL, + NULL, + 0, + keydbt->size, + &non_mvcc_gc_info, + save, + &numbytes); + toku_ft_bn_apply_msg( + t->ft->cmp, + t->ft->update_fun, + blb1, + msg, + &non_mvcc_gc_info, + NULL, + NULL, + NULL); if (msn.msn > blb1->max_msn_applied.msn) { blb1->max_msn_applied = msn; } - toku_ft_bn_apply_msg(t->ft->cmp, t->ft->update_fun, blb2, msg, &non_mvcc_gc_info, NULL, NULL); + toku_ft_bn_apply_msg( + t->ft->cmp, + t->ft->update_fun, + blb2, + msg, + &non_mvcc_gc_info, + NULL, + NULL, + NULL); if (msn.msn > blb2->max_msn_applied.msn) { blb2->max_msn_applied = msn; } @@ -619,7 +659,16 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) { if (make_leaf_up_to_date) { for (i = 0; i < num_parent_messages; ++i) { if (!parent_messages_is_fresh[i]) { - toku_ft_leaf_apply_msg(t->ft->cmp, t->ft->update_fun, child, -1, *parent_messages[i], &non_mvcc_gc_info, NULL, NULL); + toku_ft_leaf_apply_msg( + t->ft->cmp, + t->ft->update_fun, + child, + -1, + *parent_messages[i], + &non_mvcc_gc_info, + NULL, + NULL, + NULL); } } for (i = 0; i < 8; ++i) { @@ -842,7 +891,16 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) { for (i = 0; i < num_parent_messages; ++i) { if (dummy_cmp(parent_messages[i]->kdbt(), &childkeys[7]) <= 0 && !parent_messages_is_fresh[i]) { - toku_ft_leaf_apply_msg(t->ft->cmp, t->ft->update_fun, child, -1, *parent_messages[i], &non_mvcc_gc_info, NULL, NULL); + toku_ft_leaf_apply_msg( + t->ft->cmp, + t->ft->update_fun, + child, + -1, + *parent_messages[i], + &non_mvcc_gc_info, + NULL, + NULL, + NULL); } } for (i = 0; i < 8; ++i) { @@ -1045,8 +1103,26 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) { if (make_leaf_up_to_date) { for (i = 0; i < num_parent_messages; ++i) { if (!parent_messages_is_fresh[i]) { - toku_ft_leaf_apply_msg(t->ft->cmp, t->ft->update_fun, child1, -1, *parent_messages[i], &non_mvcc_gc_info, NULL, NULL); - toku_ft_leaf_apply_msg(t->ft->cmp, t->ft->update_fun, child2, -1, *parent_messages[i], &non_mvcc_gc_info, NULL, NULL); + toku_ft_leaf_apply_msg( + t->ft->cmp, + t->ft->update_fun, + child1, + -1, + *parent_messages[i], + &non_mvcc_gc_info, + NULL, + NULL, + NULL); + toku_ft_leaf_apply_msg( + t->ft->cmp, + t->ft->update_fun, + child2, + -1, + *parent_messages[i], + &non_mvcc_gc_info, + NULL, + NULL, + NULL); } } for (i = 0; i < 8; ++i) { diff --git a/storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc b/storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc index 8e006498d77..7691ffaac2b 100644 --- a/storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc +++ b/storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc @@ -81,7 +81,7 @@ static void run_recovery(const char *testdir) { bool upgrade_in_progress; r = toku_maybe_upgrade_log(testdir, testdir, &lsn_of_clean_shutdown, &upgrade_in_progress); if (strcmp(shutdown, "dirty") == 0 && log_version <= 24) { - CKERR2(r, TOKUDB_UPGRADE_FAILURE); // we dont support dirty upgrade from versions <= 24 + CKERR2(r, TOKUDB_UPGRADE_FAILURE); // we don't support dirty upgrade from versions <= 24 return; } else { CKERR(r); diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-clean/log000000000000.tokulog24 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-clean/log000000000000.tokulog24 Binary files differnew file mode 100755 index 00000000000..9a56e83e627 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-clean/log000000000000.tokulog24 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-dirty/log000000000000.tokulog24 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-dirty/log000000000000.tokulog24 Binary files differnew file mode 100755 index 00000000000..c552cda6673 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-dirty/log000000000000.tokulog24 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-clean/log000000000000.tokulog25 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-clean/log000000000000.tokulog25 Binary files differnew file mode 100755 index 00000000000..26b8bcfbdcc --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-clean/log000000000000.tokulog25 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-dirty/log000000000000.tokulog25 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-dirty/log000000000000.tokulog25 Binary files differnew file mode 100755 index 00000000000..04d3190c818 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-dirty/log000000000000.tokulog25 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-clean/log000000000000.tokulog26 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-clean/log000000000000.tokulog26 Binary files differnew file mode 100755 index 00000000000..02047325aa6 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-clean/log000000000000.tokulog26 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-dirty/log000000000000.tokulog26 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-dirty/log000000000000.tokulog26 Binary files differnew file mode 100755 index 00000000000..ce826b5608b --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-dirty/log000000000000.tokulog26 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-clean/log000000000000.tokulog27 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-clean/log000000000000.tokulog27 Binary files differnew file mode 100755 index 00000000000..9849b977d73 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-clean/log000000000000.tokulog27 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-dirty/log000000000000.tokulog27 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-dirty/log000000000000.tokulog27 Binary files differnew file mode 100755 index 00000000000..8b658ea4c0a --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-dirty/log000000000000.tokulog27 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-clean/log000000000000.tokulog28 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-clean/log000000000000.tokulog28 Binary files differnew file mode 100644 index 00000000000..11fecfb94b2 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-clean/log000000000000.tokulog28 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-dirty/log000000000000.tokulog28 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-dirty/log000000000000.tokulog28 Binary files differnew file mode 100644 index 00000000000..b7a9b03b583 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-dirty/log000000000000.tokulog28 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-clean/log000000000000.tokulog29 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-clean/log000000000000.tokulog29 Binary files differnew file mode 100644 index 00000000000..a1f306f4a96 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-clean/log000000000000.tokulog29 diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-dirty/log000000000000.tokulog29 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-dirty/log000000000000.tokulog29 Binary files differnew file mode 100644 index 00000000000..b9e79eeb1c4 --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-dirty/log000000000000.tokulog29 diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc b/storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc index 68fac0e6a9c..b10885c2e62 100644 --- a/storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc +++ b/storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc @@ -78,12 +78,21 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) // apply an insert to the leaf node ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); - toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL); + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + NULL, + NULL, + NULL); // Create bad tree (don't do following): // leafnode->max_msn_applied_to_node = msn; - // dont forget to dirty the node + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc b/storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc index 49b2b8a6c21..c1d08ce41a6 100644 --- a/storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc +++ b/storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc @@ -65,9 +65,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) MSN msn = next_dummymsn(); ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); - toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL); - - // dont forget to dirty the node + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + NULL, + NULL, + NULL); + + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc b/storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc index 72c4063f51f..22a29c0ff69 100644 --- a/storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc +++ b/storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc @@ -66,9 +66,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) MSN msn = next_dummymsn(); ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); - toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL); - - // dont forget to dirty the node + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + NULL, + NULL, + NULL); + + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc b/storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc index f569f502dc8..80189dd9804 100644 --- a/storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc +++ b/storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc @@ -65,9 +65,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) MSN msn = next_dummymsn(); ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); - toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL); - - // dont forget to dirty the node + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + NULL, + NULL, + NULL); + + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc b/storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc index 3a6db8ee4de..a84aac1f063 100644 --- a/storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc +++ b/storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc @@ -66,9 +66,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) MSN msn = next_dummymsn(); ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); - toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, NULL, NULL); - - // dont forget to dirty the node + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + NULL, + NULL, + NULL); + + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc b/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc index 4392887718f..ca413f52567 100644 --- a/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc +++ b/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc @@ -68,9 +68,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) MSN msn = next_dummymsn(); ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); - toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL); - - // dont forget to dirty the node + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + NULL, + NULL, + NULL); + + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc b/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc index e3167bd3dc1..6efa06913c2 100644 --- a/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc +++ b/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc @@ -65,9 +65,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen) MSN msn = next_dummymsn(); ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids()); txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false); - toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL); - - // dont forget to dirty the node + toku_ft_bn_apply_msg_once( + BLB(leafnode, 0), + msg, + idx, + keylen, + NULL, + &gc_info, + NULL, + NULL, + NULL); + + // don't forget to dirty the node leafnode->dirty = 1; } diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc index 6a8c0d45b45..df830afd0df 100644 --- a/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc +++ b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc @@ -186,6 +186,7 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) { // Append the list to the front of the parent. if (child_log->oldest_logentry) { // There are some entries, so link them in. + parent_log->dirty = true; child_log->oldest_logentry->prev = parent_log->newest_logentry; if (!parent_log->oldest_logentry) { parent_log->oldest_logentry = child_log->oldest_logentry; diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.cc b/storage/tokudb/PerconaFT/ft/txn/txn.cc index cd0585dbf6c..dd03073a3ec 100644 --- a/storage/tokudb/PerconaFT/ft/txn/txn.cc +++ b/storage/tokudb/PerconaFT/ft/txn/txn.cc @@ -248,11 +248,24 @@ static txn_child_manager tcm; .xa_xid = {0, 0, 0, ""}, .progress_poll_fun = NULL, .progress_poll_fun_extra = NULL, - .txn_lock = ZERO_MUTEX_INITIALIZER, + + // You cannot initialize txn_lock a TOKU_MUTEX_INITIALIZER, because we + // will initialize it in the code below, and it cannot already + // be initialized at that point. Also, in general, you don't + // get to use PTHREAD_MUTEX_INITALIZER (which is what is inside + // TOKU_MUTEX_INITIALIZER) except in static variables, and this + // is initializing an auto variable. + // + // And we cannot simply avoid initializing these fields + // because, although it avoids -Wmissing-field-initializer + // errors under gcc, it gets other errors about non-trivial + // designated initializers not being supported. + + .txn_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER .open_fts = open_fts, .roll_info = roll_info, - .state_lock = ZERO_MUTEX_INITIALIZER, - .state_cond = TOKU_COND_INITIALIZER, + .state_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER + .state_cond = ZERO_COND_INITIALIZER, // Not TOKU_COND_INITIALIZER .state = TOKUTXN_LIVE, .num_pin = 0, .client_id = 0, diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc b/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc index 551fd32b8d5..88eca36a261 100644 --- a/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc +++ b/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc @@ -45,7 +45,15 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #include "ft/txn/txn_manager.h" #include "ft/txn/rollback.h" #include "util/omt.h" +//this is only for testing +static void (* test_txn_sync_callback) (pthread_t, void *) = NULL; +static void * test_txn_sync_callback_extra = NULL; + +void set_test_txn_sync_callback(void (*cb) (pthread_t, void *), void *extra) { + test_txn_sync_callback = cb; + test_txn_sync_callback_extra = extra; +} bool garbage_collection_debug = false; static bool txn_records_snapshot(TXN_SNAPSHOT_TYPE snapshot_type, struct tokutxn *parent) { @@ -525,14 +533,19 @@ void toku_txn_manager_handle_snapshot_create_for_child_txn( XMALLOC(txn->live_root_txn_list); txn_manager_lock(txn_manager); txn_manager_create_snapshot_unlocked(txn_manager, txn); - txn_manager_unlock(txn_manager); } else { inherit_snapshot_from_parent(txn); } - if (copies_snapshot) { - setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list); - } + + toku_debug_txn_sync(pthread_self()); + + if (copies_snapshot) { + if(!records_snapshot) + txn_manager_lock(txn_manager); + setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list); + txn_manager_unlock(txn_manager); + } } void toku_txn_manager_handle_snapshot_destroy_for_child_txn( diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_manager.h b/storage/tokudb/PerconaFT/ft/txn/txn_manager.h index 658c6f9aecd..7cdc52c4f43 100644 --- a/storage/tokudb/PerconaFT/ft/txn/txn_manager.h +++ b/storage/tokudb/PerconaFT/ft/txn/txn_manager.h @@ -43,6 +43,15 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. #include "ft/txn/txn.h" +void set_test_txn_sync_callback(void (*) (pthread_t, void*), void*); +#define toku_test_txn_sync_callback(a) ((test_txn_sync_callback)? test_txn_sync_callback( a,test_txn_sync_callback_extra) : (void) 0) + +#if TOKU_DEBUG_TXN_SYNC +#define toku_debug_txn_sync(a) toku_test_txn_sync_callback(a) +#else +#define toku_debug_txn_sync(a) ((void) 0) +#endif + typedef struct txn_manager *TXN_MANAGER; struct referenced_xid_tuple { diff --git a/storage/tokudb/PerconaFT/ft/ule.cc b/storage/tokudb/PerconaFT/ft/ule.cc index 573c4488f70..ac393fbf179 100644 --- a/storage/tokudb/PerconaFT/ft/ule.cc +++ b/storage/tokudb/PerconaFT/ft/ule.cc @@ -73,12 +73,11 @@ void toku_le_get_status(LE_STATUS statp) { *statp = le_status; } -/////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// // Accessor functions used by outside world (e.g. indexer) // -ULEHANDLE -toku_ule_create(LEAFENTRY le) { +ULEHANDLE toku_ule_create(LEAFENTRY le) { ULE XMALLOC(ule_p); le_unpack(ule_p, le); return (ULEHANDLE) ule_p; @@ -89,7 +88,7 @@ void toku_ule_free(ULEHANDLE ule_p) { toku_free(ule_p); } -/////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// // // Question: Can any software outside this file modify or read a leafentry? // If so, is it worthwhile to put it all here? @@ -117,27 +116,43 @@ const UXR_S committed_delete = { // Local functions: -static void msg_init_empty_ule(ULE ule); -static void msg_modify_ule(ULE ule, const ft_msg &msg); -static void ule_init_empty_ule(ULE ule); +static inline void msg_init_empty_ule(ULE ule); +static int64_t msg_modify_ule(ULE ule, const ft_msg &msg); +static inline void ule_init_empty_ule(ULE ule); static void ule_do_implicit_promotions(ULE ule, XIDS xids); -static void ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid); +static void ule_try_promote_provisional_outermost( + ULE ule, + TXNID oldest_possible_live_xid); static void ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index); static void ule_promote_provisional_innermost_to_committed(ULE ule); -static void ule_apply_insert(ULE ule, XIDS xids, uint32_t vallen, void * valp); -static void ule_apply_delete(ULE ule, XIDS xids); -static void ule_prepare_for_new_uxr(ULE ule, XIDS xids); -static void ule_apply_abort(ULE ule, XIDS xids); +static inline int64_t ule_apply_insert_no_overwrite( + ULE ule, + XIDS xids, + uint32_t vallen, + void* valp); +static inline int64_t ule_apply_insert( + ULE ule, + XIDS xids, + uint32_t vallen, + void* valp); +static inline int64_t ule_apply_delete(ULE ule, XIDS xids); +static inline void ule_prepare_for_new_uxr(ULE ule, XIDS xids); +static inline int64_t ule_apply_abort(ULE ule, XIDS xids); static void ule_apply_broadcast_commit_all (ULE ule); static void ule_apply_commit(ULE ule, XIDS xids); -static void ule_push_insert_uxr(ULE ule, bool is_committed, TXNID xid, uint32_t vallen, void * valp); -static void ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid); -static void ule_push_placeholder_uxr(ULE ule, TXNID xid); -static UXR ule_get_innermost_uxr(ULE ule); -static UXR ule_get_first_empty_uxr(ULE ule); -static void ule_remove_innermost_uxr(ULE ule); -static TXNID ule_get_innermost_xid(ULE ule); -static TXNID ule_get_xid(ULE ule, uint32_t index); +static inline void ule_push_insert_uxr( + ULE ule, + bool is_committed, + TXNID xid, + uint32_t vallen, + void* valp); +static inline void ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid); +static inline void ule_push_placeholder_uxr(ULE ule, TXNID xid); +static inline UXR ule_get_innermost_uxr(ULE ule); +static inline UXR ule_get_first_empty_uxr(ULE ule); +static inline void ule_remove_innermost_uxr(ULE ule); +static inline TXNID ule_get_innermost_xid(ULE ule); +static inline TXNID ule_get_xid(ULE ule, uint32_t index); static void ule_remove_innermost_placeholders(ULE ule); static void ule_add_placeholders(ULE ule, XIDS xids); static void ule_optimize(ULE ule, XIDS xids); @@ -153,6 +168,30 @@ static inline size_t uxr_unpack_type_and_length(UXR uxr, uint8_t *p); static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p); static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p); +#if 0 +static void ule_print(ULE ule, const char* note) { + fprintf(stderr, "%s : ULE[0x%p]\n", note, ule); + fprintf(stderr, " num_puxrs[%u]\n", ule->num_puxrs); + fprintf(stderr, " num_cuxrs[%u]\n", ule->num_cuxrs); + fprintf(stderr, " innermost[%u]\n", ule->num_cuxrs + ule->num_puxrs - 1); + fprintf(stderr, " first_empty[%u]\n", ule->num_cuxrs + ule->num_puxrs); + + uint32_t num_uxrs = ule->num_cuxrs + ule->num_puxrs - 1; + for (uint32_t uxr_num = 0; uxr_num <= num_uxrs; uxr_num++) { + UXR uxr = &(ule->uxrs[uxr_num]); + fprintf(stderr, " uxr[%u]\n", uxr_num); + switch (uxr->type) { + case 0: fprintf(stderr, " type[NONE]\n"); break; + case 1: fprintf(stderr, " type[INSERT]\n"); break; + case 2: fprintf(stderr, " type[DELETE]\n"); break; + case 3: fprintf(stderr, " type[PLACEHOLDER]\n"); break; + default: fprintf(stderr, " type[WHAT??]\n"); break; + } + fprintf(stderr, " xid[%lu]\n", uxr->xid); + } +} +#endif + static void get_space_for_le( bn_data* data_buffer, uint32_t idx, @@ -162,21 +201,30 @@ static void get_space_for_le( uint32_t old_le_size, size_t size, LEAFENTRY* new_le_space, - void **const maybe_free - ) -{ + void** const maybe_free) { + if (data_buffer == nullptr) { CAST_FROM_VOIDP(*new_le_space, toku_xmalloc(size)); - } - else { + } else if (old_le_size > 0) { // this means we are overwriting something - if (old_le_size > 0) { - data_buffer->get_space_for_overwrite(idx, keyp, keylen, old_keylen, old_le_size, size, new_le_space, maybe_free); - } + data_buffer->get_space_for_overwrite( + idx, + keyp, + keylen, + old_keylen, + old_le_size, + size, + new_le_space, + maybe_free); + } else { // this means we are inserting something new - else { - data_buffer->get_space_for_insert(idx, keyp, keylen, size, new_le_space, maybe_free); - } + data_buffer->get_space_for_insert( + idx, + keyp, + keylen, + size, + new_le_space, + maybe_free); } } @@ -185,15 +233,13 @@ static void get_space_for_le( // Garbage collection related functions // -static TXNID -get_next_older_txnid(TXNID xc, const xid_omt_t &omt) { +static TXNID get_next_older_txnid(TXNID xc, const xid_omt_t &omt) { int r; TXNID xid; r = omt.find<TXNID, toku_find_xid_by_xid>(xc, -1, &xid, nullptr); if (r==0) { invariant(xid < xc); //sanity check - } - else { + } else { invariant(r==DB_NOTFOUND); xid = TXNID_NONE; } @@ -201,17 +247,32 @@ get_next_older_txnid(TXNID xc, const xid_omt_t &omt) { } // -// This function returns true if live transaction TL1 is allowed to read a value committed by -// transaction xc, false otherwise. +// This function returns true if live transaction TL1 is allowed to read a +// value committed by transaction xc, false otherwise. // -static bool -xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, const rx_omt_t &referenced_xids) { +static bool xid_reads_committed_xid( + TXNID tl1, + TXNID xc, + const xid_omt_t& snapshot_txnids, + const rx_omt_t& referenced_xids) { + bool rval; - if (tl1 < xc) rval = false; //cannot read a newer txn - else { - TXNID x = toku_get_youngest_live_list_txnid_for(xc, snapshot_txnids, referenced_xids); - if (x == TXNID_NONE) rval = true; //Not in ANY live list, tl1 can read it. - else rval = tl1 > x; //Newer than the 'newest one that has it in live list' + if (tl1 < xc) { + rval = false; //cannot read a newer txn + } else { + TXNID x = + toku_get_youngest_live_list_txnid_for( + xc, + snapshot_txnids, + referenced_xids); + + if (x == TXNID_NONE) { + //Not in ANY live list, tl1 can read it. + rval = true; + } else { + //Newer than the 'newest one that has it in live list' + rval = tl1 > x; + } // we know tl1 > xc // we know x > xc // if tl1 == x, then we do not read, because tl1 is in xc's live list @@ -228,8 +289,7 @@ xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, c // than oldest_referenced_xid. All elements below this entry are garbage, // so we get rid of them. // -static void -ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) { +static void ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) { if (ule->num_cuxrs == 1) { return; } @@ -240,7 +300,8 @@ ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) { // uxr with a txnid that is less than oldest_referenced_xid for (uint32_t i = 0; i < ule->num_cuxrs; i++) { curr_index = ule->num_cuxrs - i - 1; - if (ule->uxrs[curr_index].xid < gc_info->oldest_referenced_xid_for_simple_gc) { + if (ule->uxrs[curr_index].xid < + gc_info->oldest_referenced_xid_for_simple_gc) { break; } } @@ -250,12 +311,15 @@ ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) { curr_index = ule->num_cuxrs - 1; } - // curr_index is now set to the youngest uxr older than oldest_referenced_xid - // so if it's not the bottom of the stack.. + // curr_index is now set to the youngest uxr older than + // oldest_referenced_xid so if it's not the bottom of the stack.. if (curr_index != 0) { // ..then we need to get rid of the entries below curr_index uint32_t num_entries = ule->num_cuxrs + ule->num_puxrs - curr_index; - memmove(&ule->uxrs[0], &ule->uxrs[curr_index], num_entries * sizeof(ule->uxrs[0])); + memmove( + &ule->uxrs[0], + &ule->uxrs[curr_index], + num_entries * sizeof(ule->uxrs[0])); ule->uxrs[0].xid = TXNID_NONE; // New 'bottom of stack' loses its TXNID ule->num_cuxrs -= curr_index; } @@ -264,8 +328,12 @@ ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) { // TODO: Clean this up extern bool garbage_collection_debug; -static void -ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) { +static void ule_garbage_collect( + ULE ule, + const xid_omt_t& snapshot_xids, + const rx_omt_t& referenced_xids, + const xid_omt_t& live_root_txns) { + if (ule->num_cuxrs == 1) { return; } @@ -289,10 +357,12 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref // If we find that the committed transaction is in the live list, // then xc is really in the process of being committed. It has not // been fully committed. As a result, our assumption that transactions - // newer than what is currently in these OMTs will read the top of the stack - // is not necessarily accurate. Transactions may read what is just below xc. - // As a result, we must mark what is just below xc as necessary and move on. - // This issue was found while testing flusher threads, and was fixed for #3979 + // newer than what is currently in these OMTs will read the top of the + // stack is not necessarily accurate. Transactions may read what is + // just below xc. + // As a result, we must mark what is just below xc as necessary and + // move on. This issue was found while testing flusher threads, and was + // fixed for #3979 // bool is_xc_live = toku_is_txn_in_live_root_txn_list(live_root_txns, xc); if (is_xc_live) { @@ -300,13 +370,19 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref continue; } - tl1 = toku_get_youngest_live_list_txnid_for(xc, snapshot_xids, referenced_xids); + tl1 = + toku_get_youngest_live_list_txnid_for( + xc, + snapshot_xids, + referenced_xids); - // if tl1 == xc, that means xc should be live and show up in live_root_txns, which we check above. + // if tl1 == xc, that means xc should be live and show up in + // live_root_txns, which we check above. invariant(tl1 != xc); if (tl1 == TXNID_NONE) { - // set tl1 to youngest live transaction older than ule->uxrs[curr_committed_entry]->xid + // set tl1 to youngest live transaction older than + // ule->uxrs[curr_committed_entry]->xid tl1 = get_next_older_txnid(xc, snapshot_xids); if (tl1 == TXNID_NONE) { // remainder is garbage, we're done @@ -314,8 +390,13 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref } } if (garbage_collection_debug) { - int r = snapshot_xids.find_zero<TXNID, toku_find_xid_by_xid>(tl1, nullptr, nullptr); - invariant_zero(r); // make sure that the txn you are claiming is live is actually live + int r = + snapshot_xids.find_zero<TXNID, toku_find_xid_by_xid>( + tl1, + nullptr, + nullptr); + // make sure that the txn you are claiming is live is actually live + invariant_zero(r); } // // tl1 should now be set @@ -323,7 +404,11 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref curr_committed_entry--; while (curr_committed_entry > 0) { xc = ule->uxrs[curr_committed_entry].xid; - if (xid_reads_committed_xid(tl1, xc, snapshot_xids, referenced_xids)) { + if (xid_reads_committed_xid( + tl1, + xc, + snapshot_xids, + referenced_xids)) { break; } curr_committed_entry--; @@ -343,7 +428,10 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref ule->uxrs[0].xid = TXNID_NONE; //New 'bottom of stack' loses its TXNID if (first_free != ule->num_cuxrs) { // Shift provisional values - memmove(&ule->uxrs[first_free], &ule->uxrs[ule->num_cuxrs], ule->num_puxrs * sizeof(ule->uxrs[0])); + memmove( + &ule->uxrs[first_free], + &ule->uxrs[ule->num_cuxrs], + ule->num_puxrs * sizeof(ule->uxrs[0])); } ule->num_cuxrs = saved; } @@ -367,29 +455,42 @@ enum { ULE_MIN_MEMSIZE_TO_FORCE_GC = 1024 * 1024 }; -///////////////////////////////////////////////////////////////////////////////// -// This is the big enchilada. (Bring Tums.) Note that this level of abstraction -// has no knowledge of the inner structure of either leafentry or msg. It makes -// calls into the next lower layer (msg_xxx) which handles messages. +//////////////////////////////////////////////////////////////////////////////// +// This is the big enchilada. (Bring Tums.) Note that this level of +// abstraction has no knowledge of the inner structure of either leafentry or +// msg. It makes calls into the next lower layer (msg_xxx) which handles +// messages. // // NOTE: This is the only function (at least in this body of code) that modifies // a leafentry. // NOTE: It is the responsibility of the caller to make sure that the key is set // in the FT_MSG, as it will be used to store the data in the data_buffer // -// Return 0 on success. -// If the leafentry is destroyed it sets *new_leafentry_p to NULL. -// Otehrwise the new_leafentry_p points at the new leaf entry. -// As of October 2011, this function always returns 0. -void -toku_le_apply_msg(const ft_msg &msg, - LEAFENTRY old_leafentry, // NULL if there was no stored data. - bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data - uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced - uint32_t old_keylen, // length of the any key in data_buffer - txn_gc_info *gc_info, - LEAFENTRY *new_leafentry_p, - int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead +// Returns -1, 0, or 1 that identifies the change in logical row count needed +// based on the results of the message application. For example, if a delete +// finds no logical leafentry or if an insert finds a duplicate and is +// converted to an update. +// +// old_leafentry - NULL if there was no stored data. +// data_buffer - bn_data storing leafentry, if NULL, means there is no bn_data +// idx - index in data_buffer where leafentry is stored +// (and should be replaced) +// old_keylen - length of the any key in data_buffer +// new_leafentry_p - If the leafentry is destroyed it sets *new_leafentry_p +// to NULL. Otherwise the new_leafentry_p points at the new +// leaf entry. +// numbytes_delta_p - change in total size of key and val, not including any +// overhead +int64_t toku_le_apply_msg( + const ft_msg& msg, + LEAFENTRY old_leafentry, + bn_data* data_buffer, + uint32_t idx, + uint32_t old_keylen, + txn_gc_info* gc_info, + LEAFENTRY* new_leafentry_p, + int64_t* numbytes_delta_p) { + invariant_notnull(gc_info); paranoid_invariant_notnull(new_leafentry_p); ULE_S ule; @@ -397,6 +498,7 @@ toku_le_apply_msg(const ft_msg &msg, int64_t newnumbytes = 0; uint64_t oldmemsize = 0; uint32_t keylen = msg.kdbt()->size; + int32_t rowcountdelta = 0; if (old_leafentry == NULL) { msg_init_empty_ule(&ule); @@ -405,49 +507,62 @@ toku_le_apply_msg(const ft_msg &msg, le_unpack(&ule, old_leafentry); // otherwise unpack leafentry oldnumbytes = ule_get_innermost_numbytes(&ule, keylen); } - msg_modify_ule(&ule, msg); // modify unpacked leafentry - // - we may be able to immediately promote the newly-apllied outermost provisonal uxr - // - either way, run simple gc first, and then full gc if there are still some committed uxrs. - ule_try_promote_provisional_outermost(&ule, gc_info->oldest_referenced_xid_for_implicit_promotion); + // modify unpacked leafentry + rowcountdelta = msg_modify_ule(&ule, msg); + + // - we may be able to immediately promote the newly-apllied outermost + // provisonal uxr + // - either way, run simple gc first, and then full gc if there are still + // some committed uxrs. + ule_try_promote_provisional_outermost( + &ule, + gc_info->oldest_referenced_xid_for_implicit_promotion); ule_simple_garbage_collection(&ule, gc_info); txn_manager_state *txn_state_for_gc = gc_info->txn_state_for_gc; size_t size_before_gc = 0; - if (ule.num_cuxrs > 1 && txn_state_for_gc != nullptr && // there is garbage to clean, and our caller gave us state.. - // ..and either the state is pre-initialized, or the committed stack is large enough - (txn_state_for_gc->initialized || ule.num_cuxrs >= ULE_MIN_STACK_SIZE_TO_FORCE_GC || - // ..or the ule's raw memsize is sufficiently large - (size_before_gc = ule_packed_memsize(&ule)) >= ULE_MIN_MEMSIZE_TO_FORCE_GC)) { - // ..then it's worth running gc, possibly initializing the txn manager state, if it isn't already + // there is garbage to clean, and our caller gave us state.. + // ..and either the state is pre-initialized, or the committed stack is + // large enough + // ..or the ule's raw memsize is sufficiently large + // ..then it's worth running gc, possibly initializing the txn manager + // state, if it isn't already + if (ule.num_cuxrs > 1 && txn_state_for_gc != nullptr && + (txn_state_for_gc->initialized || + ule.num_cuxrs >= ULE_MIN_STACK_SIZE_TO_FORCE_GC || + (size_before_gc = ule_packed_memsize(&ule)) >= + ULE_MIN_MEMSIZE_TO_FORCE_GC)) { if (!txn_state_for_gc->initialized) { txn_state_for_gc->init(); } - - size_before_gc = size_before_gc != 0 ? size_before_gc : // it's already been calculated above - ule_packed_memsize(&ule); - ule_garbage_collect(&ule, - txn_state_for_gc->snapshot_xids, - txn_state_for_gc->referenced_xids, - txn_state_for_gc->live_root_txns - ); + // it's already been calculated above + size_before_gc = + size_before_gc != 0 ? size_before_gc : ule_packed_memsize(&ule); + ule_garbage_collect( + &ule, + txn_state_for_gc->snapshot_xids, + txn_state_for_gc->referenced_xids, + txn_state_for_gc->live_root_txns); size_t size_after_gc = ule_packed_memsize(&ule); LE_STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc); LE_STATUS_INC(LE_APPLY_GC_BYTES_OUT, size_after_gc); } - void *maybe_free = nullptr; - int r = le_pack( - &ule, // create packed leafentry - data_buffer, - idx, - msg.kdbt()->data, // contract of this function is caller has this set, always - keylen, // contract of this function is caller has this set, always - old_keylen, - oldmemsize, - new_leafentry_p, - &maybe_free - ); + void* maybe_free = nullptr; + // create packed leafentry + // contract of this function is caller has keyp and keylen set, always + int r = + le_pack( + &ule, + data_buffer, + idx, + msg.kdbt()->data, + keylen, + old_keylen, + oldmemsize, + new_leafentry_p, + &maybe_free); invariant_zero(r); if (*new_leafentry_p) { newnumbytes = ule_get_innermost_numbytes(&ule, keylen); @@ -458,16 +573,22 @@ toku_le_apply_msg(const ft_msg &msg, if (maybe_free != nullptr) { toku_free(maybe_free); } + return rowcountdelta; } -bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info) { -// Effect: Quickly determines if it's worth trying to run garbage collection on a leafentry +bool toku_le_worth_running_garbage_collection( + LEAFENTRY le, + txn_gc_info* gc_info) { +// Effect: Quickly determines if it's worth trying to run garbage collection +// on a leafentry // Return: True if it makes sense to try garbage collection, false otherwise. // Rationale: Garbage collection is likely to clean up under two circumstances: -// 1.) There are multiple committed entries. Some may never be read by new txns. -// 2.) There is only one committed entry, but the outermost provisional entry -// is older than the oldest known referenced xid, so it must have commited. -// Therefor we can promote it to committed and get rid of the old commited entry. +// 1.) There are multiple committed entries. Some may never be read +// by new txns. +// 2.) There is only one committed entry, but the outermost +// provisional entry is older than the oldest known referenced +// xid, so it must have commited. Therefor we can promote it to +// committed and get rid of the old commited entry. if (le->type != LE_MVCC) { return false; } @@ -477,7 +598,8 @@ bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info paranoid_invariant(le->u.mvcc.num_cxrs == 1); } return le->u.mvcc.num_pxrs > 0 && - le_outermost_uncommitted_xid(le) < gc_info->oldest_referenced_xid_for_implicit_promotion; + le_outermost_uncommitted_xid(le) < + gc_info->oldest_referenced_xid_for_implicit_promotion; } // Garbage collect one leaf entry, using the given OMT's. @@ -498,16 +620,18 @@ bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info // -- referenced_xids : list of in memory active transactions. // NOTE: it is not a good idea to garbage collect a leaf // entry with only one committed value. -void -toku_le_garbage_collect(LEAFENTRY old_leaf_entry, - bn_data* data_buffer, - uint32_t idx, - void* keyp, - uint32_t keylen, - txn_gc_info *gc_info, - LEAFENTRY *new_leaf_entry, - int64_t * numbytes_delta_p) { - // We shouldn't want to run gc without having provided a snapshot of the txn system. +void toku_le_garbage_collect( + LEAFENTRY old_leaf_entry, + bn_data* data_buffer, + uint32_t idx, + void* keyp, + uint32_t keylen, + txn_gc_info* gc_info, + LEAFENTRY* new_leaf_entry, + int64_t* numbytes_delta_p) { + + // We shouldn't want to run gc without having provided a snapshot of the + // txn system. invariant_notnull(gc_info); invariant_notnull(gc_info->txn_state_for_gc); paranoid_invariant_notnull(new_leaf_entry); @@ -520,20 +644,24 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry, oldnumbytes = ule_get_innermost_numbytes(&ule, keylen); uint32_t old_mem_size = leafentry_memsize(old_leaf_entry); - // Before running garbage collection, try to promote the outermost provisional - // entries to committed if its xid is older than the oldest possible live xid. + // Before running garbage collection, try to promote the outermost + // provisional entries to committed if its xid is older than the oldest + // possible live xid. // // The oldest known refeferenced xid is a lower bound on the oldest possible // live xid, so we use that. It's usually close enough to get rid of most // garbage in leafentries. - ule_try_promote_provisional_outermost(&ule, gc_info->oldest_referenced_xid_for_implicit_promotion); + ule_try_promote_provisional_outermost( + &ule, + gc_info->oldest_referenced_xid_for_implicit_promotion); // No need to run simple gc here if we're going straight for full gc. if (ule.num_cuxrs > 1) { size_t size_before_gc = ule_packed_memsize(&ule); - ule_garbage_collect(&ule, - gc_info->txn_state_for_gc->snapshot_xids, - gc_info->txn_state_for_gc->referenced_xids, - gc_info->txn_state_for_gc->live_root_txns); + ule_garbage_collect( + &ule, + gc_info->txn_state_for_gc->snapshot_xids, + gc_info->txn_state_for_gc->referenced_xids, + gc_info->txn_state_for_gc->live_root_txns); size_t size_after_gc = ule_packed_memsize(&ule); LE_STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc); @@ -541,17 +669,18 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry, } void *maybe_free = nullptr; - int r = le_pack( - &ule, - data_buffer, - idx, - keyp, - keylen, - keylen, // old_keylen, same because the key isn't going to change for gc - old_mem_size, - new_leaf_entry, - &maybe_free - ); + // old_keylen, same because the key isn't going to change for gc + int r = + le_pack( + &ule, + data_buffer, + idx, + keyp, + keylen, + keylen, + old_mem_size, + new_leaf_entry, + &maybe_free); invariant_zero(r); if (*new_leaf_entry) { newnumbytes = ule_get_innermost_numbytes(&ule, keylen); @@ -564,49 +693,54 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry, } } -///////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// // This layer of abstraction (msg_xxx) // knows the accessors of msg, but not of leafentry or unpacked leaf entry. // It makes calls into the lower layer (le_xxx) which handles leafentries. // Purpose is to init the ule with given key and no transaction records // -static void -msg_init_empty_ule(ULE ule) { +static inline void msg_init_empty_ule(ULE ule) { ule_init_empty_ule(ule); } // Purpose is to modify the unpacked leafentry in our private workspace. // -static void -msg_modify_ule(ULE ule, const ft_msg &msg) { +// Returns -1, 0, or 1 that identifies the change in logical row count needed +// based on the results of the message application. For example, if a delete +// finds no logical leafentry or if an insert finds a duplicate and is +// converted to an update. +static int64_t msg_modify_ule(ULE ule, const ft_msg &msg) { + int64_t retval = 0; XIDS xids = msg.xids(); invariant(toku_xids_get_num_xids(xids) < MAX_TRANSACTION_RECORDS); enum ft_msg_type type = msg.type(); - if (type != FT_OPTIMIZE && type != FT_OPTIMIZE_FOR_UPGRADE) { + if (FT_LIKELY(type != FT_OPTIMIZE && type != FT_OPTIMIZE_FOR_UPGRADE)) { ule_do_implicit_promotions(ule, xids); } switch (type) { - case FT_INSERT_NO_OVERWRITE: { - UXR old_innermost_uxr = ule_get_innermost_uxr(ule); - //If something exists, quit (no overwrite). - if (uxr_is_insert(old_innermost_uxr)) break; - //else it is just an insert, so - //fall through to FT_INSERT on purpose. - } - case FT_INSERT: { - uint32_t vallen = msg.vdbt()->size; - invariant(IS_VALID_LEN(vallen)); - void * valp = msg.vdbt()->data; - ule_apply_insert(ule, xids, vallen, valp); + case FT_INSERT_NO_OVERWRITE: + retval = + ule_apply_insert_no_overwrite( + ule, + xids, + msg.vdbt()->size, + msg.vdbt()->data); + break; + case FT_INSERT: + retval = + ule_apply_insert( + ule, + xids, + msg.vdbt()->size, + msg.vdbt()->data); break; - } case FT_DELETE_ANY: - ule_apply_delete(ule, xids); + retval = ule_apply_delete(ule, xids); break; case FT_ABORT_ANY: case FT_ABORT_BROADCAST_TXN: - ule_apply_abort(ule, xids); + retval = ule_apply_abort(ule, xids); break; case FT_COMMIT_BROADCAST_ALL: ule_apply_broadcast_commit_all(ule); @@ -621,34 +755,40 @@ msg_modify_ule(ULE ule, const ft_msg &msg) { break; case FT_UPDATE: case FT_UPDATE_BROADCAST_ALL: - assert(false); // These messages don't get this far. Instead they get translated (in setval_fun in do_update) into FT_INSERT messages. + // These messages don't get this far. Instead they get translated (in + // setval_fun in do_update) into FT_INSERT messages. + assert(false); break; default: - assert(false); /* illegal ft msg type */ + // illegal ft msg type + assert(false); break; } + return retval; } -void test_msg_modify_ule(ULE ule, const ft_msg &msg){ +void test_msg_modify_ule(ULE ule, const ft_msg &msg) { msg_modify_ule(ule,msg); } static void ule_optimize(ULE ule, XIDS xids) { if (ule->num_puxrs) { - TXNID uncommitted = ule->uxrs[ule->num_cuxrs].xid; // outermost uncommitted + // outermost uncommitted + TXNID uncommitted = ule->uxrs[ule->num_cuxrs].xid; TXNID oldest_living_xid = TXNID_NONE; uint32_t num_xids = toku_xids_get_num_xids(xids); if (num_xids > 0) { invariant(num_xids==1); oldest_living_xid = toku_xids_get_xid(xids, 0); } - if (oldest_living_xid == TXNID_NONE || uncommitted < oldest_living_xid) { + if (oldest_living_xid == TXNID_NONE || + uncommitted < oldest_living_xid) { ule_promote_provisional_innermost_to_committed(ule); } } } -///////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// // This layer of abstraction (le_xxx) understands the structure of the leafentry // and of the unpacked leafentry. It is the only layer that understands the // structure of leafentry. It has no knowledge of any other data structures. @@ -657,8 +797,7 @@ static void ule_optimize(ULE ule, XIDS xids) { // // required for every le_unpack that is done // -void -ule_cleanup(ULE ule) { +void ule_cleanup(ULE ule) { invariant(ule->uxrs); if (ule->uxrs != ule->uxrs_static) { toku_free(ule->uxrs); @@ -668,8 +807,7 @@ ule_cleanup(ULE ule) { // populate an unpacked leafentry using pointers into the given leafentry. // thus, the memory referenced by 'le' must live as long as the ULE. -void -le_unpack(ULE ule, LEAFENTRY le) { +void le_unpack(ULE ule, LEAFENTRY le) { uint8_t type = le->type; uint8_t *p; uint32_t i; @@ -694,9 +832,10 @@ le_unpack(ULE ule, LEAFENTRY le) { //Dynamic memory if (ule->num_cuxrs < MAX_TRANSACTION_RECORDS) { ule->uxrs = ule->uxrs_static; - } - else { - XMALLOC_N(ule->num_cuxrs + 1 + MAX_TRANSACTION_RECORDS, ule->uxrs); + } else { + XMALLOC_N( + ule->num_cuxrs + 1 + MAX_TRANSACTION_RECORDS, + ule->uxrs); } p = le->u.mvcc.xrs; @@ -717,9 +856,12 @@ le_unpack(ULE ule, LEAFENTRY le) { p += uxr_unpack_length_and_bit(innermost, p); } for (i = 0; i < ule->num_cuxrs; i++) { - p += uxr_unpack_length_and_bit(ule->uxrs + ule->num_cuxrs - 1 - i, p); + p += + uxr_unpack_length_and_bit( + ule->uxrs + ule->num_cuxrs - 1 - i, + p); } - + //unpack interesting values inner to outer if (ule->num_puxrs!=0) { UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1; @@ -761,14 +903,12 @@ le_unpack(ULE ule, LEAFENTRY le) { #endif } -static inline size_t -uxr_pack_txnid(UXR uxr, uint8_t *p) { +static inline size_t uxr_pack_txnid(UXR uxr, uint8_t *p) { *(TXNID*)p = toku_htod64(uxr->xid); return sizeof(TXNID); } -static inline size_t -uxr_pack_type_and_length(UXR uxr, uint8_t *p) { +static inline size_t uxr_pack_type_and_length(UXR uxr, uint8_t *p) { size_t rval = 1; *p = uxr->type; if (uxr_is_insert(uxr)) { @@ -778,21 +918,18 @@ uxr_pack_type_and_length(UXR uxr, uint8_t *p) { return rval; } -static inline size_t -uxr_pack_length_and_bit(UXR uxr, uint8_t *p) { +static inline size_t uxr_pack_length_and_bit(UXR uxr, uint8_t *p) { uint32_t length_and_bit; if (uxr_is_insert(uxr)) { length_and_bit = INSERT_LENGTH(uxr->vallen); - } - else { + } else { length_and_bit = DELETE_LENGTH(uxr->vallen); } *(uint32_t*)p = toku_htod32(length_and_bit); return sizeof(uint32_t); } -static inline size_t -uxr_pack_data(UXR uxr, uint8_t *p) { +static inline size_t uxr_pack_data(UXR uxr, uint8_t *p) { if (uxr_is_insert(uxr)) { memcpy(p, uxr->valp, uxr->vallen); return uxr->vallen; @@ -800,14 +937,12 @@ uxr_pack_data(UXR uxr, uint8_t *p) { return 0; } -static inline size_t -uxr_unpack_txnid(UXR uxr, uint8_t *p) { +static inline size_t uxr_unpack_txnid(UXR uxr, uint8_t *p) { uxr->xid = toku_dtoh64(*(TXNID*)p); return sizeof(TXNID); } -static inline size_t -uxr_unpack_type_and_length(UXR uxr, uint8_t *p) { +static inline size_t uxr_unpack_type_and_length(UXR uxr, uint8_t *p) { size_t rval = 1; uxr->type = *p; if (uxr_is_insert(uxr)) { @@ -817,22 +952,19 @@ uxr_unpack_type_and_length(UXR uxr, uint8_t *p) { return rval; } -static inline size_t -uxr_unpack_length_and_bit(UXR uxr, uint8_t *p) { +static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p) { uint32_t length_and_bit = toku_dtoh32(*(uint32_t*)p); if (IS_INSERT(length_and_bit)) { uxr->type = XR_INSERT; uxr->vallen = GET_LENGTH(length_and_bit); - } - else { + } else { uxr->type = XR_DELETE; uxr->vallen = 0; } return sizeof(uint32_t); } -static inline size_t -uxr_unpack_data(UXR uxr, uint8_t *p) { +static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p) { if (uxr_is_insert(uxr)) { uxr->valp = p; return uxr->vallen; @@ -841,8 +973,7 @@ uxr_unpack_data(UXR uxr, uint8_t *p) { } // executed too often to be worth making threadsafe -static inline void -update_le_status(ULE ule, size_t memsize) { +static inline void update_le_status(ULE ule, size_t memsize) { if (ule->num_cuxrs > LE_STATUS_VAL(LE_MAX_COMMITTED_XR)) LE_STATUS_VAL(LE_MAX_COMMITTED_XR) = ule->num_cuxrs; if (ule->num_puxrs > LE_STATUS_VAL(LE_MAX_PROVISIONAL_XR)) @@ -856,21 +987,22 @@ update_le_status(ULE ule, size_t memsize) { // Purpose is to return a newly allocated leaf entry in packed format, or // return null if leaf entry should be destroyed (if no transaction records // are for inserts). -// Transaction records in packed le are stored inner to outer (first xr is innermost), -// with some information extracted out of the transaction records into the header. +// Transaction records in packed le are stored inner to outer (first xr is +// innermost), with some information extracted out of the transaction records +// into the header. // Transaction records in ule are stored outer to inner (uxr[0] is outermost). -int -le_pack(ULE ule, // data to be packed into new leafentry - bn_data* data_buffer, - uint32_t idx, - void* keyp, - uint32_t keylen, - uint32_t old_keylen, - uint32_t old_le_size, - LEAFENTRY * const new_leafentry_p, // this is what this function creates - void **const maybe_free - ) -{ +// Takes 'ule' and creates 'new_leafentry_p +int le_pack( + ULE ule, + bn_data* data_buffer, + uint32_t idx, + void* keyp, + uint32_t keylen, + uint32_t old_keylen, + uint32_t old_le_size, + LEAFENTRY* const new_leafentry_p, + void** const maybe_free) { + invariant(ule->num_cuxrs > 0); invariant(ule->uxrs[0].xid == TXNID_NONE); int rval; @@ -888,7 +1020,8 @@ le_pack(ULE ule, // data to be packed into new leafentry } } if (data_buffer && old_le_size > 0) { - // must pass old_keylen and old_le_size, since that's what is actually stored in data_buffer + // must pass old_keylen and old_le_size, since that's what is + // actually stored in data_buffer data_buffer->delete_leafentry(idx, old_keylen, old_le_size); } *new_leafentry_p = NULL; @@ -898,14 +1031,24 @@ le_pack(ULE ule, // data to be packed into new leafentry found_insert: memsize = le_memsize_from_ule(ule); LEAFENTRY new_leafentry; - get_space_for_le(data_buffer, idx, keyp, keylen, old_keylen, old_le_size, memsize, &new_leafentry, maybe_free); + get_space_for_le( + data_buffer, + idx, + keyp, + keylen, + old_keylen, + old_le_size, + memsize, + &new_leafentry, + maybe_free); //p always points to first unused byte after leafentry we are packing uint8_t *p; invariant(ule->num_cuxrs>0); //Type specific data if (ule->num_cuxrs == 1 && ule->num_puxrs == 0) { - //Pack a 'clean leafentry' (no uncommitted transactions, only one committed value) + //Pack a 'clean leafentry' (no uncommitted transactions, only one + //committed value) new_leafentry->type = LE_CLEAN; uint32_t vallen = ule->uxrs[0].vallen; @@ -917,8 +1060,7 @@ found_insert: //Set p to after leafentry p = new_leafentry->u.clean.val + vallen; - } - else { + } else { uint32_t i; //Pack an 'mvcc leafentry' new_leafentry->type = LE_MVCC; @@ -969,7 +1111,9 @@ found_insert: p += uxr_pack_data(outermost, p); } //pack txnid, length, bit, data for non-outermost, non-innermost - for (i = ule->num_cuxrs + 1; i < ule->num_cuxrs + ule->num_puxrs - 1; i++) { + for (i = ule->num_cuxrs + 1; + i < ule->num_cuxrs + ule->num_puxrs - 1; + i++) { UXR uxr = ule->uxrs + i; p += uxr_pack_txnid(uxr, p); p += uxr_pack_type_and_length(uxr, p); @@ -1022,13 +1166,13 @@ cleanup: return rval; } -////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// // Following functions provide convenient access to a packed leafentry. //Requires: -// Leafentry that ule represents should not be destroyed (is not just all deletes) -size_t -le_memsize_from_ule (ULE ule) { +// Leafentry that ule represents should not be destroyed (is not just all +// deletes) +size_t le_memsize_from_ule (ULE ule) { invariant(ule->num_cuxrs); size_t rval; if (ule->num_cuxrs == 1 && ule->num_puxrs == 0) { @@ -1037,13 +1181,13 @@ le_memsize_from_ule (ULE ule) { rval = 1 //type +4 //vallen +committed->vallen; //actual val - } - else { + } else { rval = 1 //type +4 //num_cuxrs +1 //num_puxrs +4*(ule->num_cuxrs) //types+lengths for committed - +8*(ule->num_cuxrs + ule->num_puxrs - 1); //txnids (excluding superroot) + +8*(ule->num_cuxrs + ule->num_puxrs - 1); //txnids (excluding + //superroot) uint32_t i; //Count data from committed uxrs and innermost puxr for (i = 0; i < ule->num_cuxrs; i++) { @@ -1072,8 +1216,11 @@ le_memsize_from_ule (ULE ule) { } // TODO: rename -size_t -leafentry_rest_memsize(uint32_t num_puxrs, uint32_t num_cuxrs, uint8_t* start) { +size_t leafentry_rest_memsize( + uint32_t num_puxrs, + uint32_t num_cuxrs, + uint8_t* start) { + UXR_S uxr; size_t lengths = 0; uint8_t* p = start; @@ -1122,8 +1269,7 @@ leafentry_rest_memsize(uint32_t num_puxrs, uint32_t num_cuxrs, uint8_t* start) { return rval; } -size_t -leafentry_memsize (LEAFENTRY le) { +size_t leafentry_memsize (LEAFENTRY le) { size_t rval = 0; uint8_t type = le->type; @@ -1162,13 +1308,11 @@ leafentry_memsize (LEAFENTRY le) { return rval; } -size_t -leafentry_disksize (LEAFENTRY le) { +size_t leafentry_disksize (LEAFENTRY le) { return leafentry_memsize(le); } -bool -le_is_clean(LEAFENTRY le) { +bool le_is_clean(LEAFENTRY le) { uint8_t type = le->type; uint32_t rval; switch (type) { @@ -1228,13 +1372,14 @@ int le_latest_is_del(LEAFENTRY le) { // -// returns true if the outermost provisional transaction id on the leafentry's stack matches -// the outermost transaction id in xids -// It is used to determine if a broadcast commit/abort message (look in ft-ops.c) should be applied to this leafentry -// If the outermost transactions match, then the broadcast commit/abort should be applied +// returns true if the outermost provisional transaction id on the leafentry's +// stack matches the outermost transaction id in xids +// It is used to determine if a broadcast commit/abort message (look in ft-ops.c) +// should be applied to this leafentry +// If the outermost transactions match, then the broadcast commit/abort should +// be applied // -bool -le_has_xids(LEAFENTRY le, XIDS xids) { +bool le_has_xids(LEAFENTRY le, XIDS xids) { //Read num_uxrs uint32_t num_xids = toku_xids_get_num_xids(xids); invariant(num_xids > 0); //Disallow checking for having TXNID_NONE @@ -1245,8 +1390,7 @@ le_has_xids(LEAFENTRY le, XIDS xids) { return rval; } -void* -le_latest_val_and_len (LEAFENTRY le, uint32_t *len) { +void* le_latest_val_and_len (LEAFENTRY le, uint32_t *len) { uint8_t type = le->type; void *valp; @@ -1277,8 +1421,7 @@ le_latest_val_and_len (LEAFENTRY le, uint32_t *len) { if (uxr_is_insert(&uxr)) { *len = uxr.vallen; valp = p + (num_cuxrs - 1 + (num_puxrs!=0))*sizeof(uint32_t); - } - else { + } else { *len = 0; valp = NULL; } @@ -1295,8 +1438,7 @@ le_latest_val_and_len (LEAFENTRY le, uint32_t *len) { if (uxr_is_insert(uxr)) { slow_valp = uxr->valp; slow_len = uxr->vallen; - } - else { + } else { slow_valp = NULL; slow_len = 0; } @@ -1310,8 +1452,7 @@ le_latest_val_and_len (LEAFENTRY le, uint32_t *len) { } //DEBUG ONLY can be slow -void* -le_latest_val (LEAFENTRY le) { +void* le_latest_val (LEAFENTRY le) { ULE_S ule; le_unpack(&ule, le); UXR uxr = ule_get_innermost_uxr(&ule); @@ -1325,8 +1466,7 @@ le_latest_val (LEAFENTRY le) { } //needed to be fast for statistics. -uint32_t -le_latest_vallen (LEAFENTRY le) { +uint32_t le_latest_vallen (LEAFENTRY le) { uint32_t rval; uint8_t type = le->type; uint8_t *p; @@ -1354,8 +1494,7 @@ le_latest_vallen (LEAFENTRY le) { uxr_unpack_length_and_bit(&uxr, p); if (uxr_is_insert(&uxr)) { rval = uxr.vallen; - } - else { + } else { rval = 0; } break; @@ -1377,8 +1516,7 @@ le_latest_vallen (LEAFENTRY le) { return rval; } -uint64_t -le_outermost_uncommitted_xid (LEAFENTRY le) { +uint64_t le_outermost_uncommitted_xid (LEAFENTRY le) { uint64_t rval = TXNID_NONE; uint8_t type = le->type; @@ -1412,8 +1550,7 @@ le_outermost_uncommitted_xid (LEAFENTRY le) { //Optimization not required. This is a debug only function. //Print a leafentry out in human-readable format -int -print_klpair (FILE *outf, const void* keyp, uint32_t keylen, LEAFENTRY le) { +int print_klpair (FILE *outf, const void* keyp, uint32_t keylen, LEAFENTRY le) { ULE_S ule; le_unpack(&ule, le); uint32_t i; @@ -1444,23 +1581,21 @@ print_klpair (FILE *outf, const void* keyp, uint32_t keylen, LEAFENTRY le) { return 0; } -///////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// // This layer of abstraction (ule_xxx) knows the structure of the unpacked // leafentry and no other structure. // // ule constructor // Note that transaction 0 is explicit in the ule -static void -ule_init_empty_ule(ULE ule) { +static inline void ule_init_empty_ule(ULE ule) { ule->num_cuxrs = 1; ule->num_puxrs = 0; ule->uxrs = ule->uxrs_static; ule->uxrs[0] = committed_delete; } -static inline int32_t -min_i32(int32_t a, int32_t b) { +static inline int32_t min_i32(int32_t a, int32_t b) { int32_t rval = a < b ? a : b; return rval; } @@ -1470,8 +1605,8 @@ min_i32(int32_t a, int32_t b) { // // If the leafentry has already been promoted, there is nothing to do. // We have two transaction stacks (one from message, one from leaf entry). -// We want to implicitly promote transactions newer than (but not including) -// the innermost common ancestor (ICA) of the two stacks of transaction ids. We +// We want to implicitly promote transactions newer than (but not including) +// the innermost common ancestor (ICA) of the two stacks of transaction ids. We // know that this is the right thing to do because each transaction with an id // greater (later) than the ICA must have been either committed or aborted. // If it was aborted then we would have seen an abort message and removed the @@ -1483,8 +1618,7 @@ min_i32(int32_t a, int32_t b) { // record of ICA, keeping the transaction id of the ICA. // Outermost xid is zero for both ule and xids<> // -static void -ule_do_implicit_promotions(ULE ule, XIDS xids) { +static void ule_do_implicit_promotions(ULE ule, XIDS xids) { //Optimization for (most) common case. //No commits necessary if everything is already committed. if (ule->num_puxrs > 0) { @@ -1506,17 +1640,16 @@ ule_do_implicit_promotions(ULE ule, XIDS xids) { if (ica_index < ule->num_cuxrs) { invariant(ica_index == ule->num_cuxrs - 1); ule_promote_provisional_innermost_to_committed(ule); - } - else if (ica_index < ule->num_cuxrs + ule->num_puxrs - 1) { - //If ica is the innermost uxr in the leafentry, no commits are necessary. + } else if (ica_index < ule->num_cuxrs + ule->num_puxrs - 1) { + //If ica is the innermost uxr in the leafentry, no commits are + //necessary. ule_promote_provisional_innermost_to_index(ule, ica_index); } } } -static void -ule_promote_provisional_innermost_to_committed(ULE ule) { +static void ule_promote_provisional_innermost_to_committed(ULE ule) { //Must be something to promote. invariant(ule->num_puxrs); //Take value (or delete flag) from innermost. @@ -1532,8 +1665,7 @@ ule_promote_provisional_innermost_to_committed(ULE ule) { ule->num_puxrs = 0; //Discard all provisional uxrs. if (uxr_is_delete(old_innermost_uxr)) { ule_push_delete_uxr(ule, true, old_outermost_uncommitted_uxr->xid); - } - else { + } else { ule_push_insert_uxr(ule, true, old_outermost_uncommitted_uxr->xid, old_innermost_uxr->vallen, @@ -1541,11 +1673,13 @@ ule_promote_provisional_innermost_to_committed(ULE ule) { } } -static void -ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid) { +static void ule_try_promote_provisional_outermost( + ULE ule, + TXNID oldest_possible_live_xid) { // Effect: If there is a provisional record whose outermost xid is older than // the oldest known referenced_xid, promote it to committed. - if (ule->num_puxrs > 0 && ule_get_xid(ule, ule->num_cuxrs) < oldest_possible_live_xid) { + if (ule->num_puxrs > 0 && + ule_get_xid(ule, ule->num_cuxrs) < oldest_possible_live_xid) { ule_promote_provisional_innermost_to_committed(ule); } } @@ -1553,8 +1687,9 @@ ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid) { // Purpose is to promote the value (and type) of the innermost transaction // record to the uxr at the specified index (keeping the txnid of the uxr at // specified index.) -static void -ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index) { +static void ule_promote_provisional_innermost_to_index( + ULE ule, + uint32_t index) { //Must not promote to committed portion of stack. invariant(index >= ule->num_cuxrs); //Must actually be promoting. @@ -1562,15 +1697,17 @@ ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index) { UXR old_innermost_uxr = ule_get_innermost_uxr(ule); assert(!uxr_is_placeholder(old_innermost_uxr)); TXNID new_innermost_xid = ule->uxrs[index].xid; - ule->num_puxrs = index - ule->num_cuxrs; //Discard old uxr at index (and everything inner) + //Discard old uxr at index (and everything inner) + ule->num_puxrs = index - ule->num_cuxrs; if (uxr_is_delete(old_innermost_uxr)) { ule_push_delete_uxr(ule, false, new_innermost_xid); - } - else { - ule_push_insert_uxr(ule, false, - new_innermost_xid, - old_innermost_uxr->vallen, - old_innermost_uxr->valp); + } else { + ule_push_insert_uxr( + ule, + false, + new_innermost_xid, + old_innermost_uxr->vallen, + old_innermost_uxr->valp); } } @@ -1581,19 +1718,60 @@ ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index) { // Purpose is to apply an insert message to this leafentry: -static void -ule_apply_insert(ULE ule, XIDS xids, uint32_t vallen, void * valp) { +static inline int64_t ule_apply_insert_no_overwrite( + ULE ule, + XIDS xids, + uint32_t vallen, + void* valp) { + + invariant(IS_VALID_LEN(vallen)); + int64_t retval = 0; + UXR old_innermost_uxr = ule_get_innermost_uxr(ule); + // If something exists, don't overwrite + if (uxr_is_insert(old_innermost_uxr)) { + retval = -1; + return retval; + } ule_prepare_for_new_uxr(ule, xids); - TXNID this_xid = toku_xids_get_innermost_xid(xids); // xid of transaction doing this insert + // xid of transaction doing this insert + TXNID this_xid = toku_xids_get_innermost_xid(xids); ule_push_insert_uxr(ule, this_xid == TXNID_NONE, this_xid, vallen, valp); + return retval; +} + +// Purpose is to apply an insert message to this leafentry: +static inline int64_t ule_apply_insert( + ULE ule, + XIDS xids, + uint32_t vallen, + void* valp) { + + invariant(IS_VALID_LEN(vallen)); + int64_t retval = 0; + UXR old_innermost_uxr = ule_get_innermost_uxr(ule); + // If something exists, overwrite + if (uxr_is_insert(old_innermost_uxr)) { + retval = -1; + } + ule_prepare_for_new_uxr(ule, xids); + // xid of transaction doing this insert + TXNID this_xid = toku_xids_get_innermost_xid(xids); + ule_push_insert_uxr(ule, this_xid == TXNID_NONE, this_xid, vallen, valp); + return retval; } // Purpose is to apply a delete message to this leafentry: -static void -ule_apply_delete(ULE ule, XIDS xids) { +static inline int64_t ule_apply_delete(ULE ule, XIDS xids) { + int64_t retval = 0; + UXR old_innermost_uxr = ule_get_innermost_uxr(ule); + if (FT_UNLIKELY(uxr_is_delete(old_innermost_uxr))) { + retval = 1; + } ule_prepare_for_new_uxr(ule, xids); - TXNID this_xid = toku_xids_get_innermost_xid(xids); // xid of transaction doing this delete + // xid of transaction doing this delete + TXNID this_xid = toku_xids_get_innermost_xid(xids); ule_push_delete_uxr(ule, this_xid == TXNID_NONE, this_xid); + return retval; } // First, discard anything done earlier by this transaction. @@ -1601,20 +1779,18 @@ ule_apply_delete(ULE ule, XIDS xids) { // outer transactions that are newer than then newest (innermost) transaction in // the leafentry. If so, record those outer transactions in the leafentry // with placeholders. -static void -ule_prepare_for_new_uxr(ULE ule, XIDS xids) { +static inline void ule_prepare_for_new_uxr(ULE ule, XIDS xids) { TXNID this_xid = toku_xids_get_innermost_xid(xids); //This is for LOADER_USE_PUTS or transactionless environment //where messages use XIDS of 0 if (this_xid == TXNID_NONE && ule_get_innermost_xid(ule) == TXNID_NONE) { ule_remove_innermost_uxr(ule); - } - // case where we are transactional and xids stack matches ule stack - else if (ule->num_puxrs > 0 && ule_get_innermost_xid(ule) == this_xid) { + } else if (ule->num_puxrs > 0 && ule_get_innermost_xid(ule) == this_xid) { + // case where we are transactional and xids stack matches ule stack ule_remove_innermost_uxr(ule); - } - // case where we are transactional and xids stack does not match ule stack - else { + } else { + // case where we are transactional and xids stack does not match ule + // stack ule_add_placeholders(ule, xids); } } @@ -1625,10 +1801,12 @@ ule_prepare_for_new_uxr(ULE ule, XIDS xids) { // then there is nothing to be done. // If this transaction did modify the leafentry, then undo whatever it did (by // removing the transaction record (uxr) and any placeholders underneath. -// Remember, the innermost uxr can only be an insert or a delete, not a placeholder. -static void -ule_apply_abort(ULE ule, XIDS xids) { - TXNID this_xid = toku_xids_get_innermost_xid(xids); // xid of transaction doing this abort +// Remember, the innermost uxr can only be an insert or a delete, not a +// placeholder. +static inline int64_t ule_apply_abort(ULE ule, XIDS xids) { + int64_t retval = 0; + // xid of transaction doing this abort + TXNID this_xid = toku_xids_get_innermost_xid(xids); invariant(this_xid!=TXNID_NONE); UXR innermost = ule_get_innermost_uxr(ule); // need to check for provisional entries in ule, otherwise @@ -1636,15 +1814,34 @@ ule_apply_abort(ULE ule, XIDS xids) { // in a bug where the most recently committed has same xid // as the XID's innermost if (ule->num_puxrs > 0 && innermost->xid == this_xid) { + // if this is a rollback of a delete of a new ule, return 0 + // (i.e. double delete) + if (uxr_is_delete(innermost)) { + if (ule->num_puxrs == 1 && ule->num_cuxrs == 1 && + uxr_is_delete(&(ule->uxrs[0]))) { + retval = 0; + } else { + retval = 1; + } + } else if (uxr_is_insert(innermost)) { + if (ule->num_puxrs == 1 && ule->num_cuxrs == 1 && + uxr_is_insert(&(ule->uxrs[0]))) { + retval = 0; + } else { + retval = -1; + } + } + // if this is a rollback of a insert of an exising ule, return 0 + // (i.e. double insert) invariant(ule->num_puxrs>0); ule_remove_innermost_uxr(ule); ule_remove_innermost_placeholders(ule); } invariant(ule->num_cuxrs > 0); + return retval; } -static void -ule_apply_broadcast_commit_all (ULE ule) { +static void ule_apply_broadcast_commit_all (ULE ule) { ule->uxrs[0] = ule->uxrs[ule->num_puxrs + ule->num_cuxrs - 1]; ule->uxrs[0].xid = TXNID_NONE; ule->num_puxrs = 0; @@ -1657,9 +1854,11 @@ ule_apply_broadcast_commit_all (ULE ule) { // then there is nothing to be done. // Also, if there are no uncommitted transaction records there is nothing to do. // If this transaction did modify the leafentry, then promote whatever it did. -// Remember, the innermost uxr can only be an insert or a delete, not a placeholder. +// Remember, the innermost uxr can only be an insert or a delete, not a +// placeholder. void ule_apply_commit(ULE ule, XIDS xids) { - TXNID this_xid = toku_xids_get_innermost_xid(xids); // xid of transaction committing + // xid of transaction committing + TXNID this_xid = toku_xids_get_innermost_xid(xids); invariant(this_xid!=TXNID_NONE); // need to check for provisional entries in ule, otherwise // there is nothing to abort, not checking this may result @@ -1668,16 +1867,19 @@ void ule_apply_commit(ULE ule, XIDS xids) { if (ule->num_puxrs > 0 && ule_get_innermost_xid(ule) == this_xid) { // 3 cases: //1- it's already a committed value (do nothing) (num_puxrs==0) - //2- it's provisional but root level (make a new committed value (num_puxrs==1) + //2- it's provisional but root level (make a new committed value + // (num_puxrs==1) //3- it's provisional and not root (promote); (num_puxrs>1) if (ule->num_puxrs == 1) { //new committed value ule_promote_provisional_innermost_to_committed(ule); - } - else if (ule->num_puxrs > 1) { - //ule->uxrs[ule->num_cuxrs+ule->num_puxrs-1] is the innermost (this transaction) + } else if (ule->num_puxrs > 1) { + //ule->uxrs[ule->num_cuxrs+ule->num_puxrs-1] is the innermost + // (this transaction) //ule->uxrs[ule->num_cuxrs+ule->num_puxrs-2] is the 2nd innermost //We want to promote the innermost uxr one level out. - ule_promote_provisional_innermost_to_index(ule, ule->num_cuxrs+ule->num_puxrs-2); + ule_promote_provisional_innermost_to_index( + ule, + ule->num_cuxrs+ule->num_puxrs-2); } } } @@ -1687,14 +1889,17 @@ void ule_apply_commit(ULE ule, XIDS xids) { // // Purpose is to record an insert for this transaction (and set type correctly). -static void -ule_push_insert_uxr(ULE ule, bool is_committed, TXNID xid, uint32_t vallen, void * valp) { - UXR uxr = ule_get_first_empty_uxr(ule); +static inline void ule_push_insert_uxr( + ULE ule, + bool is_committed, TXNID xid, + uint32_t vallen, + void* valp) { + + UXR uxr = ule_get_first_empty_uxr(ule); if (is_committed) { invariant(ule->num_puxrs==0); ule->num_cuxrs++; - } - else { + } else { ule->num_puxrs++; } uxr->xid = xid; @@ -1706,23 +1911,21 @@ ule_push_insert_uxr(ULE ule, bool is_committed, TXNID xid, uint32_t vallen, void // Purpose is to record a delete for this transaction. If this transaction // is the root transaction, then truly delete the leafentry by marking the // ule as empty. -static void -ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid) { +static inline void ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid) { UXR uxr = ule_get_first_empty_uxr(ule); if (is_committed) { invariant(ule->num_puxrs==0); ule->num_cuxrs++; - } - else { + } else { ule->num_puxrs++; } uxr->xid = xid; uxr->type = XR_DELETE; } -// Purpose is to push a placeholder on the top of the leafentry's transaction stack. -static void -ule_push_placeholder_uxr(ULE ule, TXNID xid) { +// Purpose is to push a placeholder on the top of the leafentry's transaction +// stack. +static inline void ule_push_placeholder_uxr(ULE ule, TXNID xid) { invariant(ule->num_cuxrs>0); UXR uxr = ule_get_first_empty_uxr(ule); uxr->xid = xid; @@ -1731,16 +1934,14 @@ ule_push_placeholder_uxr(ULE ule, TXNID xid) { } // Return innermost transaction record. -static UXR -ule_get_innermost_uxr(ULE ule) { +static inline UXR ule_get_innermost_uxr(ULE ule) { invariant(ule->num_cuxrs > 0); UXR rval = &(ule->uxrs[ule->num_cuxrs + ule->num_puxrs - 1]); return rval; } // Return first empty transaction record -static UXR -ule_get_first_empty_uxr(ULE ule) { +static inline UXR ule_get_first_empty_uxr(ULE ule) { invariant(ule->num_puxrs < MAX_TRANSACTION_RECORDS-1); UXR rval = &(ule->uxrs[ule->num_cuxrs+ule->num_puxrs]); return rval; @@ -1748,14 +1949,12 @@ ule_get_first_empty_uxr(ULE ule) { // Remove the innermost transaction (pop the leafentry's stack), undoing // whatever the innermost transaction did. -static void -ule_remove_innermost_uxr(ULE ule) { +static inline void ule_remove_innermost_uxr(ULE ule) { //It is possible to remove the committed delete at first insert. invariant(ule->num_cuxrs > 0); if (ule->num_puxrs) { ule->num_puxrs--; - } - else { + } else { //This is for LOADER_USE_PUTS or transactionless environment //where messages use XIDS of 0 invariant(ule->num_cuxrs == 1); @@ -1764,14 +1963,12 @@ ule_remove_innermost_uxr(ULE ule) { } } -static TXNID -ule_get_innermost_xid(ULE ule) { +static inline TXNID ule_get_innermost_xid(ULE ule) { TXNID rval = ule_get_xid(ule, ule->num_cuxrs + ule->num_puxrs - 1); return rval; } -static TXNID -ule_get_xid(ULE ule, uint32_t index) { +static inline TXNID ule_get_xid(ULE ule, uint32_t index) { invariant(index < ule->num_cuxrs + ule->num_puxrs); TXNID rval = ule->uxrs[index].xid; return rval; @@ -1781,8 +1978,7 @@ ule_get_xid(ULE ule, uint32_t index) { // innermost recorded transactions), if necessary. This function is idempotent. // It makes no logical sense for a placeholder to be the innermost recorded // transaction record, so placeholders at the top of the stack are not legal. -static void -ule_remove_innermost_placeholders(ULE ule) { +static void ule_remove_innermost_placeholders(ULE ule) { UXR uxr = ule_get_innermost_uxr(ule); while (uxr_is_placeholder(uxr)) { invariant(ule->num_puxrs>0); @@ -1796,8 +1992,7 @@ ule_remove_innermost_placeholders(ULE ule) { // Note, after placeholders are added, an insert or delete will be added. This // function temporarily leaves the transaction stack in an illegal state (having // placeholders on top). -static void -ule_add_placeholders(ULE ule, XIDS xids) { +static void ule_add_placeholders(ULE ule, XIDS xids) { //Placeholders can be placed on top of the committed uxr. invariant(ule->num_cuxrs > 0); @@ -1819,47 +2014,40 @@ ule_add_placeholders(ULE ule, XIDS xids) { } } -uint64_t -ule_num_uxrs(ULE ule) { +uint64_t ule_num_uxrs(ULE ule) { return ule->num_cuxrs + ule->num_puxrs; } -UXR -ule_get_uxr(ULE ule, uint64_t ith) { +UXR ule_get_uxr(ULE ule, uint64_t ith) { invariant(ith < ule_num_uxrs(ule)); return &ule->uxrs[ith]; } -uint32_t -ule_get_num_committed(ULE ule) { +uint32_t ule_get_num_committed(ULE ule) { return ule->num_cuxrs; } -uint32_t -ule_get_num_provisional(ULE ule) { +uint32_t ule_get_num_provisional(ULE ule) { return ule->num_puxrs; } -int -ule_is_committed(ULE ule, uint64_t ith) { +int ule_is_committed(ULE ule, uint64_t ith) { invariant(ith < ule_num_uxrs(ule)); return ith < ule->num_cuxrs; } -int -ule_is_provisional(ULE ule, uint64_t ith) { +int ule_is_provisional(ULE ule, uint64_t ith) { invariant(ith < ule_num_uxrs(ule)); return ith >= ule->num_cuxrs; } // return size of data for innermost uxr, the size of val -uint32_t -ule_get_innermost_numbytes(ULE ule, uint32_t keylen) { +uint32_t ule_get_innermost_numbytes(ULE ule, uint32_t keylen) { uint32_t rval; UXR uxr = ule_get_innermost_uxr(ule); - if (uxr_is_delete(uxr)) + if (uxr_is_delete(uxr)) { rval = 0; - else { + } else { rval = uxr_get_vallen(uxr) + keylen; } return rval; @@ -1870,68 +2058,65 @@ ule_get_innermost_numbytes(ULE ule, uint32_t keylen) { // This layer of abstraction (uxr_xxx) understands uxr and nothing else. // -static inline bool -uxr_type_is_insert(uint8_t type) { +static inline bool uxr_type_is_insert(uint8_t type) { bool rval = (bool)(type == XR_INSERT); return rval; } -bool -uxr_is_insert(UXR uxr) { +bool uxr_is_insert(UXR uxr) { return uxr_type_is_insert(uxr->type); } -static inline bool -uxr_type_is_delete(uint8_t type) { +static inline bool uxr_type_is_delete(uint8_t type) { bool rval = (bool)(type == XR_DELETE); return rval; } -bool -uxr_is_delete(UXR uxr) { +bool uxr_is_delete(UXR uxr) { return uxr_type_is_delete(uxr->type); } -static inline bool -uxr_type_is_placeholder(uint8_t type) { +static inline bool uxr_type_is_placeholder(uint8_t type) { bool rval = (bool)(type == XR_PLACEHOLDER); return rval; } -bool -uxr_is_placeholder(UXR uxr) { +bool uxr_is_placeholder(UXR uxr) { return uxr_type_is_placeholder(uxr->type); } -void * -uxr_get_val(UXR uxr) { +void* uxr_get_val(UXR uxr) { return uxr->valp; } -uint32_t -uxr_get_vallen(UXR uxr) { +uint32_t uxr_get_vallen(UXR uxr) { return uxr->vallen; } -TXNID -uxr_get_txnid(UXR uxr) { +TXNID uxr_get_txnid(UXR uxr) { return uxr->xid; } -static int -le_iterate_get_accepted_index(TXNID *xids, uint32_t *index, uint32_t num_xids, LE_ITERATE_CALLBACK f, TOKUTXN context, bool top_is_provisional) { +static int le_iterate_get_accepted_index( + TXNID* xids, + uint32_t* index, + uint32_t num_xids, + LE_ITERATE_CALLBACK f, + TOKUTXN context, + bool top_is_provisional) { + uint32_t i; int r = 0; - // if this for loop does not return anything, we return num_xids-1, which should map to T_0 + // if this for loop does not return anything, we return num_xids-1, which + // should map to T_0 for (i = 0; i < num_xids - 1; i++) { TXNID xid = toku_dtoh64(xids[i]); r = f(xid, context, (i == 0 && top_is_provisional)); if (r==TOKUDB_ACCEPT) { r = 0; break; //or goto something - } - else if (r!=0) { + } else if (r!=0) { break; } } @@ -1940,8 +2125,7 @@ le_iterate_get_accepted_index(TXNID *xids, uint32_t *index, uint32_t num_xids, L } #if ULE_DEBUG -static void -ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) { +static void ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) { int has_p = (ule->num_puxrs != 0); invariant(ule->num_cuxrs + has_p == interesting); uint32_t i; @@ -1953,21 +2137,29 @@ ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) { #endif // -// Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted by 'f'. If the value -// associated with the accepted TXNID is not an insert, then set *is_emptyp to true, otherwise false +// Iterates over "possible" TXNIDs in a leafentry's stack, until one is +// accepted by 'f'. If the value associated with the accepted TXNID is not an +// insert, then set *is_emptyp to true, otherwise false // The "possible" TXNIDs are: -// if provisionals exist, then the first possible TXNID is the outermost provisional. -// The next possible TXNIDs are the committed TXNIDs, from most recently committed to T_0. -// If provisionals exist, and the outermost provisional is accepted by 'f', +// If provisionals exist, then the first possible TXNID is the outermost +// provisional. +// The next possible TXNIDs are the committed TXNIDs, from most recently +// committed to T_0. +// If provisionals exist, and the outermost provisional is accepted by 'f', // the associated value checked is the innermost provisional's value. // Parameters: // le - leafentry to iterate over -// f - callback function that checks if a TXNID in le is accepted, and its associated value should be examined. +// f - callback function that checks if a TXNID in le is accepted, and its +// associated value should be examined. // is_delp - output parameter that returns answer // context - parameter for f // -static int -le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN context) { +static int le_iterate_is_del( + LEAFENTRY le, + LE_ITERATE_CALLBACK f, + bool* is_delp, + TOKUTXN context) { + #if ULE_DEBUG ULE_S ule; le_unpack(&ule, le); @@ -2002,8 +2194,17 @@ le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN co #if ULE_DEBUG ule_verify_xids(&ule, num_interesting, xids); #endif - r = le_iterate_get_accepted_index(xids, &index, num_interesting, f, context, (num_puxrs != 0)); - if (r!=0) goto cleanup; + r = + le_iterate_get_accepted_index( + xids, + &index, + num_interesting, + f, + context, + (num_puxrs != 0)); + if (r != 0) { + goto cleanup; + } invariant(index < num_interesting); //Skip TXNIDs @@ -2017,7 +2218,9 @@ le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN co #if ULE_DEBUG { uint32_t has_p = (ule.num_puxrs != 0); - uint32_t ule_index = (index==0) ? ule.num_cuxrs + ule.num_puxrs - 1 : ule.num_cuxrs - 1 + has_p - index; + uint32_t ule_index = (index==0) ? + ule.num_cuxrs + ule.num_puxrs - 1 : + ule.num_cuxrs - 1 + has_p - index; UXR uxr = ule.uxrs + ule_index; invariant(uxr_is_delete(uxr) == is_del); } @@ -2034,7 +2237,11 @@ cleanup: return r; } -static int le_iterate_read_committed_callback(TXNID txnid, TOKUTXN txn, bool is_provisional UU()) { +static int le_iterate_read_committed_callback( + TXNID txnid, + TOKUTXN txn, + bool is_provisional UU()) { + if (is_provisional) { return toku_txn_reads_txnid(txnid, txn, is_provisional); } @@ -2058,33 +2265,40 @@ int le_val_is_del(LEAFENTRY le, enum cursor_read_type read_type, TOKUTXN txn) { txn ); rval = is_del; - } - else if (read_type == C_READ_ANY) { + } else if (read_type == C_READ_ANY) { rval = le_latest_is_del(le); - } - else { + } else { invariant(false); } return rval; } // -// Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted by 'f'. Set -// valpp and vallenp to value and length associated with accepted TXNID +// Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted +// by 'f'. Set valpp and vallenp to value and length associated with accepted +// TXNID // The "possible" TXNIDs are: -// if provisionals exist, then the first possible TXNID is the outermost provisional. -// The next possible TXNIDs are the committed TXNIDs, from most recently committed to T_0. -// If provisionals exist, and the outermost provisional is accepted by 'f', +// If provisionals exist, then the first possible TXNID is the outermost +// provisional. +// The next possible TXNIDs are the committed TXNIDs, from most recently +// committed to T_0. +// If provisionals exist, and the outermost provisional is accepted by 'f', // the associated length value is the innermost provisional's length and value. // Parameters: // le - leafentry to iterate over -// f - callback function that checks if a TXNID in le is accepted, and its associated value should be examined. +// f - callback function that checks if a TXNID in le is accepted, and its +// associated value should be examined. // valpp - output parameter that returns pointer to value // vallenp - output parameter that returns length of value // context - parameter for f // -int -le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context) { +int le_iterate_val( + LEAFENTRY le, + LE_ITERATE_CALLBACK f, + void** valpp, + uint32_t* vallenp, + TOKUTXN context) { + #if ULE_DEBUG ULE_S ule; le_unpack(&ule, le); @@ -2124,8 +2338,17 @@ le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vall #if ULE_DEBUG ule_verify_xids(&ule, num_interesting, xids); #endif - r = le_iterate_get_accepted_index(xids, &index, num_interesting, f, context, (num_puxrs != 0)); - if (r!=0) goto cleanup; + r = + le_iterate_get_accepted_index( + xids, + &index, + num_interesting, + f, + context, + (num_puxrs != 0)); + if (r != 0) { + goto cleanup; + } invariant(index < num_interesting); //Skip TXNIDs @@ -2158,7 +2381,9 @@ le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vall #if ULE_DEBUG { uint32_t has_p = (ule.num_puxrs != 0); - uint32_t ule_index = (index==0) ? ule.num_cuxrs + ule.num_puxrs - 1 : ule.num_cuxrs - 1 + has_p - index; + uint32_t ule_index = (index==0) ? + ule.num_cuxrs + ule.num_puxrs - 1 : + ule.num_cuxrs - 1 + has_p - index; UXR uxr = ule.uxrs + ule_index; invariant(uxr_is_insert(uxr)); invariant(uxr->vallen == vallen); @@ -2188,10 +2413,15 @@ cleanup: return r; } -void le_extract_val(LEAFENTRY le, - // should we return the entire leafentry as the val? - bool is_leaf_mode, enum cursor_read_type read_type, - TOKUTXN ttxn, uint32_t *vallen, void **val) { +void le_extract_val( + LEAFENTRY le, + // should we return the entire leafentry as the val? + bool is_leaf_mode, + enum cursor_read_type read_type, + TOKUTXN ttxn, + uint32_t* vallen, + void** val) { + if (is_leaf_mode) { *val = le; *vallen = leafentry_memsize(le); @@ -2199,18 +2429,11 @@ void le_extract_val(LEAFENTRY le, LE_ITERATE_CALLBACK f = (read_type == C_READ_SNAPSHOT) ? toku_txn_reads_txnid : le_iterate_read_committed_callback; - int r = le_iterate_val( - le, - f, - val, - vallen, - ttxn - ); + int r = le_iterate_val(le, f, val, vallen, ttxn); lazy_assert_zero(r); } else if (read_type == C_READ_ANY){ *val = le_latest_val_and_len(le, vallen); - } - else { + } else { assert(false); } } @@ -2244,9 +2467,9 @@ static_assert(18 == sizeof(leafentry_13), "wrong size"); static_assert(9 == __builtin_offsetof(leafentry_13, u), "wrong offset"); //Requires: -// Leafentry that ule represents should not be destroyed (is not just all deletes) -static size_t -le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) { +// Leafentry that ule represents should not be destroyed (is not just all +// deletes) +static size_t le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) { uint32_t num_uxrs = ule->num_cuxrs + ule->num_puxrs; assert(num_uxrs); size_t rval; @@ -2257,8 +2480,7 @@ le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) { +4 //vallen +le->keylen //actual key +ule->uxrs[0].vallen; //actual val - } - else { + } else { rval = 1 //num_uxrs +4 //keylen +le->keylen //actual key @@ -2276,16 +2498,20 @@ le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) { return rval; } -//This function is mostly copied from 4.1.1 (which is version 12, same as 13 except that only 13 is upgradable). -// Note, number of transaction records in version 13 has been replaced by separate counters in version 14 (MVCC), -// one counter for committed transaction records and one counter for provisional transaction records. When -// upgrading a version 13 le to version 14, the number of committed transaction records is always set to one (1) -// and the number of provisional transaction records is set to the original number of transaction records -// minus one. The bottom transaction record is assumed to be a committed value. (If there is no committed -// value then the bottom transaction record of version 13 is a committed delete.) -// This is the only change from the 4.1.1 code. The rest of the leafentry is read as is. -static void -le_unpack_13(ULE ule, LEAFENTRY_13 le) { +// This function is mostly copied from 4.1.1 (which is version 12, same as 13 +// except that only 13 is upgradable). +// Note, number of transaction records in version 13 has been replaced by +// separate counters in version 14 (MVCC), one counter for committed transaction +// records and one counter for provisional transaction records. When upgrading +// a version 13 le to version 14, the number of committed transaction records is +// always set to one (1) and the number of provisional transaction records is +// set to the original number of transaction records minus one. The bottom +// transaction record is assumed to be a committed value. (If there is no +// committed value then the bottom transaction record of version 13 is a +// committed delete.) +// This is the only change from the 4.1.1 code. The rest of the leafentry is +// read as is. +static void le_unpack_13(ULE ule, LEAFENTRY_13 le) { //Read num_uxrs uint8_t num_xrs = le->num_xrs; assert(num_xrs > 0); @@ -2302,15 +2528,15 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) { uint8_t *p; if (num_xrs == 1) { //Unpack a 'committed leafentry' (No uncommitted transactions exist) - ule->uxrs[0].type = XR_INSERT; //Must be or the leafentry would not exist + //Must be or the leafentry would not exist + ule->uxrs[0].type = XR_INSERT; ule->uxrs[0].vallen = vallen_of_innermost_insert; ule->uxrs[0].valp = &le->u.comm.key_val[keylen]; ule->uxrs[0].xid = 0; //Required. //Set p to immediately after leafentry p = &le->u.comm.key_val[keylen + vallen_of_innermost_insert]; - } - else { + } else { //Unpack a 'provisional leafentry' (Uncommitted transactions exist) //Read in type. @@ -2337,8 +2563,7 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) { //Not innermost, so load the type. uxr->type = *p; p += 1; - } - else { + } else { //Innermost, load the type previously read from header uxr->type = innermost_type; } @@ -2349,12 +2574,11 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) { //Not committed nor outermost uncommitted, so load the xid. uxr->xid = toku_dtoh64(*(TXNID*)p); p += 8; - } - else if (i == 1) { - //Outermost uncommitted, load the xid previously read from header + } else if (i == 1) { + //Outermost uncommitted, load the xid previously read from + //header uxr->xid = xid_outermost_uncommitted; - } - else { + } else { // i == 0, committed entry uxr->xid = 0; } @@ -2367,9 +2591,9 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) { uxr->valp = p; p += uxr->vallen; - } - else { - //Innermost insert, load the vallen/valp previously read from header + } else { + //Innermost insert, load the vallen/valp previously read + //from header uxr->vallen = vallen_of_innermost_insert; uxr->valp = valp_of_innermost_insert; found_innermost_insert = true; @@ -2384,8 +2608,7 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) { #endif } -size_t -leafentry_disksize_13(LEAFENTRY_13 le) { +size_t leafentry_disksize_13(LEAFENTRY_13 le) { ULE_S ule; le_unpack_13(&ule, le); size_t memsize = le_memsize_from_ule_13(&ule, le); @@ -2393,13 +2616,13 @@ leafentry_disksize_13(LEAFENTRY_13 le) { return memsize; } -int -toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, - void** keyp, - uint32_t* keylen, - size_t *new_leafentry_memorysize, - LEAFENTRY *new_leafentry_p - ) { +int toku_le_upgrade_13_14( + LEAFENTRY_13 old_leafentry, + void** keyp, + uint32_t* keylen, + size_t* new_leafentry_memorysize, + LEAFENTRY* new_leafentry_p) { + ULE_S ule; int rval; invariant(old_leafentry); @@ -2408,23 +2631,23 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, *keylen = old_leafentry->keylen; if (old_leafentry->num_xrs == 1) { *keyp = old_leafentry->u.comm.key_val; - } - else { + } else { *keyp = old_leafentry->u.prov.key_val_xrs; } // We used to pass NULL for omt and mempool, so that we would use // malloc instead of a mempool. However after supporting upgrade, // we need to use mempools and the OMT. - rval = le_pack(&ule, // create packed leafentry - nullptr, - 0, //only matters if we are passing in a bn_data - nullptr, //only matters if we are passing in a bn_data - 0, //only matters if we are passing in a bn_data - 0, //only matters if we are passing in a bn_data - 0, //only matters if we are passing in a bn_data - new_leafentry_p, - nullptr //only matters if we are passing in a bn_data - ); + rval = + le_pack( + &ule, // create packed leafentry + nullptr, + 0, //only matters if we are passing in a bn_data + nullptr, //only matters if we are passing in a bn_data + 0, //only matters if we are passing in a bn_data + 0, //only matters if we are passing in a bn_data + 0, //only matters if we are passing in a bn_data + new_leafentry_p, + nullptr); //only matters if we are passing in a bn_data ule_cleanup(&ule); *new_leafentry_memorysize = leafentry_memsize(*new_leafentry_p); return rval; diff --git a/storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt b/storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt index 8cea16c914d..6f9146ce5b2 100644 --- a/storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt +++ b/storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt @@ -2,6 +2,8 @@ include_directories(..) include_directories(../../src) include_directories(../../src/tests) +find_library(JEMALLOC_STATIC_LIBRARY libjemalloc.a) + if (BUILD_TESTING) ## reference implementation with simple size-doubling buffer without ## jemalloc size tricks @@ -24,15 +26,15 @@ if (BUILD_TESTING) cursor_test ) set(_testname ${impl}_${test}) - if (with_jemalloc) + if (with_jemalloc AND JEMALLOC_STATIC_LIBRARY) set(_testname ${_testname}_j) endif () add_executable(${_testname} ${test}) - if (with_jemalloc) + if (with_jemalloc AND JEMALLOC_STATIC_LIBRARY) if (APPLE) - target_link_libraries(${_testname} -Wl,-force_load jemalloc) + target_link_libraries(${_testname} -Wl,-force_load ${JEMALLOC_STATIC_LIBRARY}) else () - target_link_libraries(${_testname} -Wl,--whole-archive jemalloc -Wl,--no-whole-archive) + target_link_libraries(${_testname} -Wl,--whole-archive ${JEMALLOC_STATIC_LIBRARY} -Wl,--no-whole-archive) endif () endif () target_link_libraries(${_testname} ${impl}) diff --git a/storage/tokudb/PerconaFT/portability/toku_pthread.h b/storage/tokudb/PerconaFT/portability/toku_pthread.h index 25cf48dfd8c..84c27736201 100644 --- a/storage/tokudb/PerconaFT/portability/toku_pthread.h +++ b/storage/tokudb/PerconaFT/portability/toku_pthread.h @@ -72,15 +72,18 @@ typedef struct toku_mutex_aligned { toku_mutex_t aligned_mutex __attribute__((__aligned__(64))); } toku_mutex_aligned_t; -// Different OSes implement mutexes as different amounts of nested structs. -// C++ will fill out all missing values with zeroes if you provide at least one zero, but it needs the right amount of nesting. -#if defined(__FreeBSD__) -# define ZERO_MUTEX_INITIALIZER {0} -#elif defined(__APPLE__) -# define ZERO_MUTEX_INITIALIZER {{0}} -#else // __linux__, at least -# define ZERO_MUTEX_INITIALIZER {{{0}}} -#endif +// Initializing with {} will fill in a struct with all zeros. +// But you may also need a pragma to suppress the warnings, as follows +// +// #pragma GCC diagnostic push +// #pragma GCC diagnostic ignored "-Wmissing-field-initializers" +// toku_mutex_t foo = ZERO_MUTEX_INITIALIZER; +// #pragma GCC diagnostic pop +// +// In general it will be a lot of busy work to make this codebase compile +// cleanly with -Wmissing-field-initializers + +# define ZERO_MUTEX_INITIALIZER {} #if TOKU_PTHREAD_DEBUG # define TOKU_MUTEX_INITIALIZER { .pmutex = PTHREAD_MUTEX_INITIALIZER, .owner = 0, .locked = false, .valid = true } @@ -223,15 +226,9 @@ typedef struct toku_cond { pthread_cond_t pcond; } toku_cond_t; -// Different OSes implement mutexes as different amounts of nested structs. -// C++ will fill out all missing values with zeroes if you provide at least one zero, but it needs the right amount of nesting. -#if defined(__FreeBSD__) -# define ZERO_COND_INITIALIZER {0} -#elif defined(__APPLE__) -# define ZERO_COND_INITIALIZER {{0}} -#else // __linux__, at least -# define ZERO_COND_INITIALIZER {{{0}}} -#endif +// Same considerations as for ZERO_MUTEX_INITIALIZER apply +#define ZERO_COND_INITIALIZER {} + #define TOKU_COND_INITIALIZER {PTHREAD_COND_INITIALIZER} static inline void diff --git a/storage/tokudb/PerconaFT/portability/toku_time.h b/storage/tokudb/PerconaFT/portability/toku_time.h index c476b64a212..11a3f3aa2b9 100644 --- a/storage/tokudb/PerconaFT/portability/toku_time.h +++ b/storage/tokudb/PerconaFT/portability/toku_time.h @@ -108,3 +108,13 @@ static inline uint64_t toku_current_time_microsec(void) { gettimeofday(&t, NULL); return t.tv_sec * (1UL * 1000 * 1000) + t.tv_usec; } + +// sleep microseconds +static inline void toku_sleep_microsec(uint64_t ms) { + struct timeval t; + + t.tv_sec = ms / 1000000; + t.tv_usec = ms % 1000000; + + select(0, NULL, NULL, NULL, &t); +} diff --git a/storage/tokudb/PerconaFT/scripts/run.stress-tests.py b/storage/tokudb/PerconaFT/scripts/run.stress-tests.py index a8df83a3b55..e983fe8ccd9 100644 --- a/storage/tokudb/PerconaFT/scripts/run.stress-tests.py +++ b/storage/tokudb/PerconaFT/scripts/run.stress-tests.py @@ -521,14 +521,16 @@ Test output: })) def send_mail(toaddrs, subject, body): - m = MIMEText(body) - fromaddr = 'tim@tokutek.com' - m['From'] = fromaddr - m['To'] = ', '.join(toaddrs) - m['Subject'] = subject - s = SMTP('192.168.1.114') - s.sendmail(fromaddr, toaddrs, str(m)) - s.quit() + # m = MIMEText(body) + # fromaddr = 'dev-private@percona.com' + # m['From'] = fromaddr + # m['To'] = ', '.join(toaddrs) + # m['Subject'] = subject + # s = SMTP('192.168.1.114') + # s.sendmail(fromaddr, toaddrs, str(m)) + # s.quit() + info(subject); + info(body); def update(tokudb): info('Updating from git.') @@ -554,12 +556,12 @@ def rebuild(tokudb, builddir, tokudb_data, cc, cxx, tests): env=newenv, cwd=builddir) if r != 0: - send_mail(['leif@tokutek.com'], 'Stress tests on %s failed to build.' % gethostname(), '') + send_mail(['dev-private@percona.com'], 'Stress tests on %s failed to build.' % gethostname(), '') error('Building the tests failed.') sys.exit(r) r = call(['make', '-j8'], cwd=builddir) if r != 0: - send_mail(['leif@tokutek.com'], 'Stress tests on %s failed to build.' % gethostname(), '') + send_mail(['dev-private@percona.com'], 'Stress tests on %s failed to build.' % gethostname(), '') error('Building the tests failed.') sys.exit(r) @@ -671,7 +673,7 @@ def main(opts): sys.exit(0) except Exception, e: exception('Unhandled exception caught in main.') - send_mail(['leif@tokutek.com'], 'Stress tests caught unhandled exception in main, on %s' % gethostname(), format_exc()) + send_mail(['dev-private@percona.com'], 'Stress tests caught unhandled exception in main, on %s' % gethostname(), format_exc()) raise e if __name__ == '__main__': @@ -786,7 +788,7 @@ if __name__ == '__main__': if not opts.send_emails: opts.email = None elif len(opts.email) == 0: - opts.email.append('tokueng@tokutek.com') + opts.email.append('dev-private@percona.com') if opts.debug: logging.basicConfig(level=logging.DEBUG) diff --git a/storage/tokudb/PerconaFT/src/export.map b/storage/tokudb/PerconaFT/src/export.map index 3f2c7569ea4..fc2be5f41a5 100644 --- a/storage/tokudb/PerconaFT/src/export.map +++ b/storage/tokudb/PerconaFT/src/export.map @@ -82,6 +82,7 @@ toku_test_db_redirect_dictionary; toku_test_get_latest_lsn; toku_test_get_checkpointing_user_data_status; + toku_set_test_txn_sync_callback; toku_indexer_set_test_only_flags; toku_increase_last_xid; diff --git a/storage/tokudb/PerconaFT/src/indexer-undo-do.cc b/storage/tokudb/PerconaFT/src/indexer-undo-do.cc index b93429407eb..8d0b080b9fe 100644 --- a/storage/tokudb/PerconaFT/src/indexer-undo-do.cc +++ b/storage/tokudb/PerconaFT/src/indexer-undo-do.cc @@ -313,7 +313,7 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info break; if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) { - // if the outermost is not live, then the inner state must be retired. thats the way that the txn API works. + // If the outermost is not live, then the inner state must be retired. That's the way that the txn API works. assert(this_xid_state == TOKUTXN_RETIRED); } diff --git a/storage/tokudb/PerconaFT/src/tests/CMakeLists.txt b/storage/tokudb/PerconaFT/src/tests/CMakeLists.txt index 70977a9dfda..47f6aa44a75 100644 --- a/storage/tokudb/PerconaFT/src/tests/CMakeLists.txt +++ b/storage/tokudb/PerconaFT/src/tests/CMakeLists.txt @@ -53,7 +53,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) target_link_libraries(test-5138.tdb ${LIBTOKUDB}_static z ${LIBTOKUPORTABILITY}_static ${CMAKE_THREAD_LIBS_INIT} ${EXTRA_SYSTEM_LIBS}) add_space_separated_property(TARGET test-5138.tdb COMPILE_FLAGS -fvisibility=hidden) add_ydb_test(test-5138.tdb) - + add_ydb_test(rollback-inconsistency.tdb) foreach(bin ${tdb_bins}) get_filename_component(base ${bin} NAME_WE) diff --git a/storage/tokudb/PerconaFT/src/tests/rollback-inconsistency.cc b/storage/tokudb/PerconaFT/src/tests/rollback-inconsistency.cc new file mode 100644 index 00000000000..f8099c7a639 --- /dev/null +++ b/storage/tokudb/PerconaFT/src/tests/rollback-inconsistency.cc @@ -0,0 +1,161 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "test.h" + +// insert enough rows with a child txn and then force an eviction to verify the rollback +// log node is in valid state. +// the test fails without the fix (and of course passes with it). +// The test basically simulates the test script of George's. + + +static void +populate_table(int start, int end, DB_TXN * parent, DB_ENV * env, DB * db) { + DB_TXN *txn = NULL; + int r = env->txn_begin(env, parent, &txn, 0); assert_zero(r); + for (int i = start; i < end; i++) { + int k = htonl(i); + char kk[4]; + char str[220]; + memset(kk, 0, sizeof kk); + memcpy(kk, &k, sizeof k); + memset(str,'a', sizeof str); + DBT key = { .data = kk, .size = sizeof kk }; + DBT val = { .data = str, .size = sizeof str }; + r = db->put(db, txn, &key, &val, 0); + assert_zero(r); + } + r = txn->commit(txn, 0); + assert_zero(r); +} + +static void +populate_and_test(DB_ENV *env, DB *db) { + int r; + DB_TXN *parent = NULL; + r = env->txn_begin(env, NULL, &parent, 0); assert_zero(r); + + populate_table(0, 128, parent, env, db); + + //we know the eviction is going to happen here and the log node of parent txn is going to be evicted + //due to the extremely low cachesize. + populate_table(128, 256, parent, env, db); + + //again eviction due to the memory pressure. 256 rows is the point when that rollback log spills out. The spilled node + //will be written back but will not be dirtied by including rollback nodes from child txn(in which case the bug is bypassed). + populate_table(256, 512, parent, env, db); + + r = parent->abort(parent); assert_zero(r); + + //try to search anything in the lost range + int k = htonl(200); + char kk[4]; + memset(kk, 0, sizeof kk); + memcpy(kk, &k, sizeof k); + DBT key = { .data = kk, .size = sizeof kk }; + DBT val; + r = db->get(db, NULL, &key, &val, 0); + assert(r==DB_NOTFOUND); + +} + +static void +run_test(void) { + int r; + DB_ENV *env = NULL; + r = db_env_create(&env, 0); + assert_zero(r); + env->set_errfile(env, stderr); + + //setting up the cachetable size 64k + uint32_t cachesize = 64*1024; + r = env->set_cachesize(env, 0, cachesize, 1); + assert_zero(r); + + //setting up the log write block size to 4k so the rollback log nodes spill in accordance with the node size + r = env->set_lg_bsize(env, 4096); + assert_zero(r); + + r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); + assert_zero(r); + + DB *db = NULL; + r = db_create(&db, env, 0); + assert_zero(r); + + r = db->set_pagesize(db, 4096); + assert_zero(r); + + r = db->set_readpagesize(db, 1024); + assert_zero(r); + + r = db->open(db, NULL, "test.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); + assert_zero(r); + + populate_and_test(env, db); + + r = db->close(db, 0); assert_zero(r); + + r = env->close(env, 0); assert_zero(r); +} + +int +test_main(int argc, char * const argv[]) { + int r; + + // parse_args(argc, argv); + for (int i = 1; i < argc; i++) { + char * const arg = argv[i]; + if (strcmp(arg, "-v") == 0) { + verbose++; + continue; + } + if (strcmp(arg, "-q") == 0) { + verbose = 0; + continue; + } + } + + toku_os_recursive_delete(TOKU_TEST_FILENAME); + r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r); + + run_test(); + + return 0; +} + diff --git a/storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc b/storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc index 0c70b0669ad..a2b48e443cd 100644 --- a/storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc +++ b/storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc @@ -166,7 +166,7 @@ run_test (void) { DB_BTREE_STAT64 s; r = db->stat64(db, NULL, &s); CKERR(r); - assert(s.bt_nkeys == 1 && s.bt_dsize == sizeof key + sizeof val); + assert(s.bt_nkeys == 0); r = db->close(db, 0); CKERR(r); @@ -176,7 +176,7 @@ run_test (void) { r = txn->commit(txn, 0); CKERR(r); r = db->stat64(db, NULL, &s); CKERR(r); - assert(s.bt_nkeys == 1 && s.bt_dsize == sizeof key + sizeof val); + assert(s.bt_nkeys == 0); } // verify update callback overwrites the row diff --git a/storage/tokudb/PerconaFT/src/tests/test_db_rowcount.cc b/storage/tokudb/PerconaFT/src/tests/test_db_rowcount.cc new file mode 100644 index 00000000000..c440bdc59e7 --- /dev/null +++ b/storage/tokudb/PerconaFT/src/tests/test_db_rowcount.cc @@ -0,0 +1,523 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include "test.h" +#include <stdio.h> + +#include <sys/stat.h> +#include <db.h> + +// Tests that the logical row counts are correct and not subject to variance +// due to normal insert/delete messages within the tree with the few exceptions +// of 1) rollback messages not yet applied; 2) inserts messages turned to +// updates on apply; and 3) missing leafentries on delete messages on apply. + +static DB_TXN* const null_txn = 0; +static const uint64_t num_records = 4*1024; + +#define CHECK_NUM_ROWS(_expected, _stats) assert(_stats.bt_ndata == _expected) + +static DB* create_db(const char* fname, DB_ENV* env) { + int r; + DB* db; + + r = db_create(&db, env, 0); + assert(r == 0); + db->set_errfile(db, stderr); + + r = db->set_pagesize(db, 8192); + assert(r == 0); + + r = db->set_readpagesize(db, 1024); + assert(r == 0); + + r = db->set_fanout(db, 4); + assert(r == 0); + + r = db->set_compression_method(db, TOKU_NO_COMPRESSION); + assert(r == 0); + + r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, + 0666); + assert(r == 0); + + return db; +} +static void add_records(DB* db, DB_TXN* txn, uint64_t start_id, uint64_t num) { + int r; + for (uint64_t i = 0, j=start_id; i < num; i++,j++) { + char key[100], val[256]; + DBT k,v; + snprintf(key, 100, "%08" PRIu64, j); + snprintf(val, 256, "%*s", 200, key); + r = + db->put( + db, + txn, + dbt_init(&k, key, 1+strlen(key)), + dbt_init(&v, val, 1+strlen(val)), + 0); + assert(r == 0); + } +} +static void delete_records( + DB* db, + DB_TXN* txn, + uint64_t start_id, + uint64_t num) { + + int r; + for (uint64_t i = 0, j=start_id; i < num; i++,j++) { + char key[100]; + DBT k; + snprintf(key, 100, "%08" PRIu64, j); + r = + db->del( + db, + txn, + dbt_init(&k, key, 1+strlen(key)), + 0); + assert(r == 0); + } +} +static void full_optimize(DB* db) { + int r; + uint64_t loops_run = 0; + + r = db->optimize(db); + assert(r == 0); + + r = db->hot_optimize(db, NULL, NULL, NULL, NULL, &loops_run); + assert(r == 0); +} +static void test_insert_commit(DB_ENV* env) { + int r; + DB* db; + DB_TXN* txn; + DB_BTREE_STAT64 stats; + + db = create_db(__FUNCTION__, env); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + add_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf("%s : before commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + r = txn->commit(txn, 0); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf("%s : after commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + db->close(db, 0); +} +static void test_insert_delete_commit(DB_ENV* env) { + int r; + DB* db; + DB_TXN* txn; + DB_BTREE_STAT64 stats; + + db = create_db(__FUNCTION__, env); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + add_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf("%s : before delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + delete_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(0, stats); + if (verbose) + printf("%s : after delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + r = txn->commit(txn, 0); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(0, stats); + if (verbose) + printf("%s : after commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + db->close(db, 0); +} +static void test_insert_commit_delete_commit(DB_ENV* env) { + int r; + DB* db; + DB_TXN* txn; + DB_BTREE_STAT64 stats; + + db = create_db(__FUNCTION__, env); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + add_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf( + "%s : before insert commit %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + r = txn->commit(txn, 0); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf( + "%s : after insert commit %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + delete_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(0, stats); + if (verbose) + printf("%s : after delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + r = txn->commit(txn, 0); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(0, stats); + if (verbose) + printf( + "%s : after delete commit %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + db->close(db, 0); +} +static void test_insert_rollback(DB_ENV* env) { + int r; + DB* db; + DB_TXN* txn; + DB_BTREE_STAT64 stats; + + db = create_db(__FUNCTION__, env); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + add_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf("%s : before rollback %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + r = txn->abort(txn); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + // CAN NOT TEST stats HERE AS THEY ARE SOMEWHAT NON_DETERMINISTIC UNTIL + // optimize + hot_optimize HAVE BEEN RUN DUE TO THE FACT THAT ROLLBACK + // MESSAGES ARE "IN-FLIGHT" IN THE TREE AND MUST BE APPLIED IN ORDER TO + // CORRECT THE RUNNING LOGICAL COUNT + if (verbose) + printf("%s : after rollback %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + full_optimize(db); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(0, stats); + if (verbose) + printf( + "%s : after rollback optimize %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + db->close(db, 0); +} +static void test_insert_delete_rollback(DB_ENV* env) { + int r; + DB* db; + DB_TXN* txn; + DB_BTREE_STAT64 stats; + + db = create_db(__FUNCTION__, env); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + add_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf("%s : before delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + delete_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(0, stats); + if (verbose) + printf("%s : after delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + r = txn->abort(txn); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(0, stats); + if (verbose) + printf("%s : after commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + db->close(db, 0); +} +static void test_insert_commit_delete_rollback(DB_ENV* env) { + int r; + DB* db; + DB_TXN* txn; + DB_BTREE_STAT64 stats; + + db = create_db(__FUNCTION__, env); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + add_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf( + "%s : before insert commit %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + r = txn->commit(txn, 0); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf( + "%s : after insert commit %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + delete_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(0, stats); + if (verbose) + printf("%s : after delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + r = txn->abort(txn); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + // CAN NOT TEST stats HERE AS THEY ARE SOMEWHAT NON_DETERMINISTIC UNTIL + // optimize + hot_optimize HAVE BEEN RUN DUE TO THE FACT THAT ROLLBACK + // MESSAGES ARE "IN-FLIGHT" IN THE TREE AND MUST BE APPLIED IN ORDER TO + // CORRECT THE RUNNING LOGICAL COUNT + if (verbose) + printf( + "%s : after delete rollback %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + full_optimize(db); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf( + "%s : after delete rollback optimize %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + db->close(db, 0); +} + +static int test_recount_insert_commit_progress( + uint64_t count, + uint64_t deleted, + void*) { + + if (verbose) + printf( + "%s : count[%" PRIu64 "] deleted[%" PRIu64 "]\n", + __FUNCTION__, + count, + deleted); + return 0; +} +static int test_recount_cancel_progress(uint64_t, uint64_t, void*) { + return 1; +} + +static void test_recount_insert_commit(DB_ENV* env) { + int r; + DB* db; + DB_TXN* txn; + DB_BTREE_STAT64 stats; + + db = create_db(__FUNCTION__, env); + + r = env->txn_begin(env, null_txn, &txn, 0); + assert(r == 0); + + add_records(db, txn, 0, num_records); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf( + "%s : before commit %" PRIu64 " rows\n", + __FUNCTION__, + stats.bt_ndata); + + r = txn->commit(txn, 0); + assert(r == 0); + + r = db->stat64(db, null_txn, &stats); + assert(r == 0); + + CHECK_NUM_ROWS(num_records, stats); + if (verbose) + printf("%s : after commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata); + + // test that recount counted correct # of rows + r = db->recount_rows(db, test_recount_insert_commit_progress, NULL); + assert(r == 0); + CHECK_NUM_ROWS(num_records, stats); + + // test that recount callback cancel returns + r = db->recount_rows(db, test_recount_cancel_progress, NULL); + assert(r == 1); + CHECK_NUM_ROWS(num_records, stats); + + db->close(db, 0); +} +int test_main(int UU(argc), char UU(*const argv[])) { + int r; + DB_ENV* env; + + toku_os_recursive_delete(TOKU_TEST_FILENAME); + toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU + S_IRWXG + S_IRWXO); + + r = db_env_create(&env, 0); + assert(r == 0); + + r = + env->open( + env, + TOKU_TEST_FILENAME, + DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, + S_IRWXU + S_IRWXG + S_IRWXO); + assert(r == 0); + + test_insert_commit(env); + test_insert_delete_commit(env); + test_insert_commit_delete_commit(env); + test_insert_rollback(env); + test_insert_delete_rollback(env); + test_insert_commit_delete_rollback(env); + test_recount_insert_commit(env); + + r = env->close(env, 0); + assert(r == 0); + + return 0; +} diff --git a/storage/tokudb/PerconaFT/src/tests/txn_manager_handle_snapshot_atomicity.cc b/storage/tokudb/PerconaFT/src/tests/txn_manager_handle_snapshot_atomicity.cc new file mode 100644 index 00000000000..30cc16d73a7 --- /dev/null +++ b/storage/tokudb/PerconaFT/src/tests/txn_manager_handle_snapshot_atomicity.cc @@ -0,0 +1,217 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +//In response to the read-commit crash bug in the sysbench, this test is created to test +//the atomicity of the txn manager when handling the child txn snapshot. +//The test is supposed to fail before the read-commit-fix. + +#include "test.h" +#include "toku_pthread.h" +#include "ydb.h" +struct test_sync { + int state; + toku_mutex_t lock; + toku_cond_t cv; +}; + +static void test_sync_init(struct test_sync *UU(sync)) { +#if TOKU_DEBUG_TXN_SYNC + sync->state = 0; + toku_mutex_init(&sync->lock, NULL); + toku_cond_init(&sync->cv, NULL); +#endif +} + +static void test_sync_destroy(struct test_sync *UU(sync)) { +#if TOKU_DEBUG_TXN_SYNC + toku_mutex_destroy(&sync->lock); + toku_cond_destroy(&sync->cv); +#endif +} + +static void test_sync_sleep(struct test_sync *UU(sync), int UU(new_state)) { +#if TOKU_DEBUG_TXN_SYNC + toku_mutex_lock(&sync->lock); + while (sync->state != new_state) { + toku_cond_wait(&sync->cv, &sync->lock); + } + toku_mutex_unlock(&sync->lock); +#endif +} + +static void test_sync_next_state(struct test_sync *UU(sync)) { +#if TOKU_DEBUG_TXN_SYNC + toku_mutex_lock(&sync->lock); + sync->state++; + toku_cond_broadcast(&sync->cv); + toku_mutex_unlock(&sync->lock); +#endif +} + + +struct start_txn_arg { + DB_ENV *env; + DB *db; + DB_TXN * parent; +}; + +static struct test_sync sync_s; + +static void test_callback(pthread_t self_tid, void * extra) { + pthread_t **p = (pthread_t **) extra; + pthread_t tid_1 = *p[0]; + pthread_t tid_2 = *p[1]; + assert(pthread_equal(self_tid, tid_2)); + printf("%s: the thread[%" PRIu64 "] is going to wait...\n", __func__, reinterpret_cast<uint64_t>(tid_1)); + test_sync_next_state(&sync_s); + sleep(3); + //test_sync_sleep(&sync_s,3); + //using test_sync_sleep/test_sync_next_state pair can sync threads better, however + //after the fix, this might cause a deadlock. just simply use sleep to do a proof- + //of-concept test. + printf("%s: the thread[%" PRIu64 "] is resuming...\n", __func__, reinterpret_cast<uint64_t>(tid_1)); + return; +} + +static void * start_txn2(void * extra) { + struct start_txn_arg * args = (struct start_txn_arg *) extra; + DB_ENV * env = args -> env; + DB * db = args->db; + DB_TXN * parent = args->parent; + test_sync_sleep(&sync_s, 1); + printf("start %s [thread %" PRIu64 "]\n", __func__, reinterpret_cast<uint64_t>(pthread_self())); + DB_TXN *txn; + int r = env->txn_begin(env, parent, &txn, DB_READ_COMMITTED); + assert(r == 0); + //do some random things... + DBT key, data; + dbt_init(&key, "hello", 6); + dbt_init(&data, "world", 6); + db->put(db, txn, &key, &data, 0); + db->get(db, txn, &key, &data, 0); + + r = txn->commit(txn, 0); + assert(r == 0); + printf("%s done[thread %" PRIu64 "]\n", __func__, reinterpret_cast<uint64_t>(pthread_self())); + return extra; +} + +static void * start_txn1(void * extra) { + struct start_txn_arg * args = (struct start_txn_arg *) extra; + DB_ENV * env = args -> env; + DB * db = args->db; + printf("start %s: [thread %" PRIu64 "]\n", __func__, reinterpret_cast<uint64_t>(pthread_self())); + DB_TXN *txn; + int r = env->txn_begin(env, NULL, &txn, DB_READ_COMMITTED); + assert(r == 0); + printf("%s: txn began by [thread %" PRIu64 "], will wait\n", __func__, reinterpret_cast<uint64_t>(pthread_self())); + test_sync_next_state(&sync_s); + test_sync_sleep(&sync_s,2); + printf("%s: [thread %" PRIu64 "] resumed\n", __func__, reinterpret_cast<uint64_t>(pthread_self())); + //do some random things... + DBT key, data; + dbt_init(&key, "hello", 6); + dbt_init(&data, "world", 6); + db->put(db, txn, &key, &data, 0); + db->get(db, txn, &key, &data, 0); + r = txn->commit(txn, 0); + assert(r == 0); + printf("%s: done[thread %" PRIu64 "]\n", __func__, reinterpret_cast<uint64_t>(pthread_self())); + //test_sync_next_state(&sync_s); + return extra; +} + +int test_main (int UU(argc), char * const UU(argv[])) { + int r; + toku_os_recursive_delete(TOKU_TEST_FILENAME); + r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); + assert(r == 0); + + DB_ENV *env; + r = db_env_create(&env, 0); + assert(r == 0); + + r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); + assert(r == 0); + + DB *db = NULL; + r = db_create(&db, env, 0); + assert(r == 0); + + r = db->open(db, NULL, "testit", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); + assert(r == 0); + + DB_TXN * parent = NULL; + r = env->txn_begin(env, 0, &parent, DB_READ_COMMITTED); + assert(r == 0); + + ZERO_STRUCT(sync_s); + test_sync_init(&sync_s); + + pthread_t tid_1 = 0; + pthread_t tid_2 = 0; + pthread_t* callback_extra[2] = {&tid_1, &tid_2}; + toku_set_test_txn_sync_callback(test_callback, callback_extra); + + struct start_txn_arg args = {env, db, parent}; + + r = pthread_create(&tid_1, NULL, start_txn1, &args); + assert(r==0); + + r= pthread_create(&tid_2, NULL, start_txn2, &args); + assert(r==0); + + void * ret; + r = pthread_join(tid_1, &ret); + assert(r == 0); + r = pthread_join(tid_2, &ret); + assert(r == 0); + + r = parent->commit(parent, 0); + assert(r ==0); + + test_sync_destroy(&sync_s); + r = db->close(db, 0); + assert(r == 0); + + r = env->close(env, 0); + assert(r == 0); + + return 0; +} + diff --git a/storage/tokudb/PerconaFT/src/ydb.cc b/storage/tokudb/PerconaFT/src/ydb.cc index 88c6c86f214..55da418a0de 100644 --- a/storage/tokudb/PerconaFT/src/ydb.cc +++ b/storage/tokudb/PerconaFT/src/ydb.cc @@ -3148,6 +3148,10 @@ toku_test_get_latest_lsn(DB_ENV *env) { return rval.lsn; } +void toku_set_test_txn_sync_callback(void (* cb) (pthread_t, void *), void * extra) { + set_test_txn_sync_callback(cb, extra); +} + int toku_test_get_checkpointing_user_data_status (void) { return toku_cachetable_get_checkpointing_user_data_status(); diff --git a/storage/tokudb/PerconaFT/src/ydb.h b/storage/tokudb/PerconaFT/src/ydb.h index 9d4e94c6f30..facbfdc9252 100644 --- a/storage/tokudb/PerconaFT/src/ydb.h +++ b/storage/tokudb/PerconaFT/src/ydb.h @@ -58,3 +58,6 @@ extern "C" uint64_t toku_test_get_latest_lsn(DB_ENV *env) __attribute__((__visib // test-only function extern "C" int toku_test_get_checkpointing_user_data_status(void) __attribute__((__visibility__("default"))); + +// test-only function +extern "C" void toku_set_test_txn_sync_callback(void (* ) (pthread_t, void *), void * extra) __attribute__((__visibility__("default"))); diff --git a/storage/tokudb/PerconaFT/src/ydb_db.cc b/storage/tokudb/PerconaFT/src/ydb_db.cc index 25b24467684..e5bd4e7d089 100644 --- a/storage/tokudb/PerconaFT/src/ydb_db.cc +++ b/storage/tokudb/PerconaFT/src/ydb_db.cc @@ -1015,6 +1015,25 @@ toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float return r; } + +static int +toku_db_recount_rows(DB* db, int (*progress_callback)(uint64_t count, + uint64_t deleted, + void* progress_extra), + void* progress_extra) { + + HANDLE_PANICKED_DB(db); + int r = 0; + r = + toku_ft_recount_rows( + db->i->ft_handle, + progress_callback, + progress_extra); + + return r; +} + + int toku_setup_db_internal (DB **dbp, DB_ENV *env, uint32_t flags, FT_HANDLE ft_handle, bool is_open) { if (flags || env == NULL) return EINVAL; @@ -1098,6 +1117,7 @@ toku_db_create(DB ** db, DB_ENV * env, uint32_t flags) { USDB(dbt_pos_infty); USDB(dbt_neg_infty); USDB(get_fragmentation); + USDB(recount_rows); #undef USDB result->get_indexer = db_get_indexer; result->del = autotxn_db_del; |