Diffstat (limited to 'storage/tokudb/PerconaFT')
-rw-r--r--  storage/tokudb/PerconaFT/CMakeLists.txt | 3
-rw-r--r--  storage/tokudb/PerconaFT/CTestCustom.cmake.in (renamed from storage/tokudb/PerconaFT/CTestCustom.cmake) | 0
-rw-r--r--  storage/tokudb/PerconaFT/README.md | 8
-rw-r--r--  storage/tokudb/PerconaFT/buildheader/make_tdb.cc | 5
-rw-r--r--  storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake | 3
-rw-r--r--  storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake | 34
-rw-r--r--  storage/tokudb/PerconaFT/ft/CMakeLists.txt | 1
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft-flusher.cc | 6
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft-internal.h | 5
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft-ops.cc | 11
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft-ops.h | 9
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft-recount-rows.cc | 115
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft-status.cc | 36
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft-test-helpers.cc | 29
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft.cc | 44
-rw-r--r--  storage/tokudb/PerconaFT/ft/ft.h | 6
-rw-r--r--  storage/tokudb/PerconaFT/ft/leafentry.h | 88
-rw-r--r--  storage/tokudb/PerconaFT/ft/loader/loader.cc | 121
-rw-r--r--  storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc | 5
-rw-r--r--  storage/tokudb/PerconaFT/ft/logger/logger.h | 1
-rw-r--r--  storage/tokudb/PerconaFT/ft/logger/recover.h | 2
-rw-r--r--  storage/tokudb/PerconaFT/ft/node.cc | 475
-rw-r--r--  storage/tokudb/PerconaFT/ft/node.h | 67
-rw-r--r--  storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc | 13
-rw-r--r--  storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h | 1
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt | 2
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/make-tree.cc | 13
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/msnfilter.cc | 70
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc | 94
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc | 2
-rwxr-xr-x  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-clean/log000000000000.tokulog24 | bin 0 -> 131 bytes
-rwxr-xr-x  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-dirty/log000000000000.tokulog24 | bin 0 -> 94 bytes
-rwxr-xr-x  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-clean/log000000000000.tokulog25 | bin 0 -> 131 bytes
-rwxr-xr-x  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-dirty/log000000000000.tokulog25 | bin 0 -> 94 bytes
-rwxr-xr-x  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-clean/log000000000000.tokulog26 | bin 0 -> 131 bytes
-rwxr-xr-x  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-dirty/log000000000000.tokulog26 | bin 0 -> 94 bytes
-rwxr-xr-x  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-clean/log000000000000.tokulog27 | bin 0 -> 131 bytes
-rwxr-xr-x  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-dirty/log000000000000.tokulog27 | bin 0 -> 94 bytes
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-clean/log000000000000.tokulog28 | bin 0 -> 131 bytes
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-dirty/log000000000000.tokulog28 | bin 0 -> 94 bytes
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-clean/log000000000000.tokulog29 | bin 0 -> 131 bytes
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-dirty/log000000000000.tokulog29 | bin 0 -> 94 bytes
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc | 13
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc | 15
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc | 15
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc | 15
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc | 15
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc | 15
-rw-r--r--  storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc | 15
-rw-r--r--  storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc | 1
-rw-r--r--  storage/tokudb/PerconaFT/ft/txn/txn.cc | 19
-rw-r--r--  storage/tokudb/PerconaFT/ft/txn/txn_manager.cc | 21
-rw-r--r--  storage/tokudb/PerconaFT/ft/txn/txn_manager.h | 9
-rw-r--r--  storage/tokudb/PerconaFT/ft/ule.cc | 1169
-rw-r--r--  storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt | 10
-rw-r--r--  storage/tokudb/PerconaFT/portability/toku_pthread.h | 33
-rw-r--r--  storage/tokudb/PerconaFT/portability/toku_time.h | 10
-rw-r--r--  storage/tokudb/PerconaFT/scripts/run.stress-tests.py | 26
-rw-r--r--  storage/tokudb/PerconaFT/src/export.map | 1
-rw-r--r--  storage/tokudb/PerconaFT/src/indexer-undo-do.cc | 2
-rw-r--r--  storage/tokudb/PerconaFT/src/tests/CMakeLists.txt | 2
-rw-r--r--  storage/tokudb/PerconaFT/src/tests/rollback-inconsistency.cc | 161
-rw-r--r--  storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc | 4
-rw-r--r--  storage/tokudb/PerconaFT/src/tests/test_db_rowcount.cc | 523
-rw-r--r--  storage/tokudb/PerconaFT/src/tests/txn_manager_handle_snapshot_atomicity.cc | 217
-rw-r--r--  storage/tokudb/PerconaFT/src/ydb.cc | 4
-rw-r--r--  storage/tokudb/PerconaFT/src/ydb.h | 3
-rw-r--r--  storage/tokudb/PerconaFT/src/ydb_db.cc | 20
68 files changed, 2764 insertions, 843 deletions
diff --git a/storage/tokudb/PerconaFT/CMakeLists.txt b/storage/tokudb/PerconaFT/CMakeLists.txt
index 843b4c9d0e8..0e283c13c61 100644
--- a/storage/tokudb/PerconaFT/CMakeLists.txt
+++ b/storage/tokudb/PerconaFT/CMakeLists.txt
@@ -1,3 +1,6 @@
+if (CMAKE_PROJECT_NAME STREQUAL TokuDB)
+ cmake_minimum_required(VERSION 2.8.8 FATAL_ERROR)
+endif()
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake_modules")
project(TokuDB)
diff --git a/storage/tokudb/PerconaFT/CTestCustom.cmake b/storage/tokudb/PerconaFT/CTestCustom.cmake.in
index 54170b2b903..54170b2b903 100644
--- a/storage/tokudb/PerconaFT/CTestCustom.cmake
+++ b/storage/tokudb/PerconaFT/CTestCustom.cmake.in
diff --git a/storage/tokudb/PerconaFT/README.md b/storage/tokudb/PerconaFT/README.md
index 7e30a558bc7..d53caf00190 100644
--- a/storage/tokudb/PerconaFT/README.md
+++ b/storage/tokudb/PerconaFT/README.md
@@ -113,11 +113,11 @@ All source code and test contributions must be provided under a [BSD 2-Clause][b
License
-------
-PerconaFT is available under the GPL version 2, and AGPL version 3, with slight modifications.
+PerconaFT is available under the GPL version 2, and AGPL version 3.
See [COPYING.AGPLv3][agpllicense],
[COPYING.GPLv2][gpllicense], and
[PATENTS][patents].
-[agpllicense]: http://github.com/Perona/PerconaFT/blob/master/COPYING.AGPLv3
-[gpllicense]: http://github.com/Perona/PerconaFT/blob/master/COPYING.GPLv2
-[patents]: http://github.com/Perona/PerconaFT/blob/master/PATENTS
+[agpllicense]: http://github.com/Percona/PerconaFT/blob/master/COPYING.AGPLv3
+[gpllicense]: http://github.com/Percona/PerconaFT/blob/master/COPYING.GPLv2
+[patents]: http://github.com/Percona/PerconaFT/blob/master/PATENTS
diff --git a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
index 958f00a8706..5c29209e19d 100644
--- a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
+++ b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
@@ -510,8 +510,9 @@ static void print_db_struct (void) {
"int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, uint32_t flags)",
"int (*get_fractal_tree_info64)(DB*,uint64_t*,uint64_t*,uint64_t*,uint64_t*)",
"int (*iterate_fractal_tree_block_map)(DB*,int(*)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*),void*)",
- "const char *(*get_dname)(DB *db)",
- "int (*get_last_key)(DB *db, YDB_CALLBACK_FUNCTION func, void* extra)",
+ "const char *(*get_dname)(DB *db)",
+ "int (*get_last_key)(DB *db, YDB_CALLBACK_FUNCTION func, void* extra)",
+ "int (*recount_rows)(DB* db, int (*progress_callback)(uint64_t count, uint64_t deleted, void* progress_extra), void* progress_extra)",
NULL};
sort_and_dump_fields("db", true, extra);
}
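
The new recount_rows entry point added to the DB struct takes a progress callback that receives the running count and a deleted-row estimate. As a rough usage sketch (illustrative only, assuming an already-open DB* handle; the cancel-on-nonzero return convention is inferred from ft-recount-rows.cc further down in this diff):

    #include <cstdint>
    #include <cstdio>

    struct progress_state {
        uint64_t limit;  // stop the recount once this many rows are seen (0 = never)
    };

    // Shape matches the callback in the signature above: return 0 to keep
    // going, non-zero to cancel the recount.
    static int report_progress(uint64_t count, uint64_t deleted, void* progress_extra) {
        progress_state* st = static_cast<progress_state*>(progress_extra);
        std::printf("recount: %llu rows counted, %llu deleted\n",
                    (unsigned long long)count, (unsigned long long)deleted);
        return (st->limit != 0 && count >= st->limit) ? 1 : 0;
    }

    // Usage, assuming an open handle named db:
    //   progress_state st = {0};
    //   int r = db->recount_rows(db, report_progress, &st);
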
diff --git a/storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake b/storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake
index 15066906831..e1da095fc00 100644
--- a/storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake
+++ b/storage/tokudb/PerconaFT/cmake_modules/TokuMergeLibs.cmake
@@ -48,7 +48,8 @@ MACRO(TOKU_MERGE_STATIC_LIBS TARGET OUTPUT_NAME LIBS_TO_MERGE)
ENDIF()
ENDFOREACH()
IF(OSLIBS)
- #LIST(REMOVE_DUPLICATES OSLIBS)
+ # REMOVE_DUPLICATES destroys the order of the libs so disabled
+ # LIST(REMOVE_DUPLICATES OSLIBS)
TARGET_LINK_LIBRARIES(${TARGET} LINK_PUBLIC ${OSLIBS})
ENDIF()
diff --git a/storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake b/storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake
index 40ccbcc0aed..77f6d8f67b7 100644
--- a/storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake
+++ b/storage/tokudb/PerconaFT/cmake_modules/TokuSetupCompiler.cmake
@@ -24,12 +24,12 @@ endif ()
## add TOKU_PTHREAD_DEBUG for debug builds
if (CMAKE_VERSION VERSION_LESS 3.0)
- set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DEBUG TOKU_PTHREAD_DEBUG=1)
- set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DRD TOKU_PTHREAD_DEBUG=1)
+ set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DEBUG TOKU_PTHREAD_DEBUG=1 TOKU_DEBUG_TXN_SYNC=1)
+ set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DRD TOKU_PTHREAD_DEBUG=1 TOKU_DEBUG_TXN_SYNC=1)
set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS_DRD _FORTIFY_SOURCE=2)
else ()
set_property(DIRECTORY APPEND PROPERTY COMPILE_DEFINITIONS
- $<$<OR:$<CONFIG:DEBUG>,$<CONFIG:DRD>>:TOKU_PTHREAD_DEBUG=1>
+ $<$<OR:$<CONFIG:DEBUG>,$<CONFIG:DRD>>:TOKU_PTHREAD_DEBUG=1 TOKU_DEBUG_TXN_SYNC=1>
$<$<CONFIG:DRD>:_FORTIFY_SOURCE=2>
)
endif ()
@@ -65,8 +65,10 @@ set_cflags_if_supported(
-Wno-error=missing-format-attribute
-Wno-error=address-of-array-temporary
-Wno-error=tautological-constant-out-of-range-compare
+ -Wno-error=maybe-uninitialized
-Wno-ignored-attributes
-Wno-error=extern-c-compat
+ -Wno-pointer-bool-conversion
-fno-rtti
-fno-exceptions
)
@@ -119,13 +121,18 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL Clang)
set(CMAKE_C_FLAGS_RELEASE "-g -O3 ${CMAKE_C_FLAGS_RELEASE} -UNDEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 ${CMAKE_CXX_FLAGS_RELEASE} -UNDEBUG")
else ()
+ if (APPLE)
+ set(FLTO_OPTS "-fwhole-program")
+ else ()
+ set(FLTO_OPTS "-fuse-linker-plugin")
+ endif()
# we overwrite this because the default passes -DNDEBUG and we don't want that
- set(CMAKE_C_FLAGS_RELWITHDEBINFO "-flto -fuse-linker-plugin ${CMAKE_C_FLAGS_RELWITHDEBINFO} -g -O3 -UNDEBUG")
- set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-flto -fuse-linker-plugin ${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3 -UNDEBUG")
- set(CMAKE_C_FLAGS_RELEASE "-g -O3 -flto -fuse-linker-plugin ${CMAKE_C_FLAGS_RELEASE} -UNDEBUG")
- set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 -flto -fuse-linker-plugin ${CMAKE_CXX_FLAGS_RELEASE} -UNDEBUG")
- set(CMAKE_EXE_LINKER_FLAGS "-g -fuse-linker-plugin ${CMAKE_EXE_LINKER_FLAGS}")
- set(CMAKE_SHARED_LINKER_FLAGS "-g -fuse-linker-plugin ${CMAKE_SHARED_LINKER_FLAGS}")
+ set(CMAKE_C_FLAGS_RELWITHDEBINFO "-flto ${FLTO_OPTS} ${CMAKE_C_FLAGS_RELWITHDEBINFO} -g -O3 -UNDEBUG")
+ set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-flto ${FLTO_OPTS} ${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -g -O3 -UNDEBUG")
+ set(CMAKE_C_FLAGS_RELEASE "-g -O3 -flto ${FLTO_OPTS} ${CMAKE_C_FLAGS_RELEASE} -UNDEBUG")
+ set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 -flto ${FLTO_OPTS} ${CMAKE_CXX_FLAGS_RELEASE} -UNDEBUG")
+ set(CMAKE_EXE_LINKER_FLAGS "-g ${FLTO_OPTS} ${CMAKE_EXE_LINKER_FLAGS}")
+ set(CMAKE_SHARED_LINKER_FLAGS "-g ${FLTO_OPTS} ${CMAKE_SHARED_LINKER_FLAGS}")
endif ()
## set warnings
@@ -159,15 +166,6 @@ endif ()
set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}")
set(CMAKE_CXX_FLAGS "-Wall -Werror ${CMAKE_CXX_FLAGS}")
-## need to set -stdlib=libc++ to get real c++11 support on darwin
-if (APPLE)
- if (CMAKE_GENERATOR STREQUAL Xcode)
- set(CMAKE_XCODE_ATTRIBUTE_CLANG_CXX_LIBRARY "libc++")
- else ()
- add_definitions(-stdlib=libc++)
- endif ()
-endif ()
-
# pick language dialect
set(CMAKE_C_FLAGS "-std=c99 ${CMAKE_C_FLAGS}")
check_cxx_compiler_flag(-std=c++11 HAVE_STDCXX11)
diff --git a/storage/tokudb/PerconaFT/ft/CMakeLists.txt b/storage/tokudb/PerconaFT/ft/CMakeLists.txt
index 744b9c9a9e1..11091073ac2 100644
--- a/storage/tokudb/PerconaFT/ft/CMakeLists.txt
+++ b/storage/tokudb/PerconaFT/ft/CMakeLists.txt
@@ -36,6 +36,7 @@ set(FT_SOURCES
ft-flusher
ft-hot-flusher
ft-ops
+ ft-recount-rows
ft-status
ft-test-helpers
ft-verify
diff --git a/storage/tokudb/PerconaFT/ft/ft-flusher.cc b/storage/tokudb/PerconaFT/ft/ft-flusher.cc
index 530947fe868..fb456ea6a18 100644
--- a/storage/tokudb/PerconaFT/ft/ft-flusher.cc
+++ b/storage/tokudb/PerconaFT/ft/ft-flusher.cc
@@ -1572,6 +1572,7 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p
txn_gc_info *gc_info;
STAT64INFO_S stats_delta;
+ int64_t logical_rows_delta = 0;
size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use();
flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) :
@@ -1599,8 +1600,8 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p
is_fresh,
gc_info,
flow_deltas,
- &stats_delta
- );
+ &stats_delta,
+ &logical_rows_delta);
remaining_memsize -= memsize_in_buffer;
return 0;
}
@@ -1613,6 +1614,7 @@ void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID p
if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) {
toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta);
}
+ toku_ft_adjust_logical_row_count(ft, flush_fn.logical_rows_delta);
if (do_garbage_collection) {
size_t buffsize = bnc->msg_buffer.buffer_size_in_use();
// may be misleading if there's a broadcast message in there
diff --git a/storage/tokudb/PerconaFT/ft/ft-internal.h b/storage/tokudb/PerconaFT/ft/ft-internal.h
index 6bf7029245b..eec591d1744 100644
--- a/storage/tokudb/PerconaFT/ft/ft-internal.h
+++ b/storage/tokudb/PerconaFT/ft/ft-internal.h
@@ -143,6 +143,10 @@ struct ft_header {
MSN msn_at_start_of_last_completed_optimize;
STAT64INFO_S on_disk_stats;
+
+ // This represents the balance of inserts - deletes and should be
+ // closer to a logical representation of the number of records in an index
+ uint64_t on_disk_logical_rows;
};
typedef struct ft_header *FT_HEADER;
@@ -176,6 +180,7 @@ struct ft {
// protected by atomic builtins
STAT64INFO_S in_memory_stats;
+ uint64_t in_memory_logical_rows;
// transient, not serialized to disk. updated when we do write to
// disk. tells us whether we can do partial eviction (we can't if
diff --git a/storage/tokudb/PerconaFT/ft/ft-ops.cc b/storage/tokudb/PerconaFT/ft/ft-ops.cc
index f5da82ee000..8f61bc67339 100644
--- a/storage/tokudb/PerconaFT/ft/ft-ops.cc
+++ b/storage/tokudb/PerconaFT/ft/ft-ops.cc
@@ -1371,7 +1371,8 @@ static void inject_message_in_locked_node(
ft_msg msg_with_msn(msg.kdbt(), msg.vdbt(), msg.type(), msg_msn, msg.xids());
paranoid_invariant(msg_with_msn.msn().msn > node->max_msn_applied_to_node_on_disk.msn);
- STAT64INFO_S stats_delta = {0,0};
+ STAT64INFO_S stats_delta = { 0,0 };
+ int64_t logical_rows_delta = 0;
toku_ftnode_put_msg(
ft->cmp,
ft->update_fun,
@@ -1381,11 +1382,12 @@ static void inject_message_in_locked_node(
true,
gc_info,
flow_deltas,
- &stats_delta
- );
+ &stats_delta,
+ &logical_rows_delta);
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
}
+ toku_ft_adjust_logical_row_count(ft, logical_rows_delta);
//
// assumption is that toku_ftnode_put_msg will
// mark the node as dirty.
@@ -2169,6 +2171,7 @@ int toku_ft_insert_unique(FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool
if (r == 0) {
ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, FT_INSERT);
+ toku_ft_adjust_logical_row_count(ft_h->ft, 1);
}
return r;
}
@@ -2344,6 +2347,7 @@ void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool
if (r != 0) {
toku_ft_send_insert(ft_h, key, val, message_xids, type, &gc_info);
}
+ toku_ft_adjust_logical_row_count(ft_h->ft, 1);
}
}
@@ -2513,6 +2517,7 @@ void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_vali
oldest_referenced_xid_estimate,
txn != nullptr ? !txn->for_recovery : false);
toku_ft_send_delete(ft_h, key, message_xids, &gc_info);
+ toku_ft_adjust_logical_row_count(ft_h->ft, -1);
}
}
diff --git a/storage/tokudb/PerconaFT/ft/ft-ops.h b/storage/tokudb/PerconaFT/ft/ft-ops.h
index 7d0b165b70c..313a74628ea 100644
--- a/storage/tokudb/PerconaFT/ft/ft-ops.h
+++ b/storage/tokudb/PerconaFT/ft/ft-ops.h
@@ -207,6 +207,15 @@ extern int toku_ft_debug_mode;
int toku_verify_ft (FT_HANDLE ft_h) __attribute__ ((warn_unused_result));
int toku_verify_ft_with_progress (FT_HANDLE ft_h, int (*progress_callback)(void *extra, float progress), void *extra, int verbose, int keep_going) __attribute__ ((warn_unused_result));
+int toku_ft_recount_rows(
+ FT_HANDLE ft,
+ int (*progress_callback)(
+ uint64_t count,
+ uint64_t deleted,
+ void* progress_extra),
+ void* progress_extra);
+
+
DICTIONARY_ID toku_ft_get_dictionary_id(FT_HANDLE);
enum ft_flags {
diff --git a/storage/tokudb/PerconaFT/ft/ft-recount-rows.cc b/storage/tokudb/PerconaFT/ft/ft-recount-rows.cc
new file mode 100644
index 00000000000..adac96f4882
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/ft-recount-rows.cc
@@ -0,0 +1,115 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "ft/serialize/block_table.h"
+#include "ft/ft.h"
+#include "ft/ft-internal.h"
+#include "ft/cursor.h"
+
+struct recount_rows_extra_t {
+ int (*_progress_callback)(
+ uint64_t count,
+ uint64_t deleted,
+ void* progress_extra);
+ void* _progress_extra;
+ uint64_t _keys;
+ bool _cancelled;
+};
+
+static int recount_rows_found(
+ uint32_t UU(keylen),
+ const void* key,
+ uint32_t UU(vallen),
+ const void* UU(val),
+ void* extra,
+ bool UU(lock_only)) {
+
+ recount_rows_extra_t* rre = (recount_rows_extra_t*)extra;
+
+ if (FT_LIKELY(key != nullptr)) {
+ rre->_keys++;
+ }
+ return rre->_cancelled
+ = rre->_progress_callback(rre->_keys, 0, rre->_progress_extra);
+}
+static bool recount_rows_interrupt(void* extra, uint64_t deleted_rows) {
+ recount_rows_extra_t* rre = (recount_rows_extra_t*)extra;
+
+ return rre->_cancelled =
+ rre->_progress_callback(rre->_keys, deleted_rows, rre->_progress_extra);
+}
+int toku_ft_recount_rows(
+ FT_HANDLE ft,
+ int (*progress_callback)(
+ uint64_t count,
+ uint64_t deleted,
+ void* progress_extra),
+ void* progress_extra) {
+
+ int ret = 0;
+ recount_rows_extra_t rre = {
+ progress_callback,
+ progress_extra,
+ 0,
+ false
+ };
+
+ ft_cursor c;
+ ret = toku_ft_cursor_create(ft, &c, nullptr, C_READ_ANY, false, false);
+ if (ret) return ret;
+
+ toku_ft_cursor_set_check_interrupt_cb(
+ &c,
+ recount_rows_interrupt,
+ &rre);
+
+ ret = toku_ft_cursor_first(&c, recount_rows_found, &rre);
+ while (FT_LIKELY(ret == 0)) {
+ ret = toku_ft_cursor_next(&c, recount_rows_found, &rre);
+ }
+
+ toku_ft_cursor_destroy(&c);
+
+ if (rre._cancelled == false) {
+ // update ft count
+ toku_unsafe_set(&ft->ft->in_memory_logical_rows, rre._keys);
+ ret = 0;
+ }
+
+ return ret;
+}
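
A minimal, self-contained restatement of the pattern above (illustrative names, not PerconaFT API): walk every key, report progress after each one, and only publish the new count if the walk completed without being cancelled:

    #include <cstdint>
    #include <vector>

    struct recount_state {
        uint64_t keys = 0;
        bool cancelled = false;
    };

    // progress returns non-zero to cancel, mirroring recount_rows_found()
    // and recount_rows_interrupt() above.
    template <typename Progress>
    static bool recount(const std::vector<uint64_t>& index, recount_state& st,
                        Progress progress) {
        for (uint64_t key : index) {
            (void)key;
            ++st.keys;
            if (progress(st.keys) != 0) {
                st.cancelled = true;
                return false;  // abandon the scan; caller keeps the old count
            }
        }
        return true;  // complete scan: safe to publish st.keys as the new logical count
    }
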
diff --git a/storage/tokudb/PerconaFT/ft/ft-status.cc b/storage/tokudb/PerconaFT/ft/ft-status.cc
index 982df1822c4..19a378c22bf 100644
--- a/storage/tokudb/PerconaFT/ft/ft-status.cc
+++ b/storage/tokudb/PerconaFT/ft/ft-status.cc
@@ -128,24 +128,24 @@ void CACHETABLE_STATUS_S::init() {
CT_STATUS_INIT(CT_LONG_WAIT_PRESSURE_COUNT, CACHETABLE_LONG_WAIT_PRESSURE_COUNT, UINT64, "number of long waits on cache pressure");
CT_STATUS_INIT(CT_LONG_WAIT_PRESSURE_TIME, CACHETABLE_LONG_WAIT_PRESSURE_TIME, UINT64, "long time waiting on cache pressure");
- CT_STATUS_INIT(CT_POOL_CLIENT_NUM_THREADS, CACHETABLE_POOL_CLIENT_NUM_THREADS, UINT64, "number of threads in pool");
- CT_STATUS_INIT(CT_POOL_CLIENT_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CLIENT_NUM_THREADS_ACTIVE, UINT64, "number of currently active threads in pool");
- CT_STATUS_INIT(CT_POOL_CLIENT_QUEUE_SIZE, CACHETABLE_POOL_CLIENT_QUEUE_SIZE, UINT64, "number of currently queued work items");
- CT_STATUS_INIT(CT_POOL_CLIENT_MAX_QUEUE_SIZE, CACHETABLE_POOL_CLIENT_MAX_QUEUE_SIZE, UINT64, "largest number of queued work items");
- CT_STATUS_INIT(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CLIENT_TOTAL_ITEMS_PROCESSED, UINT64, "total number of work items processed");
- CT_STATUS_INIT(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CLIENT_TOTAL_EXECUTION_TIME, UINT64, "total execution time of processing work items");
- CT_STATUS_INIT(CT_POOL_CACHETABLE_NUM_THREADS, CACHETABLE_POOL_CACHETABLE_NUM_THREADS, UINT64, "number of threads in pool");
- CT_STATUS_INIT(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CACHETABLE_NUM_THREADS_ACTIVE, UINT64, "number of currently active threads in pool");
- CT_STATUS_INIT(CT_POOL_CACHETABLE_QUEUE_SIZE, CACHETABLE_POOL_CACHETABLE_QUEUE_SIZE, UINT64, "number of currently queued work items");
- CT_STATUS_INIT(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE, CACHETABLE_POOL_CACHETABLE_MAX_QUEUE_SIZE, UINT64, "largest number of queued work items");
- CT_STATUS_INIT(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED, UINT64, "total number of work items processed");
- CT_STATUS_INIT(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CACHETABLE_TOTAL_EXECUTION_TIME, UINT64, "total execution time of processing work items");
- CT_STATUS_INIT(CT_POOL_CHECKPOINT_NUM_THREADS, CACHETABLE_POOL_CHECKPOINT_NUM_THREADS, UINT64, "number of threads in pool");
- CT_STATUS_INIT(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CHECKPOINT_NUM_THREADS_ACTIVE, UINT64, "number of currently active threads in pool");
- CT_STATUS_INIT(CT_POOL_CHECKPOINT_QUEUE_SIZE, CACHETABLE_POOL_CHECKPOINT_QUEUE_SIZE, UINT64, "number of currently queued work items");
- CT_STATUS_INIT(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE, CACHETABLE_POOL_CHECKPOINT_MAX_QUEUE_SIZE, UINT64, "largest number of queued work items");
- CT_STATUS_INIT(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED, UINT64, "total number of work items processed");
- CT_STATUS_INIT(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME, UINT64, "total execution time of processing work items");
+ CT_STATUS_INIT(CT_POOL_CLIENT_NUM_THREADS, CACHETABLE_POOL_CLIENT_NUM_THREADS, UINT64, "client pool: number of threads in pool");
+ CT_STATUS_INIT(CT_POOL_CLIENT_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CLIENT_NUM_THREADS_ACTIVE, UINT64, "client pool: number of currently active threads in pool");
+ CT_STATUS_INIT(CT_POOL_CLIENT_QUEUE_SIZE, CACHETABLE_POOL_CLIENT_QUEUE_SIZE, UINT64, "client pool: number of currently queued work items");
+ CT_STATUS_INIT(CT_POOL_CLIENT_MAX_QUEUE_SIZE, CACHETABLE_POOL_CLIENT_MAX_QUEUE_SIZE, UINT64, "client pool: largest number of queued work items");
+ CT_STATUS_INIT(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CLIENT_TOTAL_ITEMS_PROCESSED, UINT64, "client pool: total number of work items processed");
+ CT_STATUS_INIT(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CLIENT_TOTAL_EXECUTION_TIME, UINT64, "client pool: total execution time of processing work items");
+ CT_STATUS_INIT(CT_POOL_CACHETABLE_NUM_THREADS, CACHETABLE_POOL_CACHETABLE_NUM_THREADS, UINT64, "cachetable pool: number of threads in pool");
+ CT_STATUS_INIT(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CACHETABLE_NUM_THREADS_ACTIVE, UINT64, "cachetable pool: number of currently active threads in pool");
+ CT_STATUS_INIT(CT_POOL_CACHETABLE_QUEUE_SIZE, CACHETABLE_POOL_CACHETABLE_QUEUE_SIZE, UINT64, "cachetable pool: number of currently queued work items");
+ CT_STATUS_INIT(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE, CACHETABLE_POOL_CACHETABLE_MAX_QUEUE_SIZE, UINT64, "cachetable pool: largest number of queued work items");
+ CT_STATUS_INIT(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED, UINT64, "cachetable pool: total number of work items processed");
+ CT_STATUS_INIT(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CACHETABLE_TOTAL_EXECUTION_TIME, UINT64, "cachetable pool: total execution time of processing work items");
+ CT_STATUS_INIT(CT_POOL_CHECKPOINT_NUM_THREADS, CACHETABLE_POOL_CHECKPOINT_NUM_THREADS, UINT64, "checkpoint pool: number of threads in pool");
+ CT_STATUS_INIT(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE, CACHETABLE_POOL_CHECKPOINT_NUM_THREADS_ACTIVE, UINT64, "checkpoint pool: number of currently active threads in pool");
+ CT_STATUS_INIT(CT_POOL_CHECKPOINT_QUEUE_SIZE, CACHETABLE_POOL_CHECKPOINT_QUEUE_SIZE, UINT64, "checkpoint pool: number of currently queued work items");
+ CT_STATUS_INIT(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE, CACHETABLE_POOL_CHECKPOINT_MAX_QUEUE_SIZE, UINT64, "checkpoint pool: largest number of queued work items");
+ CT_STATUS_INIT(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED, CACHETABLE_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED, UINT64, "checkpoint pool: total number of work items processed");
+ CT_STATUS_INIT(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME, CACHETABLE_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME, UINT64, "checkpoint pool: total execution time of processing work items");
m_initialized = true;
#undef CT_STATUS_INIT
diff --git a/storage/tokudb/PerconaFT/ft/ft-test-helpers.cc b/storage/tokudb/PerconaFT/ft/ft-test-helpers.cc
index 7ca36c23780..6fcdbbdc9e3 100644
--- a/storage/tokudb/PerconaFT/ft/ft-test-helpers.cc
+++ b/storage/tokudb/PerconaFT/ft/ft-test-helpers.cc
@@ -172,21 +172,26 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE ft_handle, BLOCKNUM blocknum, const
assert(node->height==0);
DBT kdbt, vdbt;
- ft_msg msg(toku_fill_dbt(&kdbt, key, keylen), toku_fill_dbt(&vdbt, val, vallen),
- FT_INSERT, next_dummymsn(), toku_xids_get_root_xids());
+ ft_msg msg(
+ toku_fill_dbt(&kdbt, key, keylen),
+ toku_fill_dbt(&vdbt, val, vallen),
+ FT_INSERT,
+ next_dummymsn(),
+ toku_xids_get_root_xids());
static size_t zero_flow_deltas[] = { 0, 0 };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
- toku_ftnode_put_msg(ft_handle->ft->cmp,
- ft_handle->ft->update_fun,
- node,
- -1,
- msg,
- true,
- &gc_info,
- zero_flow_deltas,
- NULL
- );
+ toku_ftnode_put_msg(
+ ft_handle->ft->cmp,
+ ft_handle->ft->update_fun,
+ node,
+ -1,
+ msg,
+ true,
+ &gc_info,
+ zero_flow_deltas,
+ NULL,
+ NULL);
toku_verify_or_set_counts(node);
diff --git a/storage/tokudb/PerconaFT/ft/ft.cc b/storage/tokudb/PerconaFT/ft/ft.cc
index 2a0fb6f6800..93d21233bf7 100644
--- a/storage/tokudb/PerconaFT/ft/ft.cc
+++ b/storage/tokudb/PerconaFT/ft/ft.cc
@@ -198,6 +198,8 @@ static void ft_checkpoint (CACHEFILE cf, int fd, void *header_v) {
ch->time_of_last_modification = now;
ch->checkpoint_count++;
ft_hack_highest_unused_msn_for_upgrade_for_checkpoint(ft);
+ ch->on_disk_logical_rows =
+ ft->h->on_disk_logical_rows = ft->in_memory_logical_rows;
// write translation and header to disk (or at least to OS internal buffer)
toku_serialize_ft_to(fd, ch, &ft->blocktable, ft->cf);
@@ -383,7 +385,8 @@ ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that
.count_of_optimize_in_progress = 0,
.count_of_optimize_in_progress_read_from_disk = 0,
.msn_at_start_of_last_completed_optimize = ZERO_MSN,
- .on_disk_stats = ZEROSTATS
+ .on_disk_stats = ZEROSTATS,
+ .on_disk_logical_rows = 0
};
return (FT_HEADER) toku_xmemdup(&h, sizeof h);
}
@@ -802,7 +805,14 @@ toku_ft_stat64 (FT ft, struct ftstat64_s *s) {
s->fsize = toku_cachefile_size(ft->cf);
// just use the in memory stats from the header
// prevent appearance of negative numbers for numrows, numbytes
- int64_t n = ft->in_memory_stats.numrows;
+ // if the logical count was never properly re-counted on an upgrade,
+ // return the existing physical count instead.
+ int64_t n;
+ if (ft->in_memory_logical_rows == (uint64_t)-1) {
+ n = ft->in_memory_stats.numrows;
+ } else {
+ n = ft->in_memory_logical_rows;
+ }
if (n < 0) {
n = 0;
}
@@ -871,20 +881,38 @@ DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) {
return &ft_handle->ft->cmp_descriptor;
}
-void
-toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
+void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
(void) toku_sync_fetch_and_add(&(headerstats->numrows), delta.numrows);
(void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes);
}
-void
-toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
+void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
(void) toku_sync_fetch_and_sub(&(headerstats->numrows), delta.numrows);
(void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
}
-void
-toku_ft_remove_reference(FT ft, bool oplsn_valid, LSN oplsn, remove_ft_ref_callback remove_ref, void *extra) {
+void toku_ft_adjust_logical_row_count(FT ft, int64_t delta) {
+ // In order to make sure that the correct count is returned from
+ // toku_ft_stat64, the ft->(in_memory|on_disk)_logical_rows _MUST_NOT_ be
+ // modified from anywhere else from here with the exceptions of
+ // serializing in a header, initializing a new header and analyzing
+ // an index for a logical_row count.
+ // The gist is that on an index upgrade, all logical_rows values
+ // in the ft header are set to -1 until an analyze can reset it to an
+ // accurate value. Until then, the physical count from in_memory_stats
+ // must be returned in toku_ft_stat64.
+ if (delta != 0 && ft->in_memory_logical_rows != (uint64_t)-1) {
+ toku_sync_fetch_and_add(&(ft->in_memory_logical_rows), delta);
+ }
+}
+
+void toku_ft_remove_reference(
+ FT ft,
+ bool oplsn_valid,
+ LSN oplsn,
+ remove_ft_ref_callback remove_ref,
+ void *extra) {
+
toku_ft_grab_reflock(ft);
if (toku_ft_has_one_reference_unlocked(ft)) {
toku_ft_release_reflock(ft);
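
The sentinel convention described in the comment above can be restated as a small standalone sketch (illustrative types, not the PerconaFT structures): (uint64_t)-1 means the index has never been analyzed, so deltas are dropped and readers fall back to the physical count, clamped at zero the way toku_ft_stat64 does:

    #include <atomic>
    #include <cstdint>

    static const uint64_t kNotRecounted = (uint64_t)-1;

    struct index_stats {
        std::atomic<uint64_t> logical_rows{kNotRecounted};
        std::atomic<int64_t>  physical_rows{0};
    };

    // Deltas are ignored while the sentinel is in place, i.e. until an
    // analyze has produced a real logical count.
    static void adjust_logical_rows(index_stats& s, int64_t delta) {
        if (delta != 0 && s.logical_rows.load() != kNotRecounted) {
            s.logical_rows.fetch_add(static_cast<uint64_t>(delta));
        }
    }

    // Report the physical count (clamped at zero) until the logical count
    // has been established.
    static uint64_t reported_rows(const index_stats& s) {
        uint64_t n = s.logical_rows.load();
        if (n == kNotRecounted) {
            int64_t p = s.physical_rows.load();
            return p < 0 ? 0 : static_cast<uint64_t>(p);
        }
        return n;
    }
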
diff --git a/storage/tokudb/PerconaFT/ft/ft.h b/storage/tokudb/PerconaFT/ft/ft.h
index cc64bdfc6d3..d600e093bdc 100644
--- a/storage/tokudb/PerconaFT/ft/ft.h
+++ b/storage/tokudb/PerconaFT/ft/ft.h
@@ -127,13 +127,17 @@ DESCRIPTOR toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle);
typedef struct {
// delta versions in basements could be negative
+ // These represent the physical leaf entries and do not account
+ // for pending deletes or other in-flight messages that have not been
+ // applied to a leaf entry.
int64_t numrows;
int64_t numbytes;
} STAT64INFO_S, *STAT64INFO;
-static const STAT64INFO_S ZEROSTATS = { .numrows = 0, .numbytes = 0};
+static const STAT64INFO_S ZEROSTATS = { .numrows = 0, .numbytes = 0 };
void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta);
void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta);
+void toku_ft_adjust_logical_row_count(FT ft, int64_t delta);
typedef void (*remove_ft_ref_callback)(FT ft, void *extra);
void toku_ft_remove_reference(FT ft,
diff --git a/storage/tokudb/PerconaFT/ft/leafentry.h b/storage/tokudb/PerconaFT/ft/leafentry.h
index 9cb81ef7cd6..7274a1480e2 100644
--- a/storage/tokudb/PerconaFT/ft/leafentry.h
+++ b/storage/tokudb/PerconaFT/ft/leafentry.h
@@ -180,43 +180,57 @@ uint64_t le_outermost_uncommitted_xid (LEAFENTRY le);
// r|r!=0&&r!=TOKUDB_ACCEPT: Quit early, return r, because something unexpected went wrong (error case)
typedef int(*LE_ITERATE_CALLBACK)(TXNID id, TOKUTXN context, bool is_provisional);
-int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context);
-
-void le_extract_val(LEAFENTRY le,
- // should we return the entire leafentry as the val?
- bool is_leaf_mode, enum cursor_read_type read_type,
- TOKUTXN ttxn, uint32_t *vallen, void **val);
-
-size_t
-leafentry_disksize_13(LEAFENTRY_13 le);
-
-int
-toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored data.
- void** keyp,
- uint32_t* keylen,
- size_t *new_leafentry_memorysize,
- LEAFENTRY *new_leafentry_p);
+int le_iterate_val(
+ LEAFENTRY le,
+ LE_ITERATE_CALLBACK f,
+ void** valpp,
+ uint32_t* vallenp,
+ TOKUTXN context);
+
+void le_extract_val(
+ LEAFENTRY le,
+ // should we return the entire leafentry as the val?
+ bool is_leaf_mode,
+ enum cursor_read_type read_type,
+ TOKUTXN ttxn,
+ uint32_t* vallen,
+ void** val);
+
+size_t leafentry_disksize_13(LEAFENTRY_13 le);
+
+int toku_le_upgrade_13_14(
+ // NULL if there was no stored data.
+ LEAFENTRY_13 old_leafentry,
+ void** keyp,
+ uint32_t* keylen,
+ size_t* new_leafentry_memorysize,
+ LEAFENTRY *new_leafentry_p);
class bn_data;
-void
-toku_le_apply_msg(const ft_msg &msg,
- LEAFENTRY old_leafentry, // NULL if there was no stored data.
- bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data
- uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
- uint32_t old_keylen,
- txn_gc_info *gc_info,
- LEAFENTRY *new_leafentry_p,
- int64_t * numbytes_delta_p);
-
-bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info);
-
-void
-toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
- bn_data* data_buffer,
- uint32_t idx,
- void* keyp,
- uint32_t keylen,
- txn_gc_info *gc_info,
- LEAFENTRY *new_leaf_entry,
- int64_t * numbytes_delta_p);
+int64_t toku_le_apply_msg(
+ const ft_msg &msg,
+ // NULL if there was no stored data.
+ LEAFENTRY old_leafentry,
+ // bn_data storing leafentry, if NULL, means there is no bn_data
+ bn_data* data_buffer,
+ // index in data_buffer where leafentry is stored (and should be replaced)
+ uint32_t idx,
+ uint32_t old_keylen,
+ txn_gc_info* gc_info,
+ LEAFENTRY *new_leafentry_p,
+ int64_t* numbytes_delta_p);
+
+bool toku_le_worth_running_garbage_collection(
+ LEAFENTRY le,
+ txn_gc_info* gc_info);
+
+void toku_le_garbage_collect(
+ LEAFENTRY old_leaf_entry,
+ bn_data* data_buffer,
+ uint32_t idx,
+ void* keyp,
+ uint32_t keylen,
+ txn_gc_info* gc_info,
+ LEAFENTRY* new_leaf_entry,
+ int64_t* numbytes_delta_p);
diff --git a/storage/tokudb/PerconaFT/ft/loader/loader.cc b/storage/tokudb/PerconaFT/ft/loader/loader.cc
index 5ff0d69af46..20f9363da1e 100644
--- a/storage/tokudb/PerconaFT/ft/loader/loader.cc
+++ b/storage/tokudb/PerconaFT/ft/loader/loader.cc
@@ -2312,11 +2312,42 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc)
return lbuf;
}
-static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, FTLOADER bl, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method);
-static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method);
-static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen, int this_leafentry_size, STAT64INFO stats_to_update);
-static int write_translation_table (struct dbout *out, long long *off_of_translation_p);
-static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk);
+static void finish_leafnode(
+ struct dbout* out,
+ struct leaf_buf* lbuf,
+ int progress_allocation,
+ FTLOADER bl,
+ uint32_t target_basementnodesize,
+ enum toku_compression_method target_compression_method);
+
+static int write_nonleaves(
+ FTLOADER bl,
+ FIDX pivots_fidx,
+ struct dbout* out,
+ struct subtrees_info* sts,
+ const DESCRIPTOR descriptor,
+ uint32_t target_nodesize,
+ uint32_t target_basementnodesize,
+ enum toku_compression_method target_compression_method);
+
+static void add_pair_to_leafnode(
+ struct leaf_buf* lbuf,
+ unsigned char* key,
+ int keylen,
+ unsigned char* val,
+ int vallen,
+ int this_leafentry_size,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta);
+
+static int write_translation_table(
+ struct dbout* out,
+ long long* off_of_translation_p);
+
+static int write_header(
+ struct dbout* out,
+ long long translation_location_on_disk,
+ long long translation_size_on_disk);
static void drain_writer_q(QUEUE q) {
void *item;
@@ -2448,6 +2479,12 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
DBT maxkey = make_dbt(0, 0); // keep track of the max key of the current node
STAT64INFO_S deltas = ZEROSTATS;
+ // This is just a placeholder and is not used in the loader; the real,
+ // accurate stats will come out of 'deltas'. This loader does not push
+ // messages down into the top of a fractal tree, where the logical row
+ // count is normally done; it creates leaf entries directly, so it must
+ // also take on the logical row counting on its own.
+ int64_t logical_rows_delta = 0;
while (result == 0) {
void *item;
{
@@ -2506,7 +2543,15 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
}
- add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size, this_leafentry_size, &deltas);
+ add_pair_to_leafnode(
+ lbuf,
+ (unsigned char*)key.data,
+ key.size,
+ (unsigned char*)val.data,
+ val.size,
+ this_leafentry_size,
+ &deltas,
+ &logical_rows_delta);
n_rows_remaining--;
update_maxkey(&maxkey, &key); // set the new maxkey to the current key
@@ -2526,6 +2571,13 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
toku_ft_update_stats(&ft.in_memory_stats, deltas);
}
+ // As noted above, the loader directly creates a tree structure without
+ // going through the higher level ft API and thus bypasses the logical row
+ // counting performed at that level. So, we must manually update the logical
+ // row count with the info we have from the physical delta that comes out of
+ // add_pair_to_leafnode.
+ toku_ft_adjust_logical_row_count(&ft, deltas.numrows);
+
cleanup_maxkey(&maxkey);
if (lbuf) {
@@ -2878,7 +2930,16 @@ int toku_ft_loader_get_error(FTLOADER bl, int *error) {
return 0;
}
-static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen, int this_leafentry_size, STAT64INFO stats_to_update) {
+static void add_pair_to_leafnode(
+ struct leaf_buf* lbuf,
+ unsigned char* key,
+ int keylen,
+ unsigned char* val,
+ int vallen,
+ int this_leafentry_size,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta) {
+
lbuf->nkeys++;
lbuf->ndata++;
lbuf->dsize += keylen + vallen;
@@ -2890,11 +2951,25 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
FTNODE leafnode = lbuf->node;
uint32_t idx = BLB_DATA(leafnode, 0)->num_klpairs();
DBT kdbt, vdbt;
- ft_msg msg(toku_fill_dbt(&kdbt, key, keylen), toku_fill_dbt(&vdbt, val, vallen), FT_INSERT, ZERO_MSN, lbuf->xids);
+ ft_msg msg(
+ toku_fill_dbt(&kdbt, key, keylen),
+ toku_fill_dbt(&vdbt, val, vallen),
+ FT_INSERT,
+ ZERO_MSN,
+ lbuf->xids);
uint64_t workdone = 0;
// there's no mvcc garbage in a bulk-loaded FT, so there's no need to pass useful gc info
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
- toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, &workdone, stats_to_update);
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ &workdone,
+ stats_to_update,
+ logical_rows_delta);
}
static int write_literal(struct dbout *out, void*data, size_t len) {
@@ -2905,7 +2980,14 @@ static int write_literal(struct dbout *out, void*data, size_t len) {
return result;
}
-static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, FTLOADER bl, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) {
+static void finish_leafnode(
+ struct dbout* out,
+ struct leaf_buf* lbuf,
+ int progress_allocation,
+ FTLOADER bl,
+ uint32_t target_basementnodesize,
+ enum toku_compression_method target_compression_method) {
+
int result = 0;
// serialize leaf to buffer
@@ -2913,7 +2995,16 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
size_t uncompressed_serialized_leaf_size = 0;
char *serialized_leaf = NULL;
FTNODE_DISK_DATA ndd = NULL;
- result = toku_serialize_ftnode_to_memory(lbuf->node, &ndd, target_basementnodesize, target_compression_method, true, true, &serialized_leaf_size, &uncompressed_serialized_leaf_size, &serialized_leaf);
+ result = toku_serialize_ftnode_to_memory(
+ lbuf->node,
+ &ndd,
+ target_basementnodesize,
+ target_compression_method,
+ true,
+ true,
+ &serialized_leaf_size,
+ &uncompressed_serialized_leaf_size,
+ &serialized_leaf);
// write it out
if (result == 0) {
@@ -2979,8 +3070,11 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla
return result;
}
-static int
-write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk) {
+static int write_header(
+ struct dbout* out,
+ long long translation_location_on_disk,
+ long long translation_size_on_disk) {
+
int result = 0;
size_t size = toku_serialize_ft_size(out->ft->h);
size_t alloced_size = roundup_to_multiple(512, size);
@@ -2991,6 +3085,7 @@ write_header (struct dbout *out, long long translation_location_on_disk, long lo
} else {
wbuf_init(&wbuf, buf, size);
out->ft->h->on_disk_stats = out->ft->in_memory_stats;
+ out->ft->h->on_disk_logical_rows = out->ft->in_memory_logical_rows;
toku_serialize_ft_to_wbuf(&wbuf, out->ft->h, translation_location_on_disk, translation_size_on_disk);
for (size_t i=size; i<alloced_size; i++) buf[i]=0; // initialize all those unused spots to zero
if (wbuf.ndone != size)
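
In rough outline, and with illustrative names rather than the loader's real structures, the bookkeeping added above amounts to: accumulate a physical delta per leaf entry created, then apply that delta to the logical counter once at the end, since no messages flow through the tree for the normal counting path to see:

    #include <cstdint>

    struct load_deltas {
        int64_t numrows = 0;
        int64_t numbytes = 0;
    };

    static void add_pair(load_deltas& d, int64_t keylen, int64_t vallen) {
        d.numrows += 1;                 // one leaf entry created directly
        d.numbytes += keylen + vallen;
    }

    static void finish_load(const load_deltas& d, int64_t& logical_rows) {
        // A bulk-loaded tree has no pending deletes, so the physical row
        // delta doubles as the logical row delta here.
        logical_rows += d.numrows;
    }
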
diff --git a/storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc b/storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc
index 6dca8c25378..efaba49198d 100644
--- a/storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc
+++ b/storage/tokudb/PerconaFT/ft/logger/log_upgrade.cc
@@ -265,8 +265,9 @@ toku_maybe_upgrade_log(const char *env_dir, const char *log_dir, LSN * lsn_of_cl
TXNID last_xid = TXNID_NONE;
r = verify_clean_shutdown_of_log_version(log_dir, version_of_logs_on_disk, &last_lsn, &last_xid);
if (r != 0) {
- if (TOKU_LOG_VERSION_25 <= version_of_logs_on_disk && version_of_logs_on_disk <= TOKU_LOG_VERSION_27
- && TOKU_LOG_VERSION_28 == TOKU_LOG_VERSION) {
+ if (version_of_logs_on_disk >= TOKU_LOG_VERSION_25 &&
+ version_of_logs_on_disk <= TOKU_LOG_VERSION_29 &&
+ TOKU_LOG_VERSION_29 == TOKU_LOG_VERSION) {
r = 0; // can do recovery on dirty shutdown
} else {
fprintf(stderr, "Cannot upgrade PerconaFT version %d database.", version_of_logs_on_disk);
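
Stated as a predicate (a sketch only, using the constants defined in logger.h below), the relaxed rule is: dirty-shutdown logs can still be recovered across an upgrade when they are at least version 25, at most version 29, and the running binary is itself at log version 29:

    // Hypothetical helper restating the condition above.
    enum { LOG_VERSION_25 = 25, LOG_VERSION_29 = 29 };

    static bool can_recover_dirty_upgrade(int on_disk_version, int current_version) {
        return on_disk_version >= LOG_VERSION_25 &&
               on_disk_version <= LOG_VERSION_29 &&
               current_version == LOG_VERSION_29;
    }
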
diff --git a/storage/tokudb/PerconaFT/ft/logger/logger.h b/storage/tokudb/PerconaFT/ft/logger/logger.h
index 1f15f59fb3f..d9595d71065 100644
--- a/storage/tokudb/PerconaFT/ft/logger/logger.h
+++ b/storage/tokudb/PerconaFT/ft/logger/logger.h
@@ -54,6 +54,7 @@ enum {
TOKU_LOG_VERSION_26 = 26, // no change from 25
TOKU_LOG_VERSION_27 = 27, // no change from 26
TOKU_LOG_VERSION_28 = 28, // no change from 27
+ TOKU_LOG_VERSION_29 = 29, // no change from 28
TOKU_LOG_VERSION = FT_LAYOUT_VERSION,
TOKU_LOG_MIN_SUPPORTED_VERSION = FT_LAYOUT_MIN_SUPPORTED_VERSION,
};
diff --git a/storage/tokudb/PerconaFT/ft/logger/recover.h b/storage/tokudb/PerconaFT/ft/logger/recover.h
index 0d216c11a8b..bdd44d562cd 100644
--- a/storage/tokudb/PerconaFT/ft/logger/recover.h
+++ b/storage/tokudb/PerconaFT/ft/logger/recover.h
@@ -67,7 +67,7 @@ int tokuft_recover(DB_ENV *env,
// Effect: Check the tokuft logs to determine whether or not we need to run recovery.
// If the log is empty or if there is a clean shutdown at the end of the log, then we
-// dont need to run recovery.
+// don't need to run recovery.
// Returns: true if we need recovery, otherwise false.
int tokuft_needs_recovery(const char *logdir, bool ignore_empty_log);
diff --git a/storage/tokudb/PerconaFT/ft/node.cc b/storage/tokudb/PerconaFT/ft/node.cc
index 44dbc73ba2b..58ba675eb7c 100644
--- a/storage/tokudb/PerconaFT/ft/node.cc
+++ b/storage/tokudb/PerconaFT/ft/node.cc
@@ -206,12 +206,20 @@ int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, con
}
/**
- * Given a message buffer and and offset, apply the message with toku_ft_bn_apply_msg, or discard it,
+ * Given a message buffer and an offset, apply the message with
+ * toku_ft_bn_apply_msg, or discard it,
* based on its MSN and the MSN of the basement node.
*/
-static void
-do_bn_apply_msg(FT_HANDLE ft_handle, BASEMENTNODE bn, message_buffer *msg_buffer, int32_t offset,
- txn_gc_info *gc_info, uint64_t *workdone, STAT64INFO stats_to_update) {
+static void do_bn_apply_msg(
+ FT_HANDLE ft_handle,
+ BASEMENTNODE bn,
+ message_buffer* msg_buffer,
+ int32_t offset,
+ txn_gc_info* gc_info,
+ uint64_t* workdone,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta) {
+
DBT k, v;
ft_msg msg = msg_buffer->get_message(offset, &k, &v);
@@ -227,16 +235,17 @@ do_bn_apply_msg(FT_HANDLE ft_handle, BASEMENTNODE bn, message_buffer *msg_buffer
msg,
gc_info,
workdone,
- stats_to_update
- );
+ stats_to_update,
+ logical_rows_delta);
} else {
toku_ft_status_note_msn_discard();
}
// We must always mark message as stale since it has been marked
// (using omt::iterate_and_mark_range)
- // It is possible to call do_bn_apply_msg even when it won't apply the message because
- // the node containing it could have been evicted and brought back in.
+ // It is possible to call do_bn_apply_msg even when it won't apply the
+ // message because the node containing it could have been evicted and
+ // brought back in.
msg_buffer->set_freshness(offset, false);
}
@@ -248,12 +257,29 @@ struct iterate_do_bn_apply_msg_extra {
txn_gc_info *gc_info;
uint64_t *workdone;
STAT64INFO stats_to_update;
+ int64_t *logical_rows_delta;
};
-int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_msg_extra *const e) __attribute__((nonnull(3)));
-int iterate_do_bn_apply_msg(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_msg_extra *const e)
+int iterate_do_bn_apply_msg(
+ const int32_t &offset,
+ const uint32_t UU(idx),
+ struct iterate_do_bn_apply_msg_extra* const e)
+ __attribute__((nonnull(3)));
+
+int iterate_do_bn_apply_msg(
+ const int32_t &offset,
+ const uint32_t UU(idx),
+ struct iterate_do_bn_apply_msg_extra* const e)
{
- do_bn_apply_msg(e->t, e->bn, &e->bnc->msg_buffer, offset, e->gc_info, e->workdone, e->stats_to_update);
+ do_bn_apply_msg(
+ e->t,
+ e->bn,
+ &e->bnc->msg_buffer,
+ offset,
+ e->gc_info,
+ e->workdone,
+ e->stats_to_update,
+ e->logical_rows_delta);
return 0;
}
@@ -354,17 +380,15 @@ find_bounds_within_message_tree(
* or plus infinity respectively if they are NULL. Do not mark the node
* as dirty (preserve previous state of 'dirty' bit).
*/
-static void
-bnc_apply_messages_to_basement_node(
+static void bnc_apply_messages_to_basement_node(
FT_HANDLE t, // used for comparison function
BASEMENTNODE bn, // where to apply messages
FTNODE ancestor, // the ancestor node where we can find messages to apply
int childnum, // which child buffer of ancestor contains messages we want
const pivot_bounds &bounds, // contains pivot key bounds of this basement node
- txn_gc_info *gc_info,
- bool* msgs_applied
- )
-{
+ txn_gc_info* gc_info,
+ bool* msgs_applied) {
+
int r;
NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);
@@ -372,16 +396,29 @@ bnc_apply_messages_to_basement_node(
// apply messages from this buffer
STAT64INFO_S stats_delta = {0,0};
uint64_t workdone_this_ancestor = 0;
+ int64_t logical_rows_delta = 0;
uint32_t stale_lbi, stale_ube;
if (!bn->stale_ancestor_messages_applied) {
- find_bounds_within_message_tree(t->ft->cmp, bnc->stale_message_tree, &bnc->msg_buffer, bounds, &stale_lbi, &stale_ube);
+ find_bounds_within_message_tree(
+ t->ft->cmp,
+ bnc->stale_message_tree,
+ &bnc->msg_buffer,
+ bounds,
+ &stale_lbi,
+ &stale_ube);
} else {
stale_lbi = 0;
stale_ube = 0;
}
uint32_t fresh_lbi, fresh_ube;
- find_bounds_within_message_tree(t->ft->cmp, bnc->fresh_message_tree, &bnc->msg_buffer, bounds, &fresh_lbi, &fresh_ube);
+ find_bounds_within_message_tree(
+ t->ft->cmp,
+ bnc->fresh_message_tree,
+ &bnc->msg_buffer,
+ bounds,
+ &fresh_lbi,
+ &fresh_ube);
// We now know where all the messages we must apply are, so one of the
// following 4 cases will do the application, depending on which of
@@ -395,7 +432,9 @@ bnc_apply_messages_to_basement_node(
// We have messages in multiple trees, so we grab all
// the relevant messages' offsets and sort them by MSN, then apply
// them in MSN order.
- const int buffer_size = ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) + bnc->broadcast_list.size());
+ const int buffer_size = ((stale_ube - stale_lbi) +
+ (fresh_ube - fresh_lbi) +
+ bnc->broadcast_list.size());
toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t));
int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get());
struct store_msg_buffer_offset_extra sfo_extra = { .offsets = offsets, .i = 0 };
@@ -419,11 +458,27 @@ bnc_apply_messages_to_basement_node(
// Apply the messages in MSN order.
for (int i = 0; i < buffer_size; ++i) {
*msgs_applied = true;
- do_bn_apply_msg(t, bn, &bnc->msg_buffer, offsets[i], gc_info, &workdone_this_ancestor, &stats_delta);
+ do_bn_apply_msg(
+ t,
+ bn,
+ &bnc->msg_buffer,
+ offsets[i],
+ gc_info,
+ &workdone_this_ancestor,
+ &stats_delta,
+ &logical_rows_delta);
}
} else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later.
- struct iterate_do_bn_apply_msg_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .gc_info = gc_info, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta };
+ struct iterate_do_bn_apply_msg_extra iter_extra = {
+ .t = t,
+ .bn = bn,
+ .bnc = bnc,
+ .gc_info = gc_info,
+ .workdone = &workdone_this_ancestor,
+ .stats_to_update = &stats_delta,
+ .logical_rows_delta = &logical_rows_delta
+ };
if (fresh_ube - fresh_lbi > 0) *msgs_applied = true;
r = bnc->fresh_message_tree.iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra, iterate_do_bn_apply_msg>(fresh_lbi, fresh_ube, &iter_extra);
assert_zero(r);
@@ -432,7 +487,15 @@ bnc_apply_messages_to_basement_node(
// No fresh messages to apply, we just apply stale messages.
if (stale_ube - stale_lbi > 0) *msgs_applied = true;
- struct iterate_do_bn_apply_msg_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .gc_info = gc_info, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta };
+ struct iterate_do_bn_apply_msg_extra iter_extra = {
+ .t = t,
+ .bn = bn,
+ .bnc = bnc,
+ .gc_info = gc_info,
+ .workdone = &workdone_this_ancestor,
+ .stats_to_update = &stats_delta,
+ .logical_rows_delta = &logical_rows_delta
+ };
r = bnc->stale_message_tree.iterate_on_range<struct iterate_do_bn_apply_msg_extra, iterate_do_bn_apply_msg>(stale_lbi, stale_ube, &iter_extra);
assert_zero(r);
@@ -446,6 +509,7 @@ bnc_apply_messages_to_basement_node(
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
}
+ toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta);
}
static void
@@ -1073,7 +1137,8 @@ toku_ft_bn_apply_msg_once (
LEAFENTRY le,
txn_gc_info *gc_info,
uint64_t *workdone,
- STAT64INFO stats_to_update
+ STAT64INFO stats_to_update,
+ int64_t *logical_rows_delta
)
// Effect: Apply msg to leafentry (msn is ignored)
// Calculate work done by message on leafentry and add it to caller's workdone counter.
@@ -1082,26 +1147,34 @@ toku_ft_bn_apply_msg_once (
{
size_t newsize=0, oldsize=0, workdone_this_le=0;
LEAFENTRY new_le=0;
- int64_t numbytes_delta = 0; // how many bytes of user data (not including overhead) were added or deleted from this row
- int64_t numrows_delta = 0; // will be +1 or -1 or 0 (if row was added or deleted or not)
+ // how many bytes of user data (not including overhead) were added or
+ // deleted from this row
+ int64_t numbytes_delta = 0;
+ // will be +1 or -1 or 0 (if row was added or deleted or not)
+ int64_t numrows_delta = 0;
+ // will be +1, -1 or 0 if a message that was accounted for logically has
+ // changed in meaning such as an insert changed to an update or a delete
+ // changed to a noop
+ int64_t logical_rows_delta_le = 0;
uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t);
if (le) {
oldsize = leafentry_memsize(le) + key_storage_size;
}
- // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt() to allocate more space.
- // That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is
- // no longer in use. We'll have to release the old mempool later.
- toku_le_apply_msg(
- msg,
+ // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt()
+ // to allocate more space. That means le is guaranteed to not cause a
+ // sigsegv but it may point to a mempool that is no longer in use.
+ // We'll have to release the old mempool later.
+ logical_rows_delta_le = toku_le_apply_msg(
+ msg,
le,
&bn->data_buffer,
idx,
le_keylen,
- gc_info,
- &new_le,
- &numbytes_delta
- );
+ gc_info,
+ &new_le,
+ &numbytes_delta);
+
// at this point, we cannot trust cmd->u.id.key to be valid.
// The dmt may have realloced its mempool and freed the one containing key.
@@ -1121,37 +1194,42 @@ toku_ft_bn_apply_msg_once (
numrows_delta = 1;
}
}
- if (workdone) { // test programs may call with NULL
+ if (FT_LIKELY(workdone != NULL)) { // test programs may call with NULL
*workdone += workdone_this_le;
}
+ if (FT_LIKELY(logical_rows_delta != NULL)) {
+ *logical_rows_delta += logical_rows_delta_le;
+ }
// now update stat64 statistics
bn->stat64_delta.numrows += numrows_delta;
bn->stat64_delta.numbytes += numbytes_delta;
// the only reason stats_to_update may be null is for tests
- if (stats_to_update) {
+ if (FT_LIKELY(stats_to_update != NULL)) {
stats_to_update->numrows += numrows_delta;
stats_to_update->numbytes += numbytes_delta;
}
-
}
static const uint32_t setval_tag = 0xee0ccb99; // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number. We want to make sure that the user actually passes us the setval_extra_s that we passed in.
struct setval_extra_s {
uint32_t tag;
bool did_set_val;
- int setval_r; // any error code that setval_fun wants to return goes here.
+ // any error code that setval_fun wants to return goes here.
+ int setval_r;
// need arguments for toku_ft_bn_apply_msg_once
BASEMENTNODE bn;
- MSN msn; // captured from original message, not currently used
+ // captured from original message, not currently used
+ MSN msn;
XIDS xids;
- const DBT *key;
+ const DBT* key;
uint32_t idx;
uint32_t le_keylen;
LEAFENTRY le;
- txn_gc_info *gc_info;
- uint64_t * workdone; // set by toku_ft_bn_apply_msg_once()
+ txn_gc_info* gc_info;
+ uint64_t* workdone; // set by toku_ft_bn_apply_msg_once()
STAT64INFO stats_to_update;
+ int64_t* logical_rows_delta;
};
/*
@@ -1170,29 +1248,45 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
// can't leave scope until toku_ft_bn_apply_msg_once if
// this is a delete
DBT val;
- ft_msg msg(svextra->key,
- new_val ? new_val : toku_init_dbt(&val),
- new_val ? FT_INSERT : FT_DELETE_ANY,
- svextra->msn, svextra->xids);
- toku_ft_bn_apply_msg_once(svextra->bn, msg,
- svextra->idx, svextra->le_keylen, svextra->le,
- svextra->gc_info,
- svextra->workdone, svextra->stats_to_update);
+ ft_msg msg(
+ svextra->key,
+ new_val ? new_val : toku_init_dbt(&val),
+ new_val ? FT_INSERT : FT_DELETE_ANY,
+ svextra->msn,
+ svextra->xids);
+ toku_ft_bn_apply_msg_once(
+ svextra->bn,
+ msg,
+ svextra->idx,
+ svextra->le_keylen,
+ svextra->le,
+ svextra->gc_info,
+ svextra->workdone,
+ svextra->stats_to_update,
+ svextra->logical_rows_delta);
svextra->setval_r = 0;
}
}
-// We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls do_update()),
-// so capturing the msn in the setval_extra_s is not strictly required. The alternative
-// would be to put a dummy msn in the messages created by setval_fun(), but preserving
-// the original msn seems cleaner and it preserves accountability at a lower layer.
-static int do_update(ft_update_func update_fun, const DESCRIPTOR_S *desc, BASEMENTNODE bn, const ft_msg &msg, uint32_t idx,
- LEAFENTRY le,
- void* keydata,
- uint32_t keylen,
- txn_gc_info *gc_info,
- uint64_t * workdone,
- STAT64INFO stats_to_update) {
+// We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls
+// do_update()), so capturing the msn in the setval_extra_s is not strictly
+// required. The alternative would be to put a dummy msn in the messages
+// created by setval_fun(), but preserving the original msn seems cleaner and
+// it preserves accountability at a lower layer.
+static int do_update(
+ ft_update_func update_fun,
+ const DESCRIPTOR_S* desc,
+ BASEMENTNODE bn,
+ const ft_msg &msg,
+ uint32_t idx,
+ LEAFENTRY le,
+ void* keydata,
+ uint32_t keylen,
+ txn_gc_info* gc_info,
+ uint64_t* workdone,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta) {
+
LEAFENTRY le_for_update;
DBT key;
const DBT *keyp;
@@ -1232,39 +1326,52 @@ static int do_update(ft_update_func update_fun, const DESCRIPTOR_S *desc, BASEME
}
le_for_update = le;
- struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, msg.msn(), msg.xids(),
- keyp, idx, keylen, le_for_update, gc_info,
- workdone, stats_to_update};
- // call handlerton's ft->update_fun(), which passes setval_extra to setval_fun()
+ struct setval_extra_s setval_extra = {
+ setval_tag,
+ false,
+ 0,
+ bn,
+ msg.msn(),
+ msg.xids(),
+ keyp,
+ idx,
+ keylen,
+ le_for_update,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta
+ };
+ // call handlerton's ft->update_fun(), which passes setval_extra
+ // to setval_fun()
FAKE_DB(db, desc);
int r = update_fun(
&db,
keyp,
vdbtp,
update_function_extra,
- setval_fun, &setval_extra
- );
+ setval_fun,
+ &setval_extra);
if (r == 0) { r = setval_extra.setval_r; }
return r;
}
// Should be renamed as something like "apply_msg_to_basement()."
-void
-toku_ft_bn_apply_msg (
- const toku::comparator &cmp,
+void toku_ft_bn_apply_msg(
+ const toku::comparator& cmp,
ft_update_func update_fun,
BASEMENTNODE bn,
- const ft_msg &msg,
- txn_gc_info *gc_info,
- uint64_t *workdone,
- STAT64INFO stats_to_update
- )
+ const ft_msg& msg,
+ txn_gc_info* gc_info,
+ uint64_t* workdone,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta) {
// Effect:
// Put a msg into a leaf.
-// Calculate work done by message on leafnode and add it to caller's workdone counter.
+// Calculate work done by message on leafnode and add it to caller's
+// workdone counter.
// The leaf could end up "too big" or "too small". The caller must fix that up.
-{
LEAFENTRY storeddata;
void* key = NULL;
uint32_t keylen = 0;
@@ -1303,7 +1410,16 @@ toku_ft_bn_apply_msg (
} else {
assert_zero(r);
}
- toku_ft_bn_apply_msg_once(bn, msg, idx, keylen, storeddata, gc_info, workdone, stats_to_update);
+ toku_ft_bn_apply_msg_once(
+ bn,
+ msg,
+ idx,
+ keylen,
+ storeddata,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta);
// if the insertion point is within a window of the right edge of
// the leaf then it is sequential
@@ -1331,12 +1447,19 @@ toku_ft_bn_apply_msg (
&storeddata,
&key,
&keylen,
- &idx
- );
+ &idx);
if (r == DB_NOTFOUND) break;
assert_zero(r);
- toku_ft_bn_apply_msg_once(bn, msg, idx, keylen, storeddata, gc_info, workdone, stats_to_update);
-
+ toku_ft_bn_apply_msg_once(
+ bn,
+ msg,
+ idx,
+ keylen,
+ storeddata,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta);
break;
}
case FT_OPTIMIZE_FOR_UPGRADE:
@@ -1352,13 +1475,27 @@ toku_ft_bn_apply_msg (
assert_zero(r);
int deleted = 0;
if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
- // message application code needs a key in order to determine how much
- // work was done by this message. since this is a broadcast message,
- // we have to create a new message whose key is the current le's key.
+ // message application code needs a key in order to determine
+ // how much work was done by this message. since this is a
+ // broadcast message, we have to create a new message whose
+ // key is the current le's key.
DBT curr_keydbt;
- ft_msg curr_msg(toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
- msg.vdbt(), msg.type(), msg.msn(), msg.xids());
- toku_ft_bn_apply_msg_once(bn, curr_msg, idx, curr_keylen, storeddata, gc_info, workdone, stats_to_update);
+ ft_msg curr_msg(
+ toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
+ msg.vdbt(),
+ msg.type(),
+ msg.msn(),
+ msg.xids());
+ toku_ft_bn_apply_msg_once(
+ bn,
+ curr_msg,
+ idx,
+ curr_keylen,
+ storeddata,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta);
// at this point, we cannot trust msg.kdbt to be valid.
uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
if (new_dmt_size != num_klpairs) {
@@ -1386,13 +1523,27 @@ toku_ft_bn_apply_msg (
assert_zero(r);
int deleted = 0;
if (le_has_xids(storeddata, msg.xids())) {
- // message application code needs a key in order to determine how much
- // work was done by this message. since this is a broadcast message,
- // we have to create a new message whose key is the current le's key.
+ // message application code needs a key in order to determine
+ // how much work was done by this message. since this is a
+ // broadcast message, we have to create a new message whose key
+ // is the current le's key.
DBT curr_keydbt;
- ft_msg curr_msg(toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
- msg.vdbt(), msg.type(), msg.msn(), msg.xids());
- toku_ft_bn_apply_msg_once(bn, curr_msg, idx, curr_keylen, storeddata, gc_info, workdone, stats_to_update);
+ ft_msg curr_msg(
+ toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
+ msg.vdbt(),
+ msg.type(),
+ msg.msn(),
+ msg.xids());
+ toku_ft_bn_apply_msg_once(
+ bn,
+ curr_msg,
+ idx,
+ curr_keylen,
+ storeddata,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta);
uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
if (new_dmt_size != num_klpairs) {
paranoid_invariant(new_dmt_size + 1 == num_klpairs);
@@ -1424,9 +1575,33 @@ toku_ft_bn_apply_msg (
key = msg.kdbt()->data;
keylen = msg.kdbt()->size;
}
- r = do_update(update_fun, cmp.get_descriptor(), bn, msg, idx, NULL, NULL, 0, gc_info, workdone, stats_to_update);
+ r = do_update(
+ update_fun,
+ cmp.get_descriptor(),
+ bn,
+ msg,
+ idx,
+ NULL,
+ NULL,
+ 0,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta);
} else if (r==0) {
- r = do_update(update_fun, cmp.get_descriptor(), bn, msg, idx, storeddata, key, keylen, gc_info, workdone, stats_to_update);
+ r = do_update(
+ update_fun,
+ cmp.get_descriptor(),
+ bn,
+ msg,
+ idx,
+ storeddata,
+ key,
+ keylen,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta);
} // otherwise, a worse error, just return it
break;
}
@@ -1434,6 +1609,12 @@ toku_ft_bn_apply_msg (
// apply to all leafentries.
uint32_t idx = 0;
uint32_t num_leafentries_before;
+        // This avoids changing the logical row count when this message is
+        // applied: it would otherwise report a negative delta equal to the
+        // number of leaf entries visited and drive the ft header value to 0.
+        // This message will not change the number of rows, so the delta is
+        // accumulated into this throwaway local and discarded.
+ int64_t temp_logical_rows_delta = 0;
while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
void* curr_key = nullptr;
uint32_t curr_keylen = 0;
@@ -1449,7 +1630,19 @@ toku_ft_bn_apply_msg (
// This is broken below. Have a compilation error checked
// in as a reminder
- r = do_update(update_fun, cmp.get_descriptor(), bn, msg, idx, storeddata, curr_key, curr_keylen, gc_info, workdone, stats_to_update);
+ r = do_update(
+ update_fun,
+ cmp.get_descriptor(),
+ bn,
+ msg,
+ idx,
+ storeddata,
+ curr_key,
+ curr_keylen,
+ gc_info,
+ workdone,
+ stats_to_update,
+ &temp_logical_rows_delta);
assert_zero(r);
if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
@@ -1810,24 +2003,22 @@ void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) {
}
}
-void
-toku_ftnode_put_msg (
+void toku_ftnode_put_msg(
const toku::comparator &cmp,
ft_update_func update_fun,
FTNODE node,
int target_childnum,
const ft_msg &msg,
bool is_fresh,
- txn_gc_info *gc_info,
+ txn_gc_info* gc_info,
size_t flow_deltas[],
- STAT64INFO stats_to_update
- )
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta) {
// Effect: Push message into the subtree rooted at NODE.
// If NODE is a leaf, then
// put message into leaf, applying it to the leafentries
// If NODE is a nonleaf, then push the message into the message buffer(s) of the relevent child(ren).
// The node may become overfull. That's not our problem.
-{
toku_ftnode_assert_fully_in_memory(node);
//
// see comments in toku_ft_leaf_apply_msg
@@ -1836,26 +2027,40 @@ toku_ftnode_put_msg (
// and instead defer to these functions
//
if (node->height==0) {
- toku_ft_leaf_apply_msg(cmp, update_fun, node, target_childnum, msg, gc_info, nullptr, stats_to_update);
+ toku_ft_leaf_apply_msg(
+ cmp,
+ update_fun,
+ node,
+            target_childnum,
+            msg,
+ gc_info,
+ nullptr,
+ stats_to_update,
+ logical_rows_delta);
} else {
- ft_nonleaf_put_msg(cmp, node, target_childnum, msg, is_fresh, flow_deltas);
+ ft_nonleaf_put_msg(
+ cmp,
+ node,
+ target_childnum,
+ msg,
+ is_fresh,
+ flow_deltas);
}
}
-// Effect: applies the message to the leaf if the appropriate basement node is in memory.
-// This function is called during message injection and/or flushing, so the entire
-// node MUST be in memory.
+// Effect: applies the message to the leaf if the appropriate basement node is
+// in memory. This function is called during message injection and/or
+// flushing, so the entire node MUST be in memory.
void toku_ft_leaf_apply_msg(
- const toku::comparator &cmp,
+ const toku::comparator& cmp,
ft_update_func update_fun,
FTNODE node,
int target_childnum, // which child to inject to, or -1 if unknown
- const ft_msg &msg,
- txn_gc_info *gc_info,
- uint64_t *workdone,
- STAT64INFO stats_to_update
- )
-{
+ const ft_msg& msg,
+ txn_gc_info* gc_info,
+ uint64_t* workdone,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta) {
+
VERIFY_NODE(t, node);
toku_ftnode_assert_fully_in_memory(node);
@@ -1891,34 +2096,36 @@ void toku_ft_leaf_apply_msg(
BASEMENTNODE bn = BLB(node, childnum);
if (msg.msn().msn > bn->max_msn_applied.msn) {
bn->max_msn_applied = msg.msn();
- toku_ft_bn_apply_msg(cmp,
- update_fun,
- bn,
- msg,
- gc_info,
- workdone,
- stats_to_update);
+ toku_ft_bn_apply_msg(
+ cmp,
+ update_fun,
+ bn,
+ msg,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta);
} else {
toku_ft_status_note_msn_discard();
}
- }
- else if (ft_msg_type_applies_all(msg.type())) {
+ } else if (ft_msg_type_applies_all(msg.type())) {
for (int childnum=0; childnum<node->n_children; childnum++) {
if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) {
BLB(node, childnum)->max_msn_applied = msg.msn();
- toku_ft_bn_apply_msg(cmp,
- update_fun,
- BLB(node, childnum),
- msg,
- gc_info,
- workdone,
- stats_to_update);
+ toku_ft_bn_apply_msg(
+ cmp,
+ update_fun,
+ BLB(node, childnum),
+ msg,
+ gc_info,
+ workdone,
+ stats_to_update,
+ logical_rows_delta);
} else {
toku_ft_status_note_msn_discard();
}
}
- }
- else if (!ft_msg_type_does_nothing(msg.type())) {
+ } else if (!ft_msg_type_does_nothing(msg.type())) {
invariant(ft_msg_type_does_nothing(msg.type()));
}
VERIFY_NODE(t, node);
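To make the new plumbing concrete: the apply path never touches a global row counter itself, it only reports a per-message delta through the trailing logical_rows_delta out-parameter, and the caller decides what to do with it. The sketch below is illustration only, not code from this patch; the wrapper name apply_one_msg_to_root is invented, and a real caller would likely route the delta through the ft header bookkeeping rather than touching in_memory_logical_rows directly, but the field assignment keeps the sketch self-contained.

    // Hypothetical caller: accumulate the delta reported by the apply path
    // and fold it into the in-memory logical row count afterwards.
    static void apply_one_msg_to_root(FT ft,
                                      FTNODE root,
                                      const ft_msg &msg,
                                      txn_gc_info *gc_info,
                                      size_t flow_deltas[]) {
        int64_t logical_rows_delta = 0;
        toku_ftnode_put_msg(ft->cmp,
                            ft->update_fun,
                            root,
                            -1,                  // target childnum unknown
                            msg,
                            true,                // is_fresh
                            gc_info,
                            flow_deltas,
                            nullptr,             // stats_to_update is optional
                            &logical_rows_delta);
        // Only leaf application produces a nonzero delta (-1, 0, or +1 per
        // leafentry touched); nonleaf puts just buffer the message.
        ft->in_memory_logical_rows += logical_rows_delta;
    }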
diff --git a/storage/tokudb/PerconaFT/ft/node.h b/storage/tokudb/PerconaFT/ft/node.h
index 9d910491682..ad0298e81c5 100644
--- a/storage/tokudb/PerconaFT/ft/node.h
+++ b/storage/tokudb/PerconaFT/ft/node.h
@@ -382,25 +382,54 @@ enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize);
* If k is equal to some pivot, then we return the next (to the right)
* childnum.
*/
-int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp);
-
-void toku_ftnode_put_msg(const toku::comparator &cmp, ft_update_func update_fun,
- FTNODE node, int target_childnum,
- const ft_msg &msg, bool is_fresh, txn_gc_info *gc_info,
- size_t flow_deltas[], STAT64INFO stats_to_update);
-
-void toku_ft_bn_apply_msg_once(BASEMENTNODE bn, const ft_msg &msg, uint32_t idx,
- uint32_t le_keylen, LEAFENTRY le, txn_gc_info *gc_info,
- uint64_t *workdonep, STAT64INFO stats_to_update);
-
-void toku_ft_bn_apply_msg(const toku::comparator &cmp, ft_update_func update_fun,
- BASEMENTNODE bn, const ft_msg &msg, txn_gc_info *gc_info,
- uint64_t *workdone, STAT64INFO stats_to_update);
-
-void toku_ft_leaf_apply_msg(const toku::comparator &cmp, ft_update_func update_fun,
- FTNODE node, int target_childnum,
- const ft_msg &msg, txn_gc_info *gc_info,
- uint64_t *workdone, STAT64INFO stats_to_update);
+int toku_ftnode_hot_next_child(
+ FTNODE node,
+ const DBT* k,
+ const toku::comparator &cmp);
+
+void toku_ftnode_put_msg(
+ const toku::comparator& cmp,
+ ft_update_func update_fun,
+ FTNODE node,
+ int target_childnum,
+ const ft_msg& msg,
+ bool is_fresh,
+ txn_gc_info* gc_info,
+ size_t flow_deltas[],
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta);
+
+void toku_ft_bn_apply_msg_once(
+ BASEMENTNODE bn,
+ const ft_msg& msg,
+ uint32_t idx,
+ uint32_t le_keylen,
+ LEAFENTRY le,
+ txn_gc_info* gc_info,
+ uint64_t* workdonep,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta);
+
+void toku_ft_bn_apply_msg(
+ const toku::comparator& cmp,
+ ft_update_func update_fun,
+ BASEMENTNODE bn,
+ const ft_msg& msg,
+ txn_gc_info* gc_info,
+ uint64_t* workdone,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta);
+
+void toku_ft_leaf_apply_msg(
+ const toku::comparator& cmp,
+ ft_update_func update_fun,
+ FTNODE node,
+ int target_childnum,
+ const ft_msg& msg,
+ txn_gc_info* gc_info,
+ uint64_t* workdone,
+ STAT64INFO stats_to_update,
+ int64_t* logical_rows_delta);
//
// Message management for orthopush
diff --git a/storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc b/storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc
index a7bc2949276..49d4368a3ab 100644
--- a/storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc
+++ b/storage/tokudb/PerconaFT/ft/serialize/ft-serialize.cc
@@ -323,6 +323,13 @@ int deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
fanout = rbuf_int(rb);
}
+ uint64_t on_disk_logical_rows;
+ on_disk_logical_rows = (uint64_t)-1;
+ if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_29) {
+ on_disk_logical_rows = rbuf_ulonglong(rb);
+ }
+ ft->in_memory_logical_rows = on_disk_logical_rows;
+
(void) rbuf_int(rb); //Read in checksum and ignore (already verified).
if (rb->ndone != rb->size) {
fprintf(stderr, "Header size did not match contents.\n");
@@ -357,7 +364,8 @@ int deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
.count_of_optimize_in_progress = count_of_optimize_in_progress,
.count_of_optimize_in_progress_read_from_disk = count_of_optimize_in_progress,
.msn_at_start_of_last_completed_optimize = msn_at_start_of_last_completed_optimize,
- .on_disk_stats = on_disk_stats
+ .on_disk_stats = on_disk_stats,
+ .on_disk_logical_rows = on_disk_logical_rows
};
XMEMDUP(ft->h, &h);
}
@@ -408,6 +416,8 @@ serialize_ft_min_size (uint32_t version) {
size_t size = 0;
switch(version) {
+ case FT_LAYOUT_VERSION_29:
+ size += sizeof(uint64_t); // logrows in ft
case FT_LAYOUT_VERSION_28:
size += sizeof(uint32_t); // fanout in ft
case FT_LAYOUT_VERSION_27:
@@ -754,6 +764,7 @@ void toku_serialize_ft_to_wbuf (
wbuf_MSN(wbuf, h->highest_unused_msn_for_upgrade);
wbuf_MSN(wbuf, h->max_msn_in_ft);
wbuf_int(wbuf, h->fanout);
+ wbuf_ulonglong(wbuf, h->on_disk_logical_rows);
uint32_t checksum = toku_x1764_finish(&wbuf->checksum);
wbuf_int(wbuf, checksum);
lazy_assert(wbuf->ndone == wbuf->size);
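The header changes follow the usual layout-versioning pattern: the writer always emits the new field, the reader consumes it only when the on-disk version is new enough, and serialize_ft_min_size falls through the version switch so each case adds exactly the bytes its version introduced. A small sketch of the read side, assuming the same rbuf helpers as above and using (uint64_t)-1 as the "unknown" sentinel for pre-29 headers:

    // Sketch: the logical row count is only present in layout version 29+ headers.
    static uint64_t read_logical_rows(struct rbuf *rb, uint32_t layout_version) {
        uint64_t logical_rows = (uint64_t)-1;    // sentinel for older headers
        if (layout_version >= FT_LAYOUT_VERSION_29) {
            logical_rows = rbuf_ulonglong(rb);   // the 8 bytes added in v29
        }
        return logical_rows;
    }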
diff --git a/storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h b/storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h
index 72b6882bc06..9407a568337 100644
--- a/storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h
+++ b/storage/tokudb/PerconaFT/ft/serialize/ft_layout_version.h
@@ -68,6 +68,7 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_26 = 26, // Hojo: basements store key/vals separately on disk for fixed klpair length BNs
FT_LAYOUT_VERSION_27 = 27, // serialize message trees with nonleaf buffers to avoid key, msn sort on deserialize
FT_LAYOUT_VERSION_28 = 28, // Add fanout to ft_header
+ FT_LAYOUT_VERSION_29 = 29, // Add logrows to ft_header
FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
diff --git a/storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt b/storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt
index 0098b6091be..270ec97660a 100644
--- a/storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt
+++ b/storage/tokudb/PerconaFT/ft/tests/CMakeLists.txt
@@ -112,11 +112,13 @@ if(BUILD_TESTING OR BUILD_FT_TESTS)
declare_custom_tests(test-upgrade-recovery-logs)
file(GLOB upgrade_tests "${TOKUDB_DATA}/upgrade-recovery-logs-??-clean")
+ file(GLOB upgrade_tests "${CMAKE_CURRENT_SOURCE_DIR}/upgrade.data/upgrade-recovery-logs-??-clean")
foreach(test ${upgrade_tests})
get_filename_component(test_basename "${test}" NAME)
add_ft_test_aux(test-${test_basename} test-upgrade-recovery-logs ${test})
endforeach(test)
file(GLOB upgrade_tests "${TOKUDB_DATA}/upgrade-recovery-logs-??-dirty")
+ file(GLOB upgrade_tests "${CMAKE_CURRENT_SOURCE_DIR}/upgrade.data/upgrade-recovery-logs-??-dirty")
foreach(test ${upgrade_tests})
get_filename_component(test_basename "${test}" NAME)
add_ft_test_aux(test-${test_basename} test-upgrade-recovery-logs ${test})
diff --git a/storage/tokudb/PerconaFT/ft/tests/make-tree.cc b/storage/tokudb/PerconaFT/ft/tests/make-tree.cc
index 663bbf3beb2..761d672539b 100644
--- a/storage/tokudb/PerconaFT/ft/tests/make-tree.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/make-tree.cc
@@ -74,11 +74,20 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
- toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ NULL,
+ NULL,
+ NULL);
leafnode->max_msn_applied_to_node_on_disk = msn;
- // dont forget to dirty the node
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
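The extra trailing NULL in the test call corresponds to the new logical_rows_delta parameter; like workdonep and stats_to_update it is optional, and the apply path now guards each with FT_LIKELY(... != NULL). A sketch of the same call with the delta actually captured, using the variables already in scope in append_leaf:

    int64_t logical_rows_delta = 0;
    toku_ft_bn_apply_msg_once(
        BLB(leafnode, 0),
        msg,
        idx,
        keylen,
        NULL,                    // no pre-existing leafentry
        &gc_info,
        NULL,                    // workdone not tracked by this test
        NULL,                    // stats_to_update not tracked by this test
        &logical_rows_delta);
    // A fresh insert is expected to report a delta of +1 here.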
diff --git a/storage/tokudb/PerconaFT/ft/tests/msnfilter.cc b/storage/tokudb/PerconaFT/ft/tests/msnfilter.cc
index 737c3556ad6..c37dcd089f8 100644
--- a/storage/tokudb/PerconaFT/ft/tests/msnfilter.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/msnfilter.cc
@@ -82,49 +82,85 @@ append_leaf(FT_HANDLE ft, FTNODE leafnode, void *key, uint32_t keylen, void *val
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
- toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg, &gc_info, nullptr, nullptr);
+ toku_ft_leaf_apply_msg(
+ ft->ft->cmp,
+ ft->ft->update_fun,
+ leafnode,
+ -1,
+ msg,
+ &gc_info,
+ nullptr,
+ nullptr,
+ nullptr);
{
- int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair);
- assert(r==0);
- assert(pair.call_count==1);
+ int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair);
+ assert(r==0);
+ assert(pair.call_count==1);
}
ft_msg badmsg(&thekey, &badval, FT_INSERT, msn, toku_xids_get_root_xids());
- toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, badmsg, &gc_info, nullptr, nullptr);
+ toku_ft_leaf_apply_msg(
+ ft->ft->cmp,
+ ft->ft->update_fun,
+ leafnode,
+ -1,
+ badmsg,
+ &gc_info,
+ nullptr,
+ nullptr,
+ nullptr);
// message should be rejected for duplicate msn, row should still have original val
{
- int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair);
- assert(r==0);
- assert(pair.call_count==2);
+ int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair);
+ assert(r==0);
+ assert(pair.call_count==2);
}
// now verify that message with proper msn gets through
msn = next_dummymsn();
ft->ft->h->max_msn_in_ft = msn;
ft_msg msg2(&thekey, &val2, FT_INSERT, msn, toku_xids_get_root_xids());
- toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg2, &gc_info, nullptr, nullptr);
+ toku_ft_leaf_apply_msg(
+ ft->ft->cmp,
+ ft->ft->update_fun,
+ leafnode,
+ -1,
+ msg2,
+ &gc_info,
+ nullptr,
+ nullptr,
+ nullptr);
// message should be accepted, val should have new value
{
- int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair2);
- assert(r==0);
- assert(pair2.call_count==1);
+ int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair2);
+ assert(r==0);
+ assert(pair2.call_count==1);
}
// now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10;
ft_msg msg3(&thekey, &badval, FT_INSERT, msn, toku_xids_get_root_xids());
- toku_ft_leaf_apply_msg(ft->ft->cmp, ft->ft->update_fun, leafnode, -1, msg3, &gc_info, nullptr, nullptr);
+ toku_ft_leaf_apply_msg(
+ ft->ft->cmp,
+ ft->ft->update_fun,
+ leafnode,
+ -1,
+ msg3,
+ &gc_info,
+ nullptr,
+ nullptr,
+ nullptr);
// message should be rejected, val should still have value in pair2
{
- int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair2);
- assert(r==0);
- assert(pair2.call_count==2);
+ int r = toku_ft_lookup(ft, &thekey, lookup_checkf, &pair2);
+ assert(r==0);
+ assert(pair2.call_count==2);
}
- // dont forget to dirty the node
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
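What msnfilter.cc asserts is the MSN filter itself: a message is applied to a basement node only when its MSN is strictly greater than the basement's max_msn_applied, so a replayed or stale message is discarded and the earlier value survives. The rule, condensed from the toku_ft_leaf_apply_msg hunk in node.cc above:

    // Accept/reject rule exercised by this test (sketch, not new code).
    if (msg.msn().msn > bn->max_msn_applied.msn) {
        bn->max_msn_applied = msg.msn();      // remember the new high-water mark
        // ... apply the message to the basement node ...
    } else {
        toku_ft_status_note_msn_discard();    // duplicate or older MSN: drop it
    }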
diff --git a/storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc b/storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc
index 055a38e5f6d..393fb88ac2e 100644
--- a/storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/orthopush-flush.cc
@@ -137,8 +137,24 @@ insert_random_message_to_bn(
*keyp = toku_xmemdup(keydbt->data, keydbt->size);
ft_msg msg(keydbt, valdbt, FT_INSERT, msn, xids);
int64_t numbytes;
- toku_le_apply_msg(msg, NULL, NULL, 0, keydbt->size, &non_mvcc_gc_info, save, &numbytes);
- toku_ft_bn_apply_msg(t->ft->cmp, t->ft->update_fun, blb, msg, &non_mvcc_gc_info, NULL, NULL);
+ toku_le_apply_msg(
+ msg,
+ NULL,
+ NULL,
+ 0,
+ keydbt->size,
+ &non_mvcc_gc_info,
+ save,
+ &numbytes);
+ toku_ft_bn_apply_msg(
+ t->ft->cmp,
+ t->ft->update_fun,
+ blb,
+ msg,
+ &non_mvcc_gc_info,
+ NULL,
+ NULL,
+ NULL);
if (msn.msn > blb->max_msn_applied.msn) {
blb->max_msn_applied = msn;
}
@@ -182,12 +198,36 @@ insert_same_message_to_bns(
*keyp = toku_xmemdup(keydbt->data, keydbt->size);
ft_msg msg(keydbt, valdbt, FT_INSERT, msn, xids);
int64_t numbytes;
- toku_le_apply_msg(msg, NULL, NULL, 0, keydbt->size, &non_mvcc_gc_info, save, &numbytes);
- toku_ft_bn_apply_msg(t->ft->cmp, t->ft->update_fun, blb1, msg, &non_mvcc_gc_info, NULL, NULL);
+ toku_le_apply_msg(
+ msg,
+ NULL,
+ NULL,
+ 0,
+ keydbt->size,
+ &non_mvcc_gc_info,
+ save,
+ &numbytes);
+ toku_ft_bn_apply_msg(
+ t->ft->cmp,
+ t->ft->update_fun,
+ blb1,
+ msg,
+ &non_mvcc_gc_info,
+ NULL,
+ NULL,
+ NULL);
if (msn.msn > blb1->max_msn_applied.msn) {
blb1->max_msn_applied = msn;
}
- toku_ft_bn_apply_msg(t->ft->cmp, t->ft->update_fun, blb2, msg, &non_mvcc_gc_info, NULL, NULL);
+ toku_ft_bn_apply_msg(
+ t->ft->cmp,
+ t->ft->update_fun,
+ blb2,
+ msg,
+ &non_mvcc_gc_info,
+ NULL,
+ NULL,
+ NULL);
if (msn.msn > blb2->max_msn_applied.msn) {
blb2->max_msn_applied = msn;
}
@@ -619,7 +659,16 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
- toku_ft_leaf_apply_msg(t->ft->cmp, t->ft->update_fun, child, -1, *parent_messages[i], &non_mvcc_gc_info, NULL, NULL);
+ toku_ft_leaf_apply_msg(
+ t->ft->cmp,
+ t->ft->update_fun,
+ child,
+ -1,
+ *parent_messages[i],
+ &non_mvcc_gc_info,
+ NULL,
+ NULL,
+ NULL);
}
}
for (i = 0; i < 8; ++i) {
@@ -842,7 +891,16 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (dummy_cmp(parent_messages[i]->kdbt(), &childkeys[7]) <= 0 &&
!parent_messages_is_fresh[i]) {
- toku_ft_leaf_apply_msg(t->ft->cmp, t->ft->update_fun, child, -1, *parent_messages[i], &non_mvcc_gc_info, NULL, NULL);
+ toku_ft_leaf_apply_msg(
+ t->ft->cmp,
+ t->ft->update_fun,
+ child,
+ -1,
+ *parent_messages[i],
+ &non_mvcc_gc_info,
+ NULL,
+ NULL,
+ NULL);
}
}
for (i = 0; i < 8; ++i) {
@@ -1045,8 +1103,26 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
- toku_ft_leaf_apply_msg(t->ft->cmp, t->ft->update_fun, child1, -1, *parent_messages[i], &non_mvcc_gc_info, NULL, NULL);
- toku_ft_leaf_apply_msg(t->ft->cmp, t->ft->update_fun, child2, -1, *parent_messages[i], &non_mvcc_gc_info, NULL, NULL);
+ toku_ft_leaf_apply_msg(
+ t->ft->cmp,
+ t->ft->update_fun,
+ child1,
+ -1,
+ *parent_messages[i],
+ &non_mvcc_gc_info,
+ NULL,
+ NULL,
+ NULL);
+ toku_ft_leaf_apply_msg(
+ t->ft->cmp,
+ t->ft->update_fun,
+ child2,
+ -1,
+ *parent_messages[i],
+ &non_mvcc_gc_info,
+ NULL,
+ NULL,
+ NULL);
}
}
for (i = 0; i < 8; ++i) {
diff --git a/storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc b/storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc
index 8e006498d77..7691ffaac2b 100644
--- a/storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/test-upgrade-recovery-logs.cc
@@ -81,7 +81,7 @@ static void run_recovery(const char *testdir) {
bool upgrade_in_progress;
r = toku_maybe_upgrade_log(testdir, testdir, &lsn_of_clean_shutdown, &upgrade_in_progress);
if (strcmp(shutdown, "dirty") == 0 && log_version <= 24) {
- CKERR2(r, TOKUDB_UPGRADE_FAILURE); // we dont support dirty upgrade from versions <= 24
+ CKERR2(r, TOKUDB_UPGRADE_FAILURE); // we don't support dirty upgrade from versions <= 24
return;
} else {
CKERR(r);
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-clean/log000000000000.tokulog24 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-clean/log000000000000.tokulog24
new file mode 100755
index 00000000000..9a56e83e627
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-clean/log000000000000.tokulog24
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-dirty/log000000000000.tokulog24 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-dirty/log000000000000.tokulog24
new file mode 100755
index 00000000000..c552cda6673
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-24-dirty/log000000000000.tokulog24
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-clean/log000000000000.tokulog25 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-clean/log000000000000.tokulog25
new file mode 100755
index 00000000000..26b8bcfbdcc
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-clean/log000000000000.tokulog25
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-dirty/log000000000000.tokulog25 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-dirty/log000000000000.tokulog25
new file mode 100755
index 00000000000..04d3190c818
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-25-dirty/log000000000000.tokulog25
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-clean/log000000000000.tokulog26 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-clean/log000000000000.tokulog26
new file mode 100755
index 00000000000..02047325aa6
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-clean/log000000000000.tokulog26
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-dirty/log000000000000.tokulog26 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-dirty/log000000000000.tokulog26
new file mode 100755
index 00000000000..ce826b5608b
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-26-dirty/log000000000000.tokulog26
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-clean/log000000000000.tokulog27 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-clean/log000000000000.tokulog27
new file mode 100755
index 00000000000..9849b977d73
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-clean/log000000000000.tokulog27
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-dirty/log000000000000.tokulog27 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-dirty/log000000000000.tokulog27
new file mode 100755
index 00000000000..8b658ea4c0a
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-27-dirty/log000000000000.tokulog27
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-clean/log000000000000.tokulog28 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-clean/log000000000000.tokulog28
new file mode 100644
index 00000000000..11fecfb94b2
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-clean/log000000000000.tokulog28
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-dirty/log000000000000.tokulog28 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-dirty/log000000000000.tokulog28
new file mode 100644
index 00000000000..b7a9b03b583
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-28-dirty/log000000000000.tokulog28
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-clean/log000000000000.tokulog29 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-clean/log000000000000.tokulog29
new file mode 100644
index 00000000000..a1f306f4a96
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-clean/log000000000000.tokulog29
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-dirty/log000000000000.tokulog29 b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-dirty/log000000000000.tokulog29
new file mode 100644
index 00000000000..b9e79eeb1c4
--- /dev/null
+++ b/storage/tokudb/PerconaFT/ft/tests/upgrade.data/upgrade-recovery-logs-29-dirty/log000000000000.tokulog29
Binary files differ
diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc b/storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc
index 68fac0e6a9c..b10885c2e62 100644
--- a/storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/verify-bad-msn.cc
@@ -78,12 +78,21 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
- toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ NULL,
+ NULL,
+ NULL);
// Create bad tree (don't do following):
// leafnode->max_msn_applied_to_node = msn;
- // dont forget to dirty the node
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc b/storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc
index 49b2b8a6c21..c1d08ce41a6 100644
--- a/storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/verify-bad-pivots.cc
@@ -65,9 +65,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
- toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
-
- // dont forget to dirty the node
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ NULL,
+ NULL,
+ NULL);
+
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc b/storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc
index 72c4063f51f..22a29c0ff69 100644
--- a/storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/verify-dup-in-leaf.cc
@@ -66,9 +66,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
- toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
-
- // dont forget to dirty the node
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ NULL,
+ NULL,
+ NULL);
+
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc b/storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc
index f569f502dc8..80189dd9804 100644
--- a/storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/verify-dup-pivots.cc
@@ -65,9 +65,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
- toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
-
- // dont forget to dirty the node
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ NULL,
+ NULL,
+ NULL);
+
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc b/storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc
index 3a6db8ee4de..a84aac1f063 100644
--- a/storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/verify-misrouted-msgs.cc
@@ -66,9 +66,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
- toku_ft_bn_apply_msg_once(BLB(leafnode,0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
-
- // dont forget to dirty the node
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ NULL,
+ NULL,
+ NULL);
+
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc b/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc
index 4392887718f..ca413f52567 100644
--- a/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-leaf.cc
@@ -68,9 +68,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
- toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
-
- // dont forget to dirty the node
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ NULL,
+ NULL,
+ NULL);
+
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
diff --git a/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc b/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc
index e3167bd3dc1..6efa06913c2 100644
--- a/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc
+++ b/storage/tokudb/PerconaFT/ft/tests/verify-unsorted-pivots.cc
@@ -65,9 +65,18 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
ft_msg msg(&thekey, &theval, FT_INSERT, msn, toku_xids_get_root_xids());
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
- toku_ft_bn_apply_msg_once(BLB(leafnode, 0), msg, idx, keylen, NULL, &gc_info, NULL, NULL);
-
- // dont forget to dirty the node
+ toku_ft_bn_apply_msg_once(
+ BLB(leafnode, 0),
+ msg,
+ idx,
+ keylen,
+ NULL,
+ &gc_info,
+ NULL,
+ NULL,
+ NULL);
+
+ // don't forget to dirty the node
leafnode->dirty = 1;
}
diff --git a/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc
index 6a8c0d45b45..df830afd0df 100644
--- a/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc
+++ b/storage/tokudb/PerconaFT/ft/txn/rollback-apply.cc
@@ -186,6 +186,7 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
// Append the list to the front of the parent.
if (child_log->oldest_logentry) {
// There are some entries, so link them in.
+ parent_log->dirty = true;
child_log->oldest_logentry->prev = parent_log->newest_logentry;
if (!parent_log->oldest_logentry) {
parent_log->oldest_logentry = child_log->oldest_logentry;
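The only functional change in this hunk is marking the parent rollback log dirty before the child's entries are linked onto it, so the in-place splice gets written back rather than silently lost. A minimal sketch of the pattern with a hypothetical helper name; the field names match the surrounding code:

    // Mark the destination dirty before mutating it in place.
    static void splice_child_entries(ROLLBACK_LOG_NODE parent_log,
                                     ROLLBACK_LOG_NODE child_log) {
        if (child_log->oldest_logentry == nullptr) {
            return;                   // nothing to move
        }
        parent_log->dirty = true;     // the fix: ensure the merged log is flushed
        // ... pointer surgery linking child_log's entries onto parent_log ...
    }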
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.cc b/storage/tokudb/PerconaFT/ft/txn/txn.cc
index cd0585dbf6c..dd03073a3ec 100644
--- a/storage/tokudb/PerconaFT/ft/txn/txn.cc
+++ b/storage/tokudb/PerconaFT/ft/txn/txn.cc
@@ -248,11 +248,24 @@ static txn_child_manager tcm;
.xa_xid = {0, 0, 0, ""},
.progress_poll_fun = NULL,
.progress_poll_fun_extra = NULL,
- .txn_lock = ZERO_MUTEX_INITIALIZER,
+
+        // You cannot initialize txn_lock with TOKU_MUTEX_INITIALIZER, because
+        // we will initialize it in the code below, and it cannot already
+        // be initialized at that point. Also, in general, you don't
+        // get to use PTHREAD_MUTEX_INITIALIZER (which is what is inside
+        // TOKU_MUTEX_INITIALIZER) except in static variables, and this
+        // is initializing an auto variable.
+        //
+        // And we cannot simply leave these fields uninitialized because,
+        // although that avoids -Wmissing-field-initializers warnings under
+        // gcc, it triggers other errors about non-trivial designated
+        // initializers not being supported.
+
+ .txn_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER
.open_fts = open_fts,
.roll_info = roll_info,
- .state_lock = ZERO_MUTEX_INITIALIZER,
- .state_cond = TOKU_COND_INITIALIZER,
+ .state_lock = ZERO_MUTEX_INITIALIZER, // Not TOKU_MUTEX_INITIALIZER
+ .state_cond = ZERO_COND_INITIALIZER, // Not TOKU_COND_INITIALIZER
.state = TOKUTXN_LIVE,
.num_pin = 0,
.client_id = 0,
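The comment added above captures the constraint driving ZERO_MUTEX_INITIALIZER: PTHREAD_MUTEX_INITIALIZER is only valid for static-duration mutexes, so a tokutxn built as an automatic variable with a designated initializer has to zero the lock fields first and run the real initialization afterwards. A sketch of that zero-then-init pattern with hypothetical names (the two-argument toku_mutex_init form is an assumption here):

    struct widget {
        toku_mutex_t lock;
        int value;
    };

    static void widget_init(struct widget *w) {
        struct widget tmp = {
            .lock = ZERO_MUTEX_INITIALIZER,    // placeholder, not yet a usable mutex
            .value = 0,
        };
        *w = tmp;
        toku_mutex_init(&w->lock, nullptr);    // real initialization happens here
    }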
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc b/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc
index 551fd32b8d5..88eca36a261 100644
--- a/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc
+++ b/storage/tokudb/PerconaFT/ft/txn/txn_manager.cc
@@ -45,7 +45,15 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#include "ft/txn/txn_manager.h"
#include "ft/txn/rollback.h"
#include "util/omt.h"
+// This is only for testing.
+static void (* test_txn_sync_callback) (pthread_t, void *) = NULL;
+static void * test_txn_sync_callback_extra = NULL;
+
+void set_test_txn_sync_callback(void (*cb) (pthread_t, void *), void *extra) {
+ test_txn_sync_callback = cb;
+ test_txn_sync_callback_extra = extra;
+}
bool garbage_collection_debug = false;
static bool txn_records_snapshot(TXN_SNAPSHOT_TYPE snapshot_type, struct tokutxn *parent) {
@@ -525,14 +533,19 @@ void toku_txn_manager_handle_snapshot_create_for_child_txn(
XMALLOC(txn->live_root_txn_list);
txn_manager_lock(txn_manager);
txn_manager_create_snapshot_unlocked(txn_manager, txn);
- txn_manager_unlock(txn_manager);
}
else {
inherit_snapshot_from_parent(txn);
}
- if (copies_snapshot) {
- setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list);
- }
+
+ toku_debug_txn_sync(pthread_self());
+
+ if (copies_snapshot) {
+        if (!records_snapshot)
+ txn_manager_lock(txn_manager);
+ setup_live_root_txn_list(&txn_manager->live_root_ids, txn->live_root_txn_list);
+ txn_manager_unlock(txn_manager);
+ }
}
void toku_txn_manager_handle_snapshot_destroy_for_child_txn(
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn_manager.h b/storage/tokudb/PerconaFT/ft/txn/txn_manager.h
index 658c6f9aecd..7cdc52c4f43 100644
--- a/storage/tokudb/PerconaFT/ft/txn/txn_manager.h
+++ b/storage/tokudb/PerconaFT/ft/txn/txn_manager.h
@@ -43,6 +43,15 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
#include "ft/txn/txn.h"
+void set_test_txn_sync_callback(void (*) (pthread_t, void*), void*);
+#define toku_test_txn_sync_callback(a) ((test_txn_sync_callback) ? test_txn_sync_callback(a, test_txn_sync_callback_extra) : (void) 0)
+
+#if TOKU_DEBUG_TXN_SYNC
+#define toku_debug_txn_sync(a) toku_test_txn_sync_callback(a)
+#else
+#define toku_debug_txn_sync(a) ((void) 0)
+#endif
+
typedef struct txn_manager *TXN_MANAGER;
struct referenced_xid_tuple {
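toku_debug_txn_sync() only does anything when the tree is built with TOKU_DEBUG_TXN_SYNC defined; otherwise the macro expands to a no-op, so production builds pay nothing for the hook. A sketch of how a test might use it to force a thread interleaving around snapshot creation (the callback body and extra pointer are hypothetical):

    // Hypothetical test wiring for the txn sync callback.
    static void pause_at_snapshot(pthread_t tid, void *extra) {
        (void) tid;
        // e.g. wait on a condition variable passed in via extra so the test
        // controls exactly when this thread proceeds past snapshot creation.
        (void) extra;
    }

    static void setup_txn_sync_test(void) {
        set_test_txn_sync_callback(pause_at_snapshot, nullptr);
    }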
diff --git a/storage/tokudb/PerconaFT/ft/ule.cc b/storage/tokudb/PerconaFT/ft/ule.cc
index 573c4488f70..ac393fbf179 100644
--- a/storage/tokudb/PerconaFT/ft/ule.cc
+++ b/storage/tokudb/PerconaFT/ft/ule.cc
@@ -73,12 +73,11 @@ void toku_le_get_status(LE_STATUS statp) {
*statp = le_status;
}
-///////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
// Accessor functions used by outside world (e.g. indexer)
//
-ULEHANDLE
-toku_ule_create(LEAFENTRY le) {
+ULEHANDLE toku_ule_create(LEAFENTRY le) {
ULE XMALLOC(ule_p);
le_unpack(ule_p, le);
return (ULEHANDLE) ule_p;
@@ -89,7 +88,7 @@ void toku_ule_free(ULEHANDLE ule_p) {
toku_free(ule_p);
}
-///////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
//
// Question: Can any software outside this file modify or read a leafentry?
// If so, is it worthwhile to put it all here?
@@ -117,27 +116,43 @@ const UXR_S committed_delete = {
// Local functions:
-static void msg_init_empty_ule(ULE ule);
-static void msg_modify_ule(ULE ule, const ft_msg &msg);
-static void ule_init_empty_ule(ULE ule);
+static inline void msg_init_empty_ule(ULE ule);
+static int64_t msg_modify_ule(ULE ule, const ft_msg &msg);
+static inline void ule_init_empty_ule(ULE ule);
static void ule_do_implicit_promotions(ULE ule, XIDS xids);
-static void ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid);
+static void ule_try_promote_provisional_outermost(
+ ULE ule,
+ TXNID oldest_possible_live_xid);
static void ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index);
static void ule_promote_provisional_innermost_to_committed(ULE ule);
-static void ule_apply_insert(ULE ule, XIDS xids, uint32_t vallen, void * valp);
-static void ule_apply_delete(ULE ule, XIDS xids);
-static void ule_prepare_for_new_uxr(ULE ule, XIDS xids);
-static void ule_apply_abort(ULE ule, XIDS xids);
+static inline int64_t ule_apply_insert_no_overwrite(
+ ULE ule,
+ XIDS xids,
+ uint32_t vallen,
+ void* valp);
+static inline int64_t ule_apply_insert(
+ ULE ule,
+ XIDS xids,
+ uint32_t vallen,
+ void* valp);
+static inline int64_t ule_apply_delete(ULE ule, XIDS xids);
+static inline void ule_prepare_for_new_uxr(ULE ule, XIDS xids);
+static inline int64_t ule_apply_abort(ULE ule, XIDS xids);
static void ule_apply_broadcast_commit_all (ULE ule);
static void ule_apply_commit(ULE ule, XIDS xids);
-static void ule_push_insert_uxr(ULE ule, bool is_committed, TXNID xid, uint32_t vallen, void * valp);
-static void ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid);
-static void ule_push_placeholder_uxr(ULE ule, TXNID xid);
-static UXR ule_get_innermost_uxr(ULE ule);
-static UXR ule_get_first_empty_uxr(ULE ule);
-static void ule_remove_innermost_uxr(ULE ule);
-static TXNID ule_get_innermost_xid(ULE ule);
-static TXNID ule_get_xid(ULE ule, uint32_t index);
+static inline void ule_push_insert_uxr(
+ ULE ule,
+ bool is_committed,
+ TXNID xid,
+ uint32_t vallen,
+ void* valp);
+static inline void ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid);
+static inline void ule_push_placeholder_uxr(ULE ule, TXNID xid);
+static inline UXR ule_get_innermost_uxr(ULE ule);
+static inline UXR ule_get_first_empty_uxr(ULE ule);
+static inline void ule_remove_innermost_uxr(ULE ule);
+static inline TXNID ule_get_innermost_xid(ULE ule);
+static inline TXNID ule_get_xid(ULE ule, uint32_t index);
static void ule_remove_innermost_placeholders(ULE ule);
static void ule_add_placeholders(ULE ule, XIDS xids);
static void ule_optimize(ULE ule, XIDS xids);
@@ -153,6 +168,30 @@ static inline size_t uxr_unpack_type_and_length(UXR uxr, uint8_t *p);
static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p);
static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p);
+#if 0
+static void ule_print(ULE ule, const char* note) {
+ fprintf(stderr, "%s : ULE[0x%p]\n", note, ule);
+ fprintf(stderr, " num_puxrs[%u]\n", ule->num_puxrs);
+ fprintf(stderr, " num_cuxrs[%u]\n", ule->num_cuxrs);
+ fprintf(stderr, " innermost[%u]\n", ule->num_cuxrs + ule->num_puxrs - 1);
+ fprintf(stderr, " first_empty[%u]\n", ule->num_cuxrs + ule->num_puxrs);
+
+ uint32_t num_uxrs = ule->num_cuxrs + ule->num_puxrs - 1;
+ for (uint32_t uxr_num = 0; uxr_num <= num_uxrs; uxr_num++) {
+ UXR uxr = &(ule->uxrs[uxr_num]);
+ fprintf(stderr, " uxr[%u]\n", uxr_num);
+ switch (uxr->type) {
+ case 0: fprintf(stderr, " type[NONE]\n"); break;
+ case 1: fprintf(stderr, " type[INSERT]\n"); break;
+ case 2: fprintf(stderr, " type[DELETE]\n"); break;
+ case 3: fprintf(stderr, " type[PLACEHOLDER]\n"); break;
+ default: fprintf(stderr, " type[WHAT??]\n"); break;
+ }
+ fprintf(stderr, " xid[%lu]\n", uxr->xid);
+ }
+}
+#endif
+
static void get_space_for_le(
bn_data* data_buffer,
uint32_t idx,
@@ -162,21 +201,30 @@ static void get_space_for_le(
uint32_t old_le_size,
size_t size,
LEAFENTRY* new_le_space,
- void **const maybe_free
- )
-{
+ void** const maybe_free) {
+
if (data_buffer == nullptr) {
CAST_FROM_VOIDP(*new_le_space, toku_xmalloc(size));
- }
- else {
+ } else if (old_le_size > 0) {
// this means we are overwriting something
- if (old_le_size > 0) {
- data_buffer->get_space_for_overwrite(idx, keyp, keylen, old_keylen, old_le_size, size, new_le_space, maybe_free);
- }
+ data_buffer->get_space_for_overwrite(
+ idx,
+ keyp,
+ keylen,
+ old_keylen,
+ old_le_size,
+ size,
+ new_le_space,
+ maybe_free);
+ } else {
// this means we are inserting something new
- else {
- data_buffer->get_space_for_insert(idx, keyp, keylen, size, new_le_space, maybe_free);
- }
+ data_buffer->get_space_for_insert(
+ idx,
+ keyp,
+ keylen,
+ size,
+ new_le_space,
+ maybe_free);
}
}
@@ -185,15 +233,13 @@ static void get_space_for_le(
// Garbage collection related functions
//
-static TXNID
-get_next_older_txnid(TXNID xc, const xid_omt_t &omt) {
+static TXNID get_next_older_txnid(TXNID xc, const xid_omt_t &omt) {
int r;
TXNID xid;
r = omt.find<TXNID, toku_find_xid_by_xid>(xc, -1, &xid, nullptr);
if (r==0) {
invariant(xid < xc); //sanity check
- }
- else {
+ } else {
invariant(r==DB_NOTFOUND);
xid = TXNID_NONE;
}
@@ -201,17 +247,32 @@ get_next_older_txnid(TXNID xc, const xid_omt_t &omt) {
}
//
-// This function returns true if live transaction TL1 is allowed to read a value committed by
-// transaction xc, false otherwise.
+// This function returns true if live transaction TL1 is allowed to read a
+// value committed by transaction xc, false otherwise.
//
-static bool
-xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, const rx_omt_t &referenced_xids) {
+static bool xid_reads_committed_xid(
+ TXNID tl1,
+ TXNID xc,
+ const xid_omt_t& snapshot_txnids,
+ const rx_omt_t& referenced_xids) {
+
bool rval;
- if (tl1 < xc) rval = false; //cannot read a newer txn
- else {
- TXNID x = toku_get_youngest_live_list_txnid_for(xc, snapshot_txnids, referenced_xids);
- if (x == TXNID_NONE) rval = true; //Not in ANY live list, tl1 can read it.
- else rval = tl1 > x; //Newer than the 'newest one that has it in live list'
+ if (tl1 < xc) {
+ rval = false; //cannot read a newer txn
+ } else {
+ TXNID x =
+ toku_get_youngest_live_list_txnid_for(
+ xc,
+ snapshot_txnids,
+ referenced_xids);
+
+ if (x == TXNID_NONE) {
+ //Not in ANY live list, tl1 can read it.
+ rval = true;
+ } else {
+ //Newer than the 'newest one that has it in live list'
+ rval = tl1 > x;
+ }
// we know tl1 > xc
// we know x > xc
// if tl1 == x, then we do not read, because tl1 is in xc's live list
@@ -228,8 +289,7 @@ xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, c
// than oldest_referenced_xid. All elements below this entry are garbage,
// so we get rid of them.
//
-static void
-ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) {
+static void ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) {
if (ule->num_cuxrs == 1) {
return;
}
@@ -240,7 +300,8 @@ ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) {
// uxr with a txnid that is less than oldest_referenced_xid
for (uint32_t i = 0; i < ule->num_cuxrs; i++) {
curr_index = ule->num_cuxrs - i - 1;
- if (ule->uxrs[curr_index].xid < gc_info->oldest_referenced_xid_for_simple_gc) {
+ if (ule->uxrs[curr_index].xid <
+ gc_info->oldest_referenced_xid_for_simple_gc) {
break;
}
}
@@ -250,12 +311,15 @@ ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) {
curr_index = ule->num_cuxrs - 1;
}
- // curr_index is now set to the youngest uxr older than oldest_referenced_xid
- // so if it's not the bottom of the stack..
+ // curr_index is now set to the youngest uxr older than
+ // oldest_referenced_xid so if it's not the bottom of the stack..
if (curr_index != 0) {
// ..then we need to get rid of the entries below curr_index
uint32_t num_entries = ule->num_cuxrs + ule->num_puxrs - curr_index;
- memmove(&ule->uxrs[0], &ule->uxrs[curr_index], num_entries * sizeof(ule->uxrs[0]));
+ memmove(
+ &ule->uxrs[0],
+ &ule->uxrs[curr_index],
+ num_entries * sizeof(ule->uxrs[0]));
ule->uxrs[0].xid = TXNID_NONE; // New 'bottom of stack' loses its TXNID
ule->num_cuxrs -= curr_index;
}
@@ -264,8 +328,12 @@ ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) {
// TODO: Clean this up
extern bool garbage_collection_debug;
-static void
-ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) {
+static void ule_garbage_collect(
+ ULE ule,
+ const xid_omt_t& snapshot_xids,
+ const rx_omt_t& referenced_xids,
+ const xid_omt_t& live_root_txns) {
+
if (ule->num_cuxrs == 1) {
return;
}
@@ -289,10 +357,12 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref
// If we find that the committed transaction is in the live list,
// then xc is really in the process of being committed. It has not
// been fully committed. As a result, our assumption that transactions
- // newer than what is currently in these OMTs will read the top of the stack
- // is not necessarily accurate. Transactions may read what is just below xc.
- // As a result, we must mark what is just below xc as necessary and move on.
- // This issue was found while testing flusher threads, and was fixed for #3979
+ // newer than what is currently in these OMTs will read the top of the
+ // stack is not necessarily accurate. Transactions may read what is
+ // just below xc.
+ // As a result, we must mark what is just below xc as necessary and
+ // move on. This issue was found while testing flusher threads, and was
+ // fixed for #3979
//
bool is_xc_live = toku_is_txn_in_live_root_txn_list(live_root_txns, xc);
if (is_xc_live) {
@@ -300,13 +370,19 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref
continue;
}
- tl1 = toku_get_youngest_live_list_txnid_for(xc, snapshot_xids, referenced_xids);
+ tl1 =
+ toku_get_youngest_live_list_txnid_for(
+ xc,
+ snapshot_xids,
+ referenced_xids);
- // if tl1 == xc, that means xc should be live and show up in live_root_txns, which we check above.
+ // if tl1 == xc, that means xc should be live and show up in
+ // live_root_txns, which we check above.
invariant(tl1 != xc);
if (tl1 == TXNID_NONE) {
- // set tl1 to youngest live transaction older than ule->uxrs[curr_committed_entry]->xid
+ // set tl1 to youngest live transaction older than
+ // ule->uxrs[curr_committed_entry]->xid
tl1 = get_next_older_txnid(xc, snapshot_xids);
if (tl1 == TXNID_NONE) {
// remainder is garbage, we're done
@@ -314,8 +390,13 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref
}
}
if (garbage_collection_debug) {
- int r = snapshot_xids.find_zero<TXNID, toku_find_xid_by_xid>(tl1, nullptr, nullptr);
- invariant_zero(r); // make sure that the txn you are claiming is live is actually live
+ int r =
+ snapshot_xids.find_zero<TXNID, toku_find_xid_by_xid>(
+ tl1,
+ nullptr,
+ nullptr);
+ // make sure that the txn you are claiming is live is actually live
+ invariant_zero(r);
}
//
// tl1 should now be set
@@ -323,7 +404,11 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref
curr_committed_entry--;
while (curr_committed_entry > 0) {
xc = ule->uxrs[curr_committed_entry].xid;
- if (xid_reads_committed_xid(tl1, xc, snapshot_xids, referenced_xids)) {
+ if (xid_reads_committed_xid(
+ tl1,
+ xc,
+ snapshot_xids,
+ referenced_xids)) {
break;
}
curr_committed_entry--;
@@ -343,7 +428,10 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref
ule->uxrs[0].xid = TXNID_NONE; //New 'bottom of stack' loses its TXNID
if (first_free != ule->num_cuxrs) {
// Shift provisional values
- memmove(&ule->uxrs[first_free], &ule->uxrs[ule->num_cuxrs], ule->num_puxrs * sizeof(ule->uxrs[0]));
+ memmove(
+ &ule->uxrs[first_free],
+ &ule->uxrs[ule->num_cuxrs],
+ ule->num_puxrs * sizeof(ule->uxrs[0]));
}
ule->num_cuxrs = saved;
}
@@ -367,29 +455,42 @@ enum {
ULE_MIN_MEMSIZE_TO_FORCE_GC = 1024 * 1024
};
-/////////////////////////////////////////////////////////////////////////////////
-// This is the big enchilada. (Bring Tums.) Note that this level of abstraction
-// has no knowledge of the inner structure of either leafentry or msg. It makes
-// calls into the next lower layer (msg_xxx) which handles messages.
+////////////////////////////////////////////////////////////////////////////////
+// This is the big enchilada. (Bring Tums.) Note that this level of
+// abstraction has no knowledge of the inner structure of either leafentry or
+// msg. It makes calls into the next lower layer (msg_xxx) which handles
+// messages.
//
// NOTE: This is the only function (at least in this body of code) that modifies
// a leafentry.
// NOTE: It is the responsibility of the caller to make sure that the key is set
// in the FT_MSG, as it will be used to store the data in the data_buffer
//
-// Return 0 on success.
-// If the leafentry is destroyed it sets *new_leafentry_p to NULL.
-// Otehrwise the new_leafentry_p points at the new leaf entry.
-// As of October 2011, this function always returns 0.
-void
-toku_le_apply_msg(const ft_msg &msg,
- LEAFENTRY old_leafentry, // NULL if there was no stored data.
- bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data
- uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
- uint32_t old_keylen, // length of the any key in data_buffer
- txn_gc_info *gc_info,
- LEAFENTRY *new_leafentry_p,
- int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead
+// Returns -1, 0, or 1 indicating the change in logical row count caused by
+// applying this message. The delta is 0 when, for example, a delete finds no
+// logical leafentry, or when an insert finds a duplicate and is converted to
+// an update.
+//
+// old_leafentry - NULL if there was no stored data.
+// data_buffer - bn_data storing leafentry, if NULL, means there is no bn_data
+// idx - index in data_buffer where leafentry is stored
+// (and should be replaced)
+// old_keylen - length of any old key already stored in data_buffer
+// new_leafentry_p - If the leafentry is destroyed it sets *new_leafentry_p
+// to NULL. Otherwise the new_leafentry_p points at the new
+// leaf entry.
+// numbytes_delta_p - change in total size of key and val, not including any
+// overhead
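+// Illustrative caller-side use (an assumption, not shown in this function):
+// the returned delta is intended to be folded into the tree's logical row
+// count, e.g. logical_rows += toku_le_apply_msg(...).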
+int64_t toku_le_apply_msg(
+ const ft_msg& msg,
+ LEAFENTRY old_leafentry,
+ bn_data* data_buffer,
+ uint32_t idx,
+ uint32_t old_keylen,
+ txn_gc_info* gc_info,
+ LEAFENTRY* new_leafentry_p,
+ int64_t* numbytes_delta_p) {
+
invariant_notnull(gc_info);
paranoid_invariant_notnull(new_leafentry_p);
ULE_S ule;
@@ -397,6 +498,7 @@ toku_le_apply_msg(const ft_msg &msg,
int64_t newnumbytes = 0;
uint64_t oldmemsize = 0;
uint32_t keylen = msg.kdbt()->size;
+ int32_t rowcountdelta = 0;
if (old_leafentry == NULL) {
msg_init_empty_ule(&ule);
@@ -405,49 +507,62 @@ toku_le_apply_msg(const ft_msg &msg,
le_unpack(&ule, old_leafentry); // otherwise unpack leafentry
oldnumbytes = ule_get_innermost_numbytes(&ule, keylen);
}
- msg_modify_ule(&ule, msg); // modify unpacked leafentry
- // - we may be able to immediately promote the newly-apllied outermost provisonal uxr
- // - either way, run simple gc first, and then full gc if there are still some committed uxrs.
- ule_try_promote_provisional_outermost(&ule, gc_info->oldest_referenced_xid_for_implicit_promotion);
+ // modify unpacked leafentry
+ rowcountdelta = msg_modify_ule(&ule, msg);
+
+    // - we may be able to immediately promote the newly-applied outermost
+    // provisional uxr
+ // - either way, run simple gc first, and then full gc if there are still
+ // some committed uxrs.
+ ule_try_promote_provisional_outermost(
+ &ule,
+ gc_info->oldest_referenced_xid_for_implicit_promotion);
ule_simple_garbage_collection(&ule, gc_info);
txn_manager_state *txn_state_for_gc = gc_info->txn_state_for_gc;
size_t size_before_gc = 0;
- if (ule.num_cuxrs > 1 && txn_state_for_gc != nullptr && // there is garbage to clean, and our caller gave us state..
- // ..and either the state is pre-initialized, or the committed stack is large enough
- (txn_state_for_gc->initialized || ule.num_cuxrs >= ULE_MIN_STACK_SIZE_TO_FORCE_GC ||
- // ..or the ule's raw memsize is sufficiently large
- (size_before_gc = ule_packed_memsize(&ule)) >= ULE_MIN_MEMSIZE_TO_FORCE_GC)) {
- // ..then it's worth running gc, possibly initializing the txn manager state, if it isn't already
+ // there is garbage to clean, and our caller gave us state..
+ // ..and either the state is pre-initialized, or the committed stack is
+ // large enough
+ // ..or the ule's raw memsize is sufficiently large
+ // ..then it's worth running gc, possibly initializing the txn manager
+ // state, if it isn't already
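+    // (the conditions listed above map, in order, onto the clauses of the
+    // if() below; && and || short-circuit, so ule_packed_memsize() is only
+    // computed when the cheaper checks have not already decided)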
+ if (ule.num_cuxrs > 1 && txn_state_for_gc != nullptr &&
+ (txn_state_for_gc->initialized ||
+ ule.num_cuxrs >= ULE_MIN_STACK_SIZE_TO_FORCE_GC ||
+ (size_before_gc = ule_packed_memsize(&ule)) >=
+ ULE_MIN_MEMSIZE_TO_FORCE_GC)) {
if (!txn_state_for_gc->initialized) {
txn_state_for_gc->init();
}
-
- size_before_gc = size_before_gc != 0 ? size_before_gc : // it's already been calculated above
- ule_packed_memsize(&ule);
- ule_garbage_collect(&ule,
- txn_state_for_gc->snapshot_xids,
- txn_state_for_gc->referenced_xids,
- txn_state_for_gc->live_root_txns
- );
+ // it's already been calculated above
+ size_before_gc =
+ size_before_gc != 0 ? size_before_gc : ule_packed_memsize(&ule);
+ ule_garbage_collect(
+ &ule,
+ txn_state_for_gc->snapshot_xids,
+ txn_state_for_gc->referenced_xids,
+ txn_state_for_gc->live_root_txns);
size_t size_after_gc = ule_packed_memsize(&ule);
LE_STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc);
LE_STATUS_INC(LE_APPLY_GC_BYTES_OUT, size_after_gc);
}
- void *maybe_free = nullptr;
- int r = le_pack(
- &ule, // create packed leafentry
- data_buffer,
- idx,
- msg.kdbt()->data, // contract of this function is caller has this set, always
- keylen, // contract of this function is caller has this set, always
- old_keylen,
- oldmemsize,
- new_leafentry_p,
- &maybe_free
- );
+ void* maybe_free = nullptr;
+ // create packed leafentry
+ // contract of this function is caller has keyp and keylen set, always
+ int r =
+ le_pack(
+ &ule,
+ data_buffer,
+ idx,
+ msg.kdbt()->data,
+ keylen,
+ old_keylen,
+ oldmemsize,
+ new_leafentry_p,
+ &maybe_free);
invariant_zero(r);
if (*new_leafentry_p) {
newnumbytes = ule_get_innermost_numbytes(&ule, keylen);
@@ -458,16 +573,22 @@ toku_le_apply_msg(const ft_msg &msg,
if (maybe_free != nullptr) {
toku_free(maybe_free);
}
+ return rowcountdelta;
}
-bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info) {
-// Effect: Quickly determines if it's worth trying to run garbage collection on a leafentry
+bool toku_le_worth_running_garbage_collection(
+ LEAFENTRY le,
+ txn_gc_info* gc_info) {
+// Effect: Quickly determines if it's worth trying to run garbage collection
+// on a leafentry
// Return: True if it makes sense to try garbage collection, false otherwise.
// Rationale: Garbage collection is likely to clean up under two circumstances:
-// 1.) There are multiple committed entries. Some may never be read by new txns.
-// 2.) There is only one committed entry, but the outermost provisional entry
-// is older than the oldest known referenced xid, so it must have commited.
-// Therefor we can promote it to committed and get rid of the old commited entry.
+// 1.) There are multiple committed entries. Some may never be read
+// by new txns.
+// 2.) There is only one committed entry, but the outermost
+// provisional entry is older than the oldest known referenced
+//         xid, so it must have committed.  Therefore we can promote it to
+//         committed and get rid of the old committed entry.
if (le->type != LE_MVCC) {
return false;
}
@@ -477,7 +598,8 @@ bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info
paranoid_invariant(le->u.mvcc.num_cxrs == 1);
}
return le->u.mvcc.num_pxrs > 0 &&
- le_outermost_uncommitted_xid(le) < gc_info->oldest_referenced_xid_for_implicit_promotion;
+ le_outermost_uncommitted_xid(le) <
+ gc_info->oldest_referenced_xid_for_implicit_promotion;
}
// Garbage collect one leaf entry, using the given OMT's.
@@ -498,16 +620,18 @@ bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info
// -- referenced_xids : list of in memory active transactions.
// NOTE: it is not a good idea to garbage collect a leaf
// entry with only one committed value.
-void
-toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
- bn_data* data_buffer,
- uint32_t idx,
- void* keyp,
- uint32_t keylen,
- txn_gc_info *gc_info,
- LEAFENTRY *new_leaf_entry,
- int64_t * numbytes_delta_p) {
- // We shouldn't want to run gc without having provided a snapshot of the txn system.
+void toku_le_garbage_collect(
+ LEAFENTRY old_leaf_entry,
+ bn_data* data_buffer,
+ uint32_t idx,
+ void* keyp,
+ uint32_t keylen,
+ txn_gc_info* gc_info,
+ LEAFENTRY* new_leaf_entry,
+ int64_t* numbytes_delta_p) {
+
+ // We shouldn't want to run gc without having provided a snapshot of the
+ // txn system.
invariant_notnull(gc_info);
invariant_notnull(gc_info->txn_state_for_gc);
paranoid_invariant_notnull(new_leaf_entry);
@@ -520,20 +644,24 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
oldnumbytes = ule_get_innermost_numbytes(&ule, keylen);
uint32_t old_mem_size = leafentry_memsize(old_leaf_entry);
- // Before running garbage collection, try to promote the outermost provisional
- // entries to committed if its xid is older than the oldest possible live xid.
+ // Before running garbage collection, try to promote the outermost
+    // provisional entry to committed if its xid is older than the oldest
+ // possible live xid.
//
    // The oldest known referenced xid is a lower bound on the oldest possible
// live xid, so we use that. It's usually close enough to get rid of most
// garbage in leafentries.
- ule_try_promote_provisional_outermost(&ule, gc_info->oldest_referenced_xid_for_implicit_promotion);
+ ule_try_promote_provisional_outermost(
+ &ule,
+ gc_info->oldest_referenced_xid_for_implicit_promotion);
// No need to run simple gc here if we're going straight for full gc.
if (ule.num_cuxrs > 1) {
size_t size_before_gc = ule_packed_memsize(&ule);
- ule_garbage_collect(&ule,
- gc_info->txn_state_for_gc->snapshot_xids,
- gc_info->txn_state_for_gc->referenced_xids,
- gc_info->txn_state_for_gc->live_root_txns);
+ ule_garbage_collect(
+ &ule,
+ gc_info->txn_state_for_gc->snapshot_xids,
+ gc_info->txn_state_for_gc->referenced_xids,
+ gc_info->txn_state_for_gc->live_root_txns);
size_t size_after_gc = ule_packed_memsize(&ule);
LE_STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc);
@@ -541,17 +669,18 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
}
void *maybe_free = nullptr;
- int r = le_pack(
- &ule,
- data_buffer,
- idx,
- keyp,
- keylen,
- keylen, // old_keylen, same because the key isn't going to change for gc
- old_mem_size,
- new_leaf_entry,
- &maybe_free
- );
+ // old_keylen, same because the key isn't going to change for gc
+ int r =
+ le_pack(
+ &ule,
+ data_buffer,
+ idx,
+ keyp,
+ keylen,
+ keylen,
+ old_mem_size,
+ new_leaf_entry,
+ &maybe_free);
invariant_zero(r);
if (*new_leaf_entry) {
newnumbytes = ule_get_innermost_numbytes(&ule, keylen);
@@ -564,49 +693,54 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
}
}
-/////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (msg_xxx)
// knows the accessors of msg, but not of leafentry or unpacked leaf entry.
// It makes calls into the lower layer (le_xxx) which handles leafentries.
// Purpose is to init the ule with given key and no transaction records
//
-static void
-msg_init_empty_ule(ULE ule) {
+static inline void msg_init_empty_ule(ULE ule) {
ule_init_empty_ule(ule);
}
// Purpose is to modify the unpacked leafentry in our private workspace.
//
-static void
-msg_modify_ule(ULE ule, const ft_msg &msg) {
+// Returns -1, 0, or 1 identifying the change in logical row count needed as a
+// result of applying the message: for example, when a delete finds no logical
+// leafentry, or when an insert finds a duplicate and is converted to an
+// update.
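+// toku_le_apply_msg() returns this value to its caller unchanged.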
+static int64_t msg_modify_ule(ULE ule, const ft_msg &msg) {
+ int64_t retval = 0;
XIDS xids = msg.xids();
invariant(toku_xids_get_num_xids(xids) < MAX_TRANSACTION_RECORDS);
enum ft_msg_type type = msg.type();
- if (type != FT_OPTIMIZE && type != FT_OPTIMIZE_FOR_UPGRADE) {
+ if (FT_LIKELY(type != FT_OPTIMIZE && type != FT_OPTIMIZE_FOR_UPGRADE)) {
ule_do_implicit_promotions(ule, xids);
}
switch (type) {
- case FT_INSERT_NO_OVERWRITE: {
- UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
- //If something exists, quit (no overwrite).
- if (uxr_is_insert(old_innermost_uxr)) break;
- //else it is just an insert, so
- //fall through to FT_INSERT on purpose.
- }
- case FT_INSERT: {
- uint32_t vallen = msg.vdbt()->size;
- invariant(IS_VALID_LEN(vallen));
- void * valp = msg.vdbt()->data;
- ule_apply_insert(ule, xids, vallen, valp);
+ case FT_INSERT_NO_OVERWRITE:
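+        // the no-overwrite check (and the insert itself, when allowed) is
+        // handled inside ule_apply_insert_no_overwrite()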
+ retval =
+ ule_apply_insert_no_overwrite(
+ ule,
+ xids,
+ msg.vdbt()->size,
+ msg.vdbt()->data);
+ break;
+ case FT_INSERT:
+ retval =
+ ule_apply_insert(
+ ule,
+ xids,
+ msg.vdbt()->size,
+ msg.vdbt()->data);
break;
- }
case FT_DELETE_ANY:
- ule_apply_delete(ule, xids);
+ retval = ule_apply_delete(ule, xids);
break;
case FT_ABORT_ANY:
case FT_ABORT_BROADCAST_TXN:
- ule_apply_abort(ule, xids);
+ retval = ule_apply_abort(ule, xids);
break;
case FT_COMMIT_BROADCAST_ALL:
ule_apply_broadcast_commit_all(ule);
@@ -621,34 +755,40 @@ msg_modify_ule(ULE ule, const ft_msg &msg) {
break;
case FT_UPDATE:
case FT_UPDATE_BROADCAST_ALL:
- assert(false); // These messages don't get this far. Instead they get translated (in setval_fun in do_update) into FT_INSERT messages.
+ // These messages don't get this far. Instead they get translated (in
+ // setval_fun in do_update) into FT_INSERT messages.
+ assert(false);
break;
default:
- assert(false); /* illegal ft msg type */
+ // illegal ft msg type
+ assert(false);
break;
}
+ return retval;
}
-void test_msg_modify_ule(ULE ule, const ft_msg &msg){
+void test_msg_modify_ule(ULE ule, const ft_msg &msg) {
msg_modify_ule(ule,msg);
}
static void ule_optimize(ULE ule, XIDS xids) {
if (ule->num_puxrs) {
- TXNID uncommitted = ule->uxrs[ule->num_cuxrs].xid; // outermost uncommitted
+ // outermost uncommitted
+ TXNID uncommitted = ule->uxrs[ule->num_cuxrs].xid;
TXNID oldest_living_xid = TXNID_NONE;
uint32_t num_xids = toku_xids_get_num_xids(xids);
if (num_xids > 0) {
invariant(num_xids==1);
oldest_living_xid = toku_xids_get_xid(xids, 0);
}
- if (oldest_living_xid == TXNID_NONE || uncommitted < oldest_living_xid) {
+ if (oldest_living_xid == TXNID_NONE ||
+ uncommitted < oldest_living_xid) {
ule_promote_provisional_innermost_to_committed(ule);
}
}
}
-/////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (le_xxx) understands the structure of the leafentry
// and of the unpacked leafentry. It is the only layer that understands the
// structure of leafentry. It has no knowledge of any other data structures.
@@ -657,8 +797,7 @@ static void ule_optimize(ULE ule, XIDS xids) {
//
// required for every le_unpack that is done
//
-void
-ule_cleanup(ULE ule) {
+void ule_cleanup(ULE ule) {
invariant(ule->uxrs);
if (ule->uxrs != ule->uxrs_static) {
toku_free(ule->uxrs);
@@ -668,8 +807,7 @@ ule_cleanup(ULE ule) {
// populate an unpacked leafentry using pointers into the given leafentry.
// thus, the memory referenced by 'le' must live as long as the ULE.
-void
-le_unpack(ULE ule, LEAFENTRY le) {
+void le_unpack(ULE ule, LEAFENTRY le) {
uint8_t type = le->type;
uint8_t *p;
uint32_t i;
@@ -694,9 +832,10 @@ le_unpack(ULE ule, LEAFENTRY le) {
//Dynamic memory
if (ule->num_cuxrs < MAX_TRANSACTION_RECORDS) {
ule->uxrs = ule->uxrs_static;
- }
- else {
- XMALLOC_N(ule->num_cuxrs + 1 + MAX_TRANSACTION_RECORDS, ule->uxrs);
+ } else {
+ XMALLOC_N(
+ ule->num_cuxrs + 1 + MAX_TRANSACTION_RECORDS,
+ ule->uxrs);
}
p = le->u.mvcc.xrs;
@@ -717,9 +856,12 @@ le_unpack(ULE ule, LEAFENTRY le) {
p += uxr_unpack_length_and_bit(innermost, p);
}
for (i = 0; i < ule->num_cuxrs; i++) {
- p += uxr_unpack_length_and_bit(ule->uxrs + ule->num_cuxrs - 1 - i, p);
+ p +=
+ uxr_unpack_length_and_bit(
+ ule->uxrs + ule->num_cuxrs - 1 - i,
+ p);
}
-
+
//unpack interesting values inner to outer
if (ule->num_puxrs!=0) {
UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1;
@@ -761,14 +903,12 @@ le_unpack(ULE ule, LEAFENTRY le) {
#endif
}
-static inline size_t
-uxr_pack_txnid(UXR uxr, uint8_t *p) {
+static inline size_t uxr_pack_txnid(UXR uxr, uint8_t *p) {
*(TXNID*)p = toku_htod64(uxr->xid);
return sizeof(TXNID);
}
-static inline size_t
-uxr_pack_type_and_length(UXR uxr, uint8_t *p) {
+static inline size_t uxr_pack_type_and_length(UXR uxr, uint8_t *p) {
size_t rval = 1;
*p = uxr->type;
if (uxr_is_insert(uxr)) {
@@ -778,21 +918,18 @@ uxr_pack_type_and_length(UXR uxr, uint8_t *p) {
return rval;
}
-static inline size_t
-uxr_pack_length_and_bit(UXR uxr, uint8_t *p) {
+static inline size_t uxr_pack_length_and_bit(UXR uxr, uint8_t *p) {
uint32_t length_and_bit;
if (uxr_is_insert(uxr)) {
length_and_bit = INSERT_LENGTH(uxr->vallen);
- }
- else {
+ } else {
length_and_bit = DELETE_LENGTH(uxr->vallen);
}
*(uint32_t*)p = toku_htod32(length_and_bit);
return sizeof(uint32_t);
}
-static inline size_t
-uxr_pack_data(UXR uxr, uint8_t *p) {
+static inline size_t uxr_pack_data(UXR uxr, uint8_t *p) {
if (uxr_is_insert(uxr)) {
memcpy(p, uxr->valp, uxr->vallen);
return uxr->vallen;
@@ -800,14 +937,12 @@ uxr_pack_data(UXR uxr, uint8_t *p) {
return 0;
}
-static inline size_t
-uxr_unpack_txnid(UXR uxr, uint8_t *p) {
+static inline size_t uxr_unpack_txnid(UXR uxr, uint8_t *p) {
uxr->xid = toku_dtoh64(*(TXNID*)p);
return sizeof(TXNID);
}
-static inline size_t
-uxr_unpack_type_and_length(UXR uxr, uint8_t *p) {
+static inline size_t uxr_unpack_type_and_length(UXR uxr, uint8_t *p) {
size_t rval = 1;
uxr->type = *p;
if (uxr_is_insert(uxr)) {
@@ -817,22 +952,19 @@ uxr_unpack_type_and_length(UXR uxr, uint8_t *p) {
return rval;
}
-static inline size_t
-uxr_unpack_length_and_bit(UXR uxr, uint8_t *p) {
+static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p) {
uint32_t length_and_bit = toku_dtoh32(*(uint32_t*)p);
if (IS_INSERT(length_and_bit)) {
uxr->type = XR_INSERT;
uxr->vallen = GET_LENGTH(length_and_bit);
- }
- else {
+ } else {
uxr->type = XR_DELETE;
uxr->vallen = 0;
}
return sizeof(uint32_t);
}
-static inline size_t
-uxr_unpack_data(UXR uxr, uint8_t *p) {
+static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p) {
if (uxr_is_insert(uxr)) {
uxr->valp = p;
return uxr->vallen;
@@ -841,8 +973,7 @@ uxr_unpack_data(UXR uxr, uint8_t *p) {
}
// executed too often to be worth making threadsafe
-static inline void
-update_le_status(ULE ule, size_t memsize) {
+static inline void update_le_status(ULE ule, size_t memsize) {
if (ule->num_cuxrs > LE_STATUS_VAL(LE_MAX_COMMITTED_XR))
LE_STATUS_VAL(LE_MAX_COMMITTED_XR) = ule->num_cuxrs;
if (ule->num_puxrs > LE_STATUS_VAL(LE_MAX_PROVISIONAL_XR))
@@ -856,21 +987,22 @@ update_le_status(ULE ule, size_t memsize) {
// Purpose is to return a newly allocated leaf entry in packed format, or
// return null if leaf entry should be destroyed (if no transaction records
// are for inserts).
-// Transaction records in packed le are stored inner to outer (first xr is innermost),
-// with some information extracted out of the transaction records into the header.
+// Transaction records in packed le are stored inner to outer (first xr is
+// innermost), with some information extracted out of the transaction records
+// into the header.
// Transaction records in ule are stored outer to inner (uxr[0] is outermost).
-int
-le_pack(ULE ule, // data to be packed into new leafentry
- bn_data* data_buffer,
- uint32_t idx,
- void* keyp,
- uint32_t keylen,
- uint32_t old_keylen,
- uint32_t old_le_size,
- LEAFENTRY * const new_leafentry_p, // this is what this function creates
- void **const maybe_free
- )
-{
+// Takes 'ule' and creates 'new_leafentry_p'.
+int le_pack(
+ ULE ule,
+ bn_data* data_buffer,
+ uint32_t idx,
+ void* keyp,
+ uint32_t keylen,
+ uint32_t old_keylen,
+ uint32_t old_le_size,
+ LEAFENTRY* const new_leafentry_p,
+ void** const maybe_free) {
+
invariant(ule->num_cuxrs > 0);
invariant(ule->uxrs[0].xid == TXNID_NONE);
int rval;
@@ -888,7 +1020,8 @@ le_pack(ULE ule, // data to be packed into new leafentry
}
}
if (data_buffer && old_le_size > 0) {
- // must pass old_keylen and old_le_size, since that's what is actually stored in data_buffer
+ // must pass old_keylen and old_le_size, since that's what is
+ // actually stored in data_buffer
data_buffer->delete_leafentry(idx, old_keylen, old_le_size);
}
*new_leafentry_p = NULL;
@@ -898,14 +1031,24 @@ le_pack(ULE ule, // data to be packed into new leafentry
found_insert:
memsize = le_memsize_from_ule(ule);
LEAFENTRY new_leafentry;
- get_space_for_le(data_buffer, idx, keyp, keylen, old_keylen, old_le_size, memsize, &new_leafentry, maybe_free);
+ get_space_for_le(
+ data_buffer,
+ idx,
+ keyp,
+ keylen,
+ old_keylen,
+ old_le_size,
+ memsize,
+ &new_leafentry,
+ maybe_free);
//p always points to first unused byte after leafentry we are packing
uint8_t *p;
invariant(ule->num_cuxrs>0);
//Type specific data
if (ule->num_cuxrs == 1 && ule->num_puxrs == 0) {
- //Pack a 'clean leafentry' (no uncommitted transactions, only one committed value)
+ //Pack a 'clean leafentry' (no uncommitted transactions, only one
+ //committed value)
new_leafentry->type = LE_CLEAN;
uint32_t vallen = ule->uxrs[0].vallen;
@@ -917,8 +1060,7 @@ found_insert:
//Set p to after leafentry
p = new_leafentry->u.clean.val + vallen;
- }
- else {
+ } else {
uint32_t i;
//Pack an 'mvcc leafentry'
new_leafentry->type = LE_MVCC;
@@ -969,7 +1111,9 @@ found_insert:
p += uxr_pack_data(outermost, p);
}
//pack txnid, length, bit, data for non-outermost, non-innermost
- for (i = ule->num_cuxrs + 1; i < ule->num_cuxrs + ule->num_puxrs - 1; i++) {
+ for (i = ule->num_cuxrs + 1;
+ i < ule->num_cuxrs + ule->num_puxrs - 1;
+ i++) {
UXR uxr = ule->uxrs + i;
p += uxr_pack_txnid(uxr, p);
p += uxr_pack_type_and_length(uxr, p);
@@ -1022,13 +1166,13 @@ cleanup:
return rval;
}
-//////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
// Following functions provide convenient access to a packed leafentry.
//Requires:
-// Leafentry that ule represents should not be destroyed (is not just all deletes)
-size_t
-le_memsize_from_ule (ULE ule) {
+// Leafentry that ule represents should not be destroyed (is not just all
+// deletes)
+size_t le_memsize_from_ule (ULE ule) {
invariant(ule->num_cuxrs);
size_t rval;
if (ule->num_cuxrs == 1 && ule->num_puxrs == 0) {
@@ -1037,13 +1181,13 @@ le_memsize_from_ule (ULE ule) {
rval = 1 //type
+4 //vallen
+committed->vallen; //actual val
- }
- else {
+ } else {
rval = 1 //type
+4 //num_cuxrs
+1 //num_puxrs
+4*(ule->num_cuxrs) //types+lengths for committed
- +8*(ule->num_cuxrs + ule->num_puxrs - 1); //txnids (excluding superroot)
+ +8*(ule->num_cuxrs + ule->num_puxrs - 1); //txnids (excluding
+ //superroot)
uint32_t i;
//Count data from committed uxrs and innermost puxr
for (i = 0; i < ule->num_cuxrs; i++) {
@@ -1072,8 +1216,11 @@ le_memsize_from_ule (ULE ule) {
}
// TODO: rename
-size_t
-leafentry_rest_memsize(uint32_t num_puxrs, uint32_t num_cuxrs, uint8_t* start) {
+size_t leafentry_rest_memsize(
+ uint32_t num_puxrs,
+ uint32_t num_cuxrs,
+ uint8_t* start) {
+
UXR_S uxr;
size_t lengths = 0;
uint8_t* p = start;
@@ -1122,8 +1269,7 @@ leafentry_rest_memsize(uint32_t num_puxrs, uint32_t num_cuxrs, uint8_t* start) {
return rval;
}
-size_t
-leafentry_memsize (LEAFENTRY le) {
+size_t leafentry_memsize (LEAFENTRY le) {
size_t rval = 0;
uint8_t type = le->type;
@@ -1162,13 +1308,11 @@ leafentry_memsize (LEAFENTRY le) {
return rval;
}
-size_t
-leafentry_disksize (LEAFENTRY le) {
+size_t leafentry_disksize (LEAFENTRY le) {
return leafentry_memsize(le);
}
-bool
-le_is_clean(LEAFENTRY le) {
+bool le_is_clean(LEAFENTRY le) {
uint8_t type = le->type;
uint32_t rval;
switch (type) {
@@ -1228,13 +1372,14 @@ int le_latest_is_del(LEAFENTRY le) {
//
-// returns true if the outermost provisional transaction id on the leafentry's stack matches
-// the outermost transaction id in xids
-// It is used to determine if a broadcast commit/abort message (look in ft-ops.c) should be applied to this leafentry
-// If the outermost transactions match, then the broadcast commit/abort should be applied
+// returns true if the outermost provisional transaction id on the leafentry's
+// stack matches the outermost transaction id in xids
+// It is used to determine if a broadcast commit/abort message (look in ft-ops.c)
+// should be applied to this leafentry
+// If the outermost transactions match, then the broadcast commit/abort should
+// be applied
//
-bool
-le_has_xids(LEAFENTRY le, XIDS xids) {
+bool le_has_xids(LEAFENTRY le, XIDS xids) {
//Read num_uxrs
uint32_t num_xids = toku_xids_get_num_xids(xids);
invariant(num_xids > 0); //Disallow checking for having TXNID_NONE
@@ -1245,8 +1390,7 @@ le_has_xids(LEAFENTRY le, XIDS xids) {
return rval;
}
-void*
-le_latest_val_and_len (LEAFENTRY le, uint32_t *len) {
+void* le_latest_val_and_len (LEAFENTRY le, uint32_t *len) {
uint8_t type = le->type;
void *valp;
@@ -1277,8 +1421,7 @@ le_latest_val_and_len (LEAFENTRY le, uint32_t *len) {
if (uxr_is_insert(&uxr)) {
*len = uxr.vallen;
valp = p + (num_cuxrs - 1 + (num_puxrs!=0))*sizeof(uint32_t);
- }
- else {
+ } else {
*len = 0;
valp = NULL;
}
@@ -1295,8 +1438,7 @@ le_latest_val_and_len (LEAFENTRY le, uint32_t *len) {
if (uxr_is_insert(uxr)) {
slow_valp = uxr->valp;
slow_len = uxr->vallen;
- }
- else {
+ } else {
slow_valp = NULL;
slow_len = 0;
}
@@ -1310,8 +1452,7 @@ le_latest_val_and_len (LEAFENTRY le, uint32_t *len) {
}
//DEBUG ONLY can be slow
-void*
-le_latest_val (LEAFENTRY le) {
+void* le_latest_val (LEAFENTRY le) {
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_innermost_uxr(&ule);
@@ -1325,8 +1466,7 @@ le_latest_val (LEAFENTRY le) {
}
//needed to be fast for statistics.
-uint32_t
-le_latest_vallen (LEAFENTRY le) {
+uint32_t le_latest_vallen (LEAFENTRY le) {
uint32_t rval;
uint8_t type = le->type;
uint8_t *p;
@@ -1354,8 +1494,7 @@ le_latest_vallen (LEAFENTRY le) {
uxr_unpack_length_and_bit(&uxr, p);
if (uxr_is_insert(&uxr)) {
rval = uxr.vallen;
- }
- else {
+ } else {
rval = 0;
}
break;
@@ -1377,8 +1516,7 @@ le_latest_vallen (LEAFENTRY le) {
return rval;
}
-uint64_t
-le_outermost_uncommitted_xid (LEAFENTRY le) {
+uint64_t le_outermost_uncommitted_xid (LEAFENTRY le) {
uint64_t rval = TXNID_NONE;
uint8_t type = le->type;
@@ -1412,8 +1550,7 @@ le_outermost_uncommitted_xid (LEAFENTRY le) {
//Optimization not required. This is a debug only function.
//Print a leafentry out in human-readable format
-int
-print_klpair (FILE *outf, const void* keyp, uint32_t keylen, LEAFENTRY le) {
+int print_klpair (FILE *outf, const void* keyp, uint32_t keylen, LEAFENTRY le) {
ULE_S ule;
le_unpack(&ule, le);
uint32_t i;
@@ -1444,23 +1581,21 @@ print_klpair (FILE *outf, const void* keyp, uint32_t keylen, LEAFENTRY le) {
return 0;
}
-/////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (ule_xxx) knows the structure of the unpacked
// leafentry and no other structure.
//
// ule constructor
// Note that transaction 0 is explicit in the ule
-static void
-ule_init_empty_ule(ULE ule) {
+static inline void ule_init_empty_ule(ULE ule) {
ule->num_cuxrs = 1;
ule->num_puxrs = 0;
ule->uxrs = ule->uxrs_static;
ule->uxrs[0] = committed_delete;
}
-static inline int32_t
-min_i32(int32_t a, int32_t b) {
+static inline int32_t min_i32(int32_t a, int32_t b) {
int32_t rval = a < b ? a : b;
return rval;
}
@@ -1470,8 +1605,8 @@ min_i32(int32_t a, int32_t b) {
//
// If the leafentry has already been promoted, there is nothing to do.
// We have two transaction stacks (one from message, one from leaf entry).
-// We want to implicitly promote transactions newer than (but not including)
-// the innermost common ancestor (ICA) of the two stacks of transaction ids. We
+// We want to implicitly promote transactions newer than (but not including)
+// the innermost common ancestor (ICA) of the two stacks of transaction ids. We
// know that this is the right thing to do because each transaction with an id
// greater (later) than the ICA must have been either committed or aborted.
// If it was aborted then we would have seen an abort message and removed the
@@ -1483,8 +1618,7 @@ min_i32(int32_t a, int32_t b) {
// record of ICA, keeping the transaction id of the ICA.
// Outermost xid is zero for both ule and xids<>
//
-static void
-ule_do_implicit_promotions(ULE ule, XIDS xids) {
+static void ule_do_implicit_promotions(ULE ule, XIDS xids) {
//Optimization for (most) common case.
//No commits necessary if everything is already committed.
if (ule->num_puxrs > 0) {
@@ -1506,17 +1640,16 @@ ule_do_implicit_promotions(ULE ule, XIDS xids) {
if (ica_index < ule->num_cuxrs) {
invariant(ica_index == ule->num_cuxrs - 1);
ule_promote_provisional_innermost_to_committed(ule);
- }
- else if (ica_index < ule->num_cuxrs + ule->num_puxrs - 1) {
- //If ica is the innermost uxr in the leafentry, no commits are necessary.
+ } else if (ica_index < ule->num_cuxrs + ule->num_puxrs - 1) {
+ //If ica is the innermost uxr in the leafentry, no commits are
+ //necessary.
ule_promote_provisional_innermost_to_index(ule, ica_index);
}
}
}
-static void
-ule_promote_provisional_innermost_to_committed(ULE ule) {
+static void ule_promote_provisional_innermost_to_committed(ULE ule) {
//Must be something to promote.
invariant(ule->num_puxrs);
//Take value (or delete flag) from innermost.
@@ -1532,8 +1665,7 @@ ule_promote_provisional_innermost_to_committed(ULE ule) {
ule->num_puxrs = 0; //Discard all provisional uxrs.
if (uxr_is_delete(old_innermost_uxr)) {
ule_push_delete_uxr(ule, true, old_outermost_uncommitted_uxr->xid);
- }
- else {
+ } else {
ule_push_insert_uxr(ule, true,
old_outermost_uncommitted_uxr->xid,
old_innermost_uxr->vallen,
@@ -1541,11 +1673,13 @@ ule_promote_provisional_innermost_to_committed(ULE ule) {
}
}
-static void
-ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid) {
+static void ule_try_promote_provisional_outermost(
+ ULE ule,
+ TXNID oldest_possible_live_xid) {
// Effect: If there is a provisional record whose outermost xid is older than
// the oldest known referenced_xid, promote it to committed.
- if (ule->num_puxrs > 0 && ule_get_xid(ule, ule->num_cuxrs) < oldest_possible_live_xid) {
+ if (ule->num_puxrs > 0 &&
+ ule_get_xid(ule, ule->num_cuxrs) < oldest_possible_live_xid) {
ule_promote_provisional_innermost_to_committed(ule);
}
}
@@ -1553,8 +1687,9 @@ ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid) {
// Purpose is to promote the value (and type) of the innermost transaction
// record to the uxr at the specified index (keeping the txnid of the uxr at
// specified index.)
-static void
-ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index) {
+static void ule_promote_provisional_innermost_to_index(
+ ULE ule,
+ uint32_t index) {
//Must not promote to committed portion of stack.
invariant(index >= ule->num_cuxrs);
//Must actually be promoting.
@@ -1562,15 +1697,17 @@ ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index) {
UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
assert(!uxr_is_placeholder(old_innermost_uxr));
TXNID new_innermost_xid = ule->uxrs[index].xid;
- ule->num_puxrs = index - ule->num_cuxrs; //Discard old uxr at index (and everything inner)
+ //Discard old uxr at index (and everything inner)
+ ule->num_puxrs = index - ule->num_cuxrs;
if (uxr_is_delete(old_innermost_uxr)) {
ule_push_delete_uxr(ule, false, new_innermost_xid);
- }
- else {
- ule_push_insert_uxr(ule, false,
- new_innermost_xid,
- old_innermost_uxr->vallen,
- old_innermost_uxr->valp);
+ } else {
+ ule_push_insert_uxr(
+ ule,
+ false,
+ new_innermost_xid,
+ old_innermost_uxr->vallen,
+ old_innermost_uxr->valp);
}
}
@@ -1581,19 +1718,60 @@ ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index) {
// Purpose is to apply an insert message to this leafentry:
-static void
-ule_apply_insert(ULE ule, XIDS xids, uint32_t vallen, void * valp) {
+static inline int64_t ule_apply_insert_no_overwrite(
+ ULE ule,
+ XIDS xids,
+ uint32_t vallen,
+ void* valp) {
+
+ invariant(IS_VALID_LEN(vallen));
+ int64_t retval = 0;
+ UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
+ // If something exists, don't overwrite
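+    // (returning -1 signals that this message did not create a new logical
+    // row because a live value was already present)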
+ if (uxr_is_insert(old_innermost_uxr)) {
+ retval = -1;
+ return retval;
+ }
ule_prepare_for_new_uxr(ule, xids);
- TXNID this_xid = toku_xids_get_innermost_xid(xids); // xid of transaction doing this insert
+ // xid of transaction doing this insert
+ TXNID this_xid = toku_xids_get_innermost_xid(xids);
ule_push_insert_uxr(ule, this_xid == TXNID_NONE, this_xid, vallen, valp);
+ return retval;
+}
+
+// Purpose is to apply an insert message to this leafentry:
+static inline int64_t ule_apply_insert(
+ ULE ule,
+ XIDS xids,
+ uint32_t vallen,
+ void* valp) {
+
+ invariant(IS_VALID_LEN(vallen));
+ int64_t retval = 0;
+ UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
+ // If something exists, overwrite
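+    // (returning -1 signals a logical overwrite: no new row is added even
+    // though an insert was applied)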
+ if (uxr_is_insert(old_innermost_uxr)) {
+ retval = -1;
+ }
+ ule_prepare_for_new_uxr(ule, xids);
+ // xid of transaction doing this insert
+ TXNID this_xid = toku_xids_get_innermost_xid(xids);
+ ule_push_insert_uxr(ule, this_xid == TXNID_NONE, this_xid, vallen, valp);
+ return retval;
}
// Purpose is to apply a delete message to this leafentry:
-static void
-ule_apply_delete(ULE ule, XIDS xids) {
+static inline int64_t ule_apply_delete(ULE ule, XIDS xids) {
+ int64_t retval = 0;
+ UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
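+    // If the entry is already logically deleted, report +1 so a caller
+    // tracking the logical row count does not count a removal that did not
+    // happen.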
+ if (FT_UNLIKELY(uxr_is_delete(old_innermost_uxr))) {
+ retval = 1;
+ }
ule_prepare_for_new_uxr(ule, xids);
- TXNID this_xid = toku_xids_get_innermost_xid(xids); // xid of transaction doing this delete
+ // xid of transaction doing this delete
+ TXNID this_xid = toku_xids_get_innermost_xid(xids);
ule_push_delete_uxr(ule, this_xid == TXNID_NONE, this_xid);
+ return retval;
}
// First, discard anything done earlier by this transaction.
@@ -1601,20 +1779,18 @@ ule_apply_delete(ULE ule, XIDS xids) {
// outer transactions that are newer than the newest (innermost) transaction in
// the leafentry. If so, record those outer transactions in the leafentry
// with placeholders.
-static void
-ule_prepare_for_new_uxr(ULE ule, XIDS xids) {
+static inline void ule_prepare_for_new_uxr(ULE ule, XIDS xids) {
TXNID this_xid = toku_xids_get_innermost_xid(xids);
//This is for LOADER_USE_PUTS or transactionless environment
//where messages use XIDS of 0
if (this_xid == TXNID_NONE && ule_get_innermost_xid(ule) == TXNID_NONE) {
ule_remove_innermost_uxr(ule);
- }
- // case where we are transactional and xids stack matches ule stack
- else if (ule->num_puxrs > 0 && ule_get_innermost_xid(ule) == this_xid) {
+ } else if (ule->num_puxrs > 0 && ule_get_innermost_xid(ule) == this_xid) {
+ // case where we are transactional and xids stack matches ule stack
ule_remove_innermost_uxr(ule);
- }
- // case where we are transactional and xids stack does not match ule stack
- else {
+ } else {
+ // case where we are transactional and xids stack does not match ule
+ // stack
ule_add_placeholders(ule, xids);
}
}
@@ -1625,10 +1801,12 @@ ule_prepare_for_new_uxr(ULE ule, XIDS xids) {
// then there is nothing to be done.
// If this transaction did modify the leafentry, then undo whatever it did (by
// removing the transaction record (uxr) and any placeholders underneath.
-// Remember, the innermost uxr can only be an insert or a delete, not a placeholder.
-static void
-ule_apply_abort(ULE ule, XIDS xids) {
- TXNID this_xid = toku_xids_get_innermost_xid(xids); // xid of transaction doing this abort
+// Remember, the innermost uxr can only be an insert or a delete, not a
+// placeholder.
+static inline int64_t ule_apply_abort(ULE ule, XIDS xids) {
+ int64_t retval = 0;
+ // xid of transaction doing this abort
+ TXNID this_xid = toku_xids_get_innermost_xid(xids);
invariant(this_xid!=TXNID_NONE);
UXR innermost = ule_get_innermost_uxr(ule);
// need to check for provisional entries in ule, otherwise
@@ -1636,15 +1814,34 @@ ule_apply_abort(ULE ule, XIDS xids) {
// in a bug where the most recently committed has same xid
// as the XID's innermost
if (ule->num_puxrs > 0 && innermost->xid == this_xid) {
+ // if this is a rollback of a delete of a new ule, return 0
+ // (i.e. double delete)
+ if (uxr_is_delete(innermost)) {
+ if (ule->num_puxrs == 1 && ule->num_cuxrs == 1 &&
+ uxr_is_delete(&(ule->uxrs[0]))) {
+ retval = 0;
+ } else {
+ retval = 1;
+ }
+        } else if (uxr_is_insert(innermost)) {
+            // if this is a rollback of an insert of an existing ule,
+            // return 0 (i.e. double insert)
+            if (ule->num_puxrs == 1 && ule->num_cuxrs == 1 &&
+                uxr_is_insert(&(ule->uxrs[0]))) {
+                retval = 0;
+            } else {
+                retval = -1;
+            }
+        }
invariant(ule->num_puxrs>0);
ule_remove_innermost_uxr(ule);
ule_remove_innermost_placeholders(ule);
}
invariant(ule->num_cuxrs > 0);
+ return retval;
}
-static void
-ule_apply_broadcast_commit_all (ULE ule) {
+static void ule_apply_broadcast_commit_all (ULE ule) {
ule->uxrs[0] = ule->uxrs[ule->num_puxrs + ule->num_cuxrs - 1];
ule->uxrs[0].xid = TXNID_NONE;
ule->num_puxrs = 0;
@@ -1657,9 +1854,11 @@ ule_apply_broadcast_commit_all (ULE ule) {
// then there is nothing to be done.
// Also, if there are no uncommitted transaction records there is nothing to do.
// If this transaction did modify the leafentry, then promote whatever it did.
-// Remember, the innermost uxr can only be an insert or a delete, not a placeholder.
+// Remember, the innermost uxr can only be an insert or a delete, not a
+// placeholder.
void ule_apply_commit(ULE ule, XIDS xids) {
- TXNID this_xid = toku_xids_get_innermost_xid(xids); // xid of transaction committing
+ // xid of transaction committing
+ TXNID this_xid = toku_xids_get_innermost_xid(xids);
invariant(this_xid!=TXNID_NONE);
// need to check for provisional entries in ule, otherwise
// there is nothing to abort, not checking this may result
@@ -1668,16 +1867,19 @@ void ule_apply_commit(ULE ule, XIDS xids) {
if (ule->num_puxrs > 0 && ule_get_innermost_xid(ule) == this_xid) {
// 3 cases:
//1- it's already a committed value (do nothing) (num_puxrs==0)
- //2- it's provisional but root level (make a new committed value (num_puxrs==1)
+        //2- it's provisional but root level (make a new committed value)
+        //   (num_puxrs==1)
//3- it's provisional and not root (promote); (num_puxrs>1)
if (ule->num_puxrs == 1) { //new committed value
ule_promote_provisional_innermost_to_committed(ule);
- }
- else if (ule->num_puxrs > 1) {
- //ule->uxrs[ule->num_cuxrs+ule->num_puxrs-1] is the innermost (this transaction)
+ } else if (ule->num_puxrs > 1) {
+ //ule->uxrs[ule->num_cuxrs+ule->num_puxrs-1] is the innermost
+ // (this transaction)
//ule->uxrs[ule->num_cuxrs+ule->num_puxrs-2] is the 2nd innermost
//We want to promote the innermost uxr one level out.
- ule_promote_provisional_innermost_to_index(ule, ule->num_cuxrs+ule->num_puxrs-2);
+ ule_promote_provisional_innermost_to_index(
+ ule,
+ ule->num_cuxrs+ule->num_puxrs-2);
}
}
}
@@ -1687,14 +1889,17 @@ void ule_apply_commit(ULE ule, XIDS xids) {
//
// Purpose is to record an insert for this transaction (and set type correctly).
-static void
-ule_push_insert_uxr(ULE ule, bool is_committed, TXNID xid, uint32_t vallen, void * valp) {
- UXR uxr = ule_get_first_empty_uxr(ule);
+static inline void ule_push_insert_uxr(
+ ULE ule,
+ bool is_committed, TXNID xid,
+ uint32_t vallen,
+ void* valp) {
+
+ UXR uxr = ule_get_first_empty_uxr(ule);
if (is_committed) {
invariant(ule->num_puxrs==0);
ule->num_cuxrs++;
- }
- else {
+ } else {
ule->num_puxrs++;
}
uxr->xid = xid;
@@ -1706,23 +1911,21 @@ ule_push_insert_uxr(ULE ule, bool is_committed, TXNID xid, uint32_t vallen, void
// Purpose is to record a delete for this transaction. If this transaction
// is the root transaction, then truly delete the leafentry by marking the
// ule as empty.
-static void
-ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid) {
+static inline void ule_push_delete_uxr(ULE ule, bool is_committed, TXNID xid) {
UXR uxr = ule_get_first_empty_uxr(ule);
if (is_committed) {
invariant(ule->num_puxrs==0);
ule->num_cuxrs++;
- }
- else {
+ } else {
ule->num_puxrs++;
}
uxr->xid = xid;
uxr->type = XR_DELETE;
}
-// Purpose is to push a placeholder on the top of the leafentry's transaction stack.
-static void
-ule_push_placeholder_uxr(ULE ule, TXNID xid) {
+// Purpose is to push a placeholder on the top of the leafentry's transaction
+// stack.
+static inline void ule_push_placeholder_uxr(ULE ule, TXNID xid) {
invariant(ule->num_cuxrs>0);
UXR uxr = ule_get_first_empty_uxr(ule);
uxr->xid = xid;
@@ -1731,16 +1934,14 @@ ule_push_placeholder_uxr(ULE ule, TXNID xid) {
}
// Return innermost transaction record.
-static UXR
-ule_get_innermost_uxr(ULE ule) {
+static inline UXR ule_get_innermost_uxr(ULE ule) {
invariant(ule->num_cuxrs > 0);
UXR rval = &(ule->uxrs[ule->num_cuxrs + ule->num_puxrs - 1]);
return rval;
}
// Return first empty transaction record
-static UXR
-ule_get_first_empty_uxr(ULE ule) {
+static inline UXR ule_get_first_empty_uxr(ULE ule) {
invariant(ule->num_puxrs < MAX_TRANSACTION_RECORDS-1);
UXR rval = &(ule->uxrs[ule->num_cuxrs+ule->num_puxrs]);
return rval;
@@ -1748,14 +1949,12 @@ ule_get_first_empty_uxr(ULE ule) {
// Remove the innermost transaction (pop the leafentry's stack), undoing
// whatever the innermost transaction did.
-static void
-ule_remove_innermost_uxr(ULE ule) {
+static inline void ule_remove_innermost_uxr(ULE ule) {
//It is possible to remove the committed delete at first insert.
invariant(ule->num_cuxrs > 0);
if (ule->num_puxrs) {
ule->num_puxrs--;
- }
- else {
+ } else {
//This is for LOADER_USE_PUTS or transactionless environment
//where messages use XIDS of 0
invariant(ule->num_cuxrs == 1);
@@ -1764,14 +1963,12 @@ ule_remove_innermost_uxr(ULE ule) {
}
}
-static TXNID
-ule_get_innermost_xid(ULE ule) {
+static inline TXNID ule_get_innermost_xid(ULE ule) {
TXNID rval = ule_get_xid(ule, ule->num_cuxrs + ule->num_puxrs - 1);
return rval;
}
-static TXNID
-ule_get_xid(ULE ule, uint32_t index) {
+static inline TXNID ule_get_xid(ULE ule, uint32_t index) {
invariant(index < ule->num_cuxrs + ule->num_puxrs);
TXNID rval = ule->uxrs[index].xid;
return rval;
@@ -1781,8 +1978,7 @@ ule_get_xid(ULE ule, uint32_t index) {
// innermost recorded transactions), if necessary. This function is idempotent.
// It makes no logical sense for a placeholder to be the innermost recorded
// transaction record, so placeholders at the top of the stack are not legal.
-static void
-ule_remove_innermost_placeholders(ULE ule) {
+static void ule_remove_innermost_placeholders(ULE ule) {
UXR uxr = ule_get_innermost_uxr(ule);
while (uxr_is_placeholder(uxr)) {
invariant(ule->num_puxrs>0);
@@ -1796,8 +1992,7 @@ ule_remove_innermost_placeholders(ULE ule) {
// Note, after placeholders are added, an insert or delete will be added. This
// function temporarily leaves the transaction stack in an illegal state (having
// placeholders on top).
-static void
-ule_add_placeholders(ULE ule, XIDS xids) {
+static void ule_add_placeholders(ULE ule, XIDS xids) {
//Placeholders can be placed on top of the committed uxr.
invariant(ule->num_cuxrs > 0);
@@ -1819,47 +2014,40 @@ ule_add_placeholders(ULE ule, XIDS xids) {
}
}
-uint64_t
-ule_num_uxrs(ULE ule) {
+uint64_t ule_num_uxrs(ULE ule) {
return ule->num_cuxrs + ule->num_puxrs;
}
-UXR
-ule_get_uxr(ULE ule, uint64_t ith) {
+UXR ule_get_uxr(ULE ule, uint64_t ith) {
invariant(ith < ule_num_uxrs(ule));
return &ule->uxrs[ith];
}
-uint32_t
-ule_get_num_committed(ULE ule) {
+uint32_t ule_get_num_committed(ULE ule) {
return ule->num_cuxrs;
}
-uint32_t
-ule_get_num_provisional(ULE ule) {
+uint32_t ule_get_num_provisional(ULE ule) {
return ule->num_puxrs;
}
-int
-ule_is_committed(ULE ule, uint64_t ith) {
+int ule_is_committed(ULE ule, uint64_t ith) {
invariant(ith < ule_num_uxrs(ule));
return ith < ule->num_cuxrs;
}
-int
-ule_is_provisional(ULE ule, uint64_t ith) {
+int ule_is_provisional(ULE ule, uint64_t ith) {
invariant(ith < ule_num_uxrs(ule));
return ith >= ule->num_cuxrs;
}
// return size of data for innermost uxr, the size of val
-uint32_t
-ule_get_innermost_numbytes(ULE ule, uint32_t keylen) {
+uint32_t ule_get_innermost_numbytes(ULE ule, uint32_t keylen) {
uint32_t rval;
UXR uxr = ule_get_innermost_uxr(ule);
- if (uxr_is_delete(uxr))
+ if (uxr_is_delete(uxr)) {
rval = 0;
- else {
+ } else {
rval = uxr_get_vallen(uxr) + keylen;
}
return rval;
@@ -1870,68 +2058,65 @@ ule_get_innermost_numbytes(ULE ule, uint32_t keylen) {
// This layer of abstraction (uxr_xxx) understands uxr and nothing else.
//
-static inline bool
-uxr_type_is_insert(uint8_t type) {
+static inline bool uxr_type_is_insert(uint8_t type) {
bool rval = (bool)(type == XR_INSERT);
return rval;
}
-bool
-uxr_is_insert(UXR uxr) {
+bool uxr_is_insert(UXR uxr) {
return uxr_type_is_insert(uxr->type);
}
-static inline bool
-uxr_type_is_delete(uint8_t type) {
+static inline bool uxr_type_is_delete(uint8_t type) {
bool rval = (bool)(type == XR_DELETE);
return rval;
}
-bool
-uxr_is_delete(UXR uxr) {
+bool uxr_is_delete(UXR uxr) {
return uxr_type_is_delete(uxr->type);
}
-static inline bool
-uxr_type_is_placeholder(uint8_t type) {
+static inline bool uxr_type_is_placeholder(uint8_t type) {
bool rval = (bool)(type == XR_PLACEHOLDER);
return rval;
}
-bool
-uxr_is_placeholder(UXR uxr) {
+bool uxr_is_placeholder(UXR uxr) {
return uxr_type_is_placeholder(uxr->type);
}
-void *
-uxr_get_val(UXR uxr) {
+void* uxr_get_val(UXR uxr) {
return uxr->valp;
}
-uint32_t
-uxr_get_vallen(UXR uxr) {
+uint32_t uxr_get_vallen(UXR uxr) {
return uxr->vallen;
}
-TXNID
-uxr_get_txnid(UXR uxr) {
+TXNID uxr_get_txnid(UXR uxr) {
return uxr->xid;
}
-static int
-le_iterate_get_accepted_index(TXNID *xids, uint32_t *index, uint32_t num_xids, LE_ITERATE_CALLBACK f, TOKUTXN context, bool top_is_provisional) {
+static int le_iterate_get_accepted_index(
+ TXNID* xids,
+ uint32_t* index,
+ uint32_t num_xids,
+ LE_ITERATE_CALLBACK f,
+ TOKUTXN context,
+ bool top_is_provisional) {
+
uint32_t i;
int r = 0;
- // if this for loop does not return anything, we return num_xids-1, which should map to T_0
+ // if this for loop does not return anything, we return num_xids-1, which
+ // should map to T_0
for (i = 0; i < num_xids - 1; i++) {
TXNID xid = toku_dtoh64(xids[i]);
r = f(xid, context, (i == 0 && top_is_provisional));
if (r==TOKUDB_ACCEPT) {
r = 0;
break; //or goto something
- }
- else if (r!=0) {
+ } else if (r!=0) {
break;
}
}
@@ -1940,8 +2125,7 @@ le_iterate_get_accepted_index(TXNID *xids, uint32_t *index, uint32_t num_xids, L
}
#if ULE_DEBUG
-static void
-ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) {
+static void ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) {
int has_p = (ule->num_puxrs != 0);
invariant(ule->num_cuxrs + has_p == interesting);
uint32_t i;
@@ -1953,21 +2137,29 @@ ule_verify_xids(ULE ule, uint32_t interesting, TXNID *xids) {
#endif
//
-// Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted by 'f'. If the value
-// associated with the accepted TXNID is not an insert, then set *is_emptyp to true, otherwise false
+// Iterates over "possible" TXNIDs in a leafentry's stack, until one is
+// accepted by 'f'. If the value associated with the accepted TXNID is not an
+// insert, then set *is_delp to true, otherwise false
// The "possible" TXNIDs are:
-// if provisionals exist, then the first possible TXNID is the outermost provisional.
-// The next possible TXNIDs are the committed TXNIDs, from most recently committed to T_0.
-// If provisionals exist, and the outermost provisional is accepted by 'f',
+// If provisionals exist, then the first possible TXNID is the outermost
+// provisional.
+// The next possible TXNIDs are the committed TXNIDs, from most recently
+// committed to T_0.
+// If provisionals exist, and the outermost provisional is accepted by 'f',
// the associated value checked is the innermost provisional's value.
// Parameters:
// le - leafentry to iterate over
-// f - callback function that checks if a TXNID in le is accepted, and its associated value should be examined.
+// f - callback function that checks if a TXNID in le is accepted, and its
+// associated value should be examined.
// is_delp - output parameter that returns answer
// context - parameter for f
//
-static int
-le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN context) {
+static int le_iterate_is_del(
+ LEAFENTRY le,
+ LE_ITERATE_CALLBACK f,
+ bool* is_delp,
+ TOKUTXN context) {
+
#if ULE_DEBUG
ULE_S ule;
le_unpack(&ule, le);
@@ -2002,8 +2194,17 @@ le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN co
#if ULE_DEBUG
ule_verify_xids(&ule, num_interesting, xids);
#endif
- r = le_iterate_get_accepted_index(xids, &index, num_interesting, f, context, (num_puxrs != 0));
- if (r!=0) goto cleanup;
+ r =
+ le_iterate_get_accepted_index(
+ xids,
+ &index,
+ num_interesting,
+ f,
+ context,
+ (num_puxrs != 0));
+ if (r != 0) {
+ goto cleanup;
+ }
invariant(index < num_interesting);
//Skip TXNIDs
@@ -2017,7 +2218,9 @@ le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_delp, TOKUTXN co
#if ULE_DEBUG
{
uint32_t has_p = (ule.num_puxrs != 0);
- uint32_t ule_index = (index==0) ? ule.num_cuxrs + ule.num_puxrs - 1 : ule.num_cuxrs - 1 + has_p - index;
+ uint32_t ule_index = (index==0) ?
+ ule.num_cuxrs + ule.num_puxrs - 1 :
+ ule.num_cuxrs - 1 + has_p - index;
UXR uxr = ule.uxrs + ule_index;
invariant(uxr_is_delete(uxr) == is_del);
}
@@ -2034,7 +2237,11 @@ cleanup:
return r;
}
-static int le_iterate_read_committed_callback(TXNID txnid, TOKUTXN txn, bool is_provisional UU()) {
+static int le_iterate_read_committed_callback(
+ TXNID txnid,
+ TOKUTXN txn,
+ bool is_provisional UU()) {
+
if (is_provisional) {
return toku_txn_reads_txnid(txnid, txn, is_provisional);
}
@@ -2058,33 +2265,40 @@ int le_val_is_del(LEAFENTRY le, enum cursor_read_type read_type, TOKUTXN txn) {
txn
);
rval = is_del;
- }
- else if (read_type == C_READ_ANY) {
+ } else if (read_type == C_READ_ANY) {
rval = le_latest_is_del(le);
- }
- else {
+ } else {
invariant(false);
}
return rval;
}
//
-// Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted by 'f'. Set
-// valpp and vallenp to value and length associated with accepted TXNID
+// Iterates over "possible" TXNIDs in a leafentry's stack, until one is accepted
+// by 'f'. Set valpp and vallenp to value and length associated with accepted
+// TXNID
// The "possible" TXNIDs are:
-// if provisionals exist, then the first possible TXNID is the outermost provisional.
-// The next possible TXNIDs are the committed TXNIDs, from most recently committed to T_0.
-// If provisionals exist, and the outermost provisional is accepted by 'f',
+// If provisionals exist, then the first possible TXNID is the outermost
+// provisional.
+// The next possible TXNIDs are the committed TXNIDs, from most recently
+// committed to T_0.
+// If provisionals exist, and the outermost provisional is accepted by 'f',
// the associated length value is the innermost provisional's length and value.
// Parameters:
// le - leafentry to iterate over
-// f - callback function that checks if a TXNID in le is accepted, and its associated value should be examined.
+// f - callback function that checks if a TXNID in le is accepted, and its
+// associated value should be examined.
// valpp - output parameter that returns pointer to value
// vallenp - output parameter that returns length of value
// context - parameter for f
//
-int
-le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context) {
+int le_iterate_val(
+ LEAFENTRY le,
+ LE_ITERATE_CALLBACK f,
+ void** valpp,
+ uint32_t* vallenp,
+ TOKUTXN context) {
+
#if ULE_DEBUG
ULE_S ule;
le_unpack(&ule, le);
@@ -2124,8 +2338,17 @@ le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vall
#if ULE_DEBUG
ule_verify_xids(&ule, num_interesting, xids);
#endif
- r = le_iterate_get_accepted_index(xids, &index, num_interesting, f, context, (num_puxrs != 0));
- if (r!=0) goto cleanup;
+ r =
+ le_iterate_get_accepted_index(
+ xids,
+ &index,
+ num_interesting,
+ f,
+ context,
+ (num_puxrs != 0));
+ if (r != 0) {
+ goto cleanup;
+ }
invariant(index < num_interesting);
//Skip TXNIDs
@@ -2158,7 +2381,9 @@ le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vall
#if ULE_DEBUG
{
uint32_t has_p = (ule.num_puxrs != 0);
- uint32_t ule_index = (index==0) ? ule.num_cuxrs + ule.num_puxrs - 1 : ule.num_cuxrs - 1 + has_p - index;
+ uint32_t ule_index = (index==0) ?
+ ule.num_cuxrs + ule.num_puxrs - 1 :
+ ule.num_cuxrs - 1 + has_p - index;
UXR uxr = ule.uxrs + ule_index;
invariant(uxr_is_insert(uxr));
invariant(uxr->vallen == vallen);
@@ -2188,10 +2413,15 @@ cleanup:
return r;
}
-void le_extract_val(LEAFENTRY le,
- // should we return the entire leafentry as the val?
- bool is_leaf_mode, enum cursor_read_type read_type,
- TOKUTXN ttxn, uint32_t *vallen, void **val) {
+void le_extract_val(
+ LEAFENTRY le,
+ // should we return the entire leafentry as the val?
+ bool is_leaf_mode,
+ enum cursor_read_type read_type,
+ TOKUTXN ttxn,
+ uint32_t* vallen,
+ void** val) {
+
if (is_leaf_mode) {
*val = le;
*vallen = leafentry_memsize(le);
@@ -2199,18 +2429,11 @@ void le_extract_val(LEAFENTRY le,
LE_ITERATE_CALLBACK f = (read_type == C_READ_SNAPSHOT) ?
toku_txn_reads_txnid :
le_iterate_read_committed_callback;
- int r = le_iterate_val(
- le,
- f,
- val,
- vallen,
- ttxn
- );
+ int r = le_iterate_val(le, f, val, vallen, ttxn);
lazy_assert_zero(r);
} else if (read_type == C_READ_ANY){
*val = le_latest_val_and_len(le, vallen);
- }
- else {
+ } else {
assert(false);
}
}
@@ -2244,9 +2467,9 @@ static_assert(18 == sizeof(leafentry_13), "wrong size");
static_assert(9 == __builtin_offsetof(leafentry_13, u), "wrong offset");
//Requires:
-// Leafentry that ule represents should not be destroyed (is not just all deletes)
-static size_t
-le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) {
+// Leafentry that ule represents should not be destroyed (is not just all
+// deletes)
+static size_t le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) {
uint32_t num_uxrs = ule->num_cuxrs + ule->num_puxrs;
assert(num_uxrs);
size_t rval;
@@ -2257,8 +2480,7 @@ le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) {
+4 //vallen
+le->keylen //actual key
+ule->uxrs[0].vallen; //actual val
- }
- else {
+ } else {
rval = 1 //num_uxrs
+4 //keylen
+le->keylen //actual key
@@ -2276,16 +2498,20 @@ le_memsize_from_ule_13 (ULE ule, LEAFENTRY_13 le) {
return rval;
}
-//This function is mostly copied from 4.1.1 (which is version 12, same as 13 except that only 13 is upgradable).
-// Note, number of transaction records in version 13 has been replaced by separate counters in version 14 (MVCC),
-// one counter for committed transaction records and one counter for provisional transaction records. When
-// upgrading a version 13 le to version 14, the number of committed transaction records is always set to one (1)
-// and the number of provisional transaction records is set to the original number of transaction records
-// minus one. The bottom transaction record is assumed to be a committed value. (If there is no committed
-// value then the bottom transaction record of version 13 is a committed delete.)
-// This is the only change from the 4.1.1 code. The rest of the leafentry is read as is.
-static void
-le_unpack_13(ULE ule, LEAFENTRY_13 le) {
+// This function is mostly copied from 4.1.1 (which is version 12, same as 13
+// except that only 13 is upgradable).
+// Note, number of transaction records in version 13 has been replaced by
+// separate counters in version 14 (MVCC), one counter for committed transaction
+// records and one counter for provisional transaction records. When upgrading
+// a version 13 le to version 14, the number of committed transaction records is
+// always set to one (1) and the number of provisional transaction records is
+// set to the original number of transaction records minus one. The bottom
+// transaction record is assumed to be a committed value. (If there is no
+// committed value then the bottom transaction record of version 13 is a
+// committed delete.)
+// This is the only change from the 4.1.1 code. The rest of the leafentry is
+// read as is.
+static void le_unpack_13(ULE ule, LEAFENTRY_13 le) {
//Read num_uxrs
uint8_t num_xrs = le->num_xrs;
assert(num_xrs > 0);
@@ -2302,15 +2528,15 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) {
uint8_t *p;
if (num_xrs == 1) {
//Unpack a 'committed leafentry' (No uncommitted transactions exist)
- ule->uxrs[0].type = XR_INSERT; //Must be or the leafentry would not exist
+        //Must be an insert or the leafentry would not exist
+ ule->uxrs[0].type = XR_INSERT;
ule->uxrs[0].vallen = vallen_of_innermost_insert;
ule->uxrs[0].valp = &le->u.comm.key_val[keylen];
ule->uxrs[0].xid = 0; //Required.
//Set p to immediately after leafentry
p = &le->u.comm.key_val[keylen + vallen_of_innermost_insert];
- }
- else {
+ } else {
//Unpack a 'provisional leafentry' (Uncommitted transactions exist)
//Read in type.
@@ -2337,8 +2563,7 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) {
//Not innermost, so load the type.
uxr->type = *p;
p += 1;
- }
- else {
+ } else {
//Innermost, load the type previously read from header
uxr->type = innermost_type;
}
@@ -2349,12 +2574,11 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) {
//Not committed nor outermost uncommitted, so load the xid.
uxr->xid = toku_dtoh64(*(TXNID*)p);
p += 8;
- }
- else if (i == 1) {
- //Outermost uncommitted, load the xid previously read from header
+ } else if (i == 1) {
+ //Outermost uncommitted, load the xid previously read from
+ //header
uxr->xid = xid_outermost_uncommitted;
- }
- else {
+ } else {
// i == 0, committed entry
uxr->xid = 0;
}
@@ -2367,9 +2591,9 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) {
uxr->valp = p;
p += uxr->vallen;
- }
- else {
- //Innermost insert, load the vallen/valp previously read from header
+ } else {
+ //Innermost insert, load the vallen/valp previously read
+ //from header
uxr->vallen = vallen_of_innermost_insert;
uxr->valp = valp_of_innermost_insert;
found_innermost_insert = true;
@@ -2384,8 +2608,7 @@ le_unpack_13(ULE ule, LEAFENTRY_13 le) {
#endif
}
-size_t
-leafentry_disksize_13(LEAFENTRY_13 le) {
+size_t leafentry_disksize_13(LEAFENTRY_13 le) {
ULE_S ule;
le_unpack_13(&ule, le);
size_t memsize = le_memsize_from_ule_13(&ule, le);
@@ -2393,13 +2616,13 @@ leafentry_disksize_13(LEAFENTRY_13 le) {
return memsize;
}
-int
-toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry,
- void** keyp,
- uint32_t* keylen,
- size_t *new_leafentry_memorysize,
- LEAFENTRY *new_leafentry_p
- ) {
+int toku_le_upgrade_13_14(
+ LEAFENTRY_13 old_leafentry,
+ void** keyp,
+ uint32_t* keylen,
+ size_t* new_leafentry_memorysize,
+ LEAFENTRY* new_leafentry_p) {
+
ULE_S ule;
int rval;
invariant(old_leafentry);
@@ -2408,23 +2631,23 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry,
*keylen = old_leafentry->keylen;
if (old_leafentry->num_xrs == 1) {
*keyp = old_leafentry->u.comm.key_val;
- }
- else {
+ } else {
*keyp = old_leafentry->u.prov.key_val_xrs;
}
// We used to pass NULL for omt and mempool, so that we would use
// malloc instead of a mempool. However after supporting upgrade,
// we need to use mempools and the OMT.
- rval = le_pack(&ule, // create packed leafentry
- nullptr,
- 0, //only matters if we are passing in a bn_data
- nullptr, //only matters if we are passing in a bn_data
- 0, //only matters if we are passing in a bn_data
- 0, //only matters if we are passing in a bn_data
- 0, //only matters if we are passing in a bn_data
- new_leafentry_p,
- nullptr //only matters if we are passing in a bn_data
- );
+ rval =
+ le_pack(
+ &ule, // create packed leafentry
+ nullptr,
+ 0, //only matters if we are passing in a bn_data
+ nullptr, //only matters if we are passing in a bn_data
+ 0, //only matters if we are passing in a bn_data
+ 0, //only matters if we are passing in a bn_data
+ 0, //only matters if we are passing in a bn_data
+ new_leafentry_p,
+ nullptr); //only matters if we are passing in a bn_data
ule_cleanup(&ule);
*new_leafentry_memorysize = leafentry_memsize(*new_leafentry_p);
return rval;
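
A minimal sketch (not part of the patch) of the counter mapping described in the le_unpack_13 comment above: on upgrade, the version-14 committed count is always one and the provisional count is the original version-13 transaction-record count minus one. The struct and function names below are illustrative only.

#include <cassert>
#include <cstdint>

struct xr_counts_v14 {
    uint32_t num_committed;    // always 1 after the 13 -> 14 upgrade
    uint32_t num_provisional;  // original record count minus one
};

// Hypothetical helper showing how a version-13 record count splits into the
// version-14 committed/provisional counters.
static xr_counts_v14 split_v13_xr_count(uint32_t num_xrs_v13) {
    assert(num_xrs_v13 > 0);   // a version-13 leafentry always has at least one record
    return xr_counts_v14{1u, num_xrs_v13 - 1u};
}

int main() {
    // A version-13 leafentry with 3 transaction records becomes
    // 1 committed + 2 provisional records in version 14.
    xr_counts_v14 c = split_v13_xr_count(3);
    assert(c.num_committed == 1 && c.num_provisional == 2);
    return 0;
}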
diff --git a/storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt b/storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt
index 8cea16c914d..6f9146ce5b2 100644
--- a/storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt
+++ b/storage/tokudb/PerconaFT/ftcxx/tests/CMakeLists.txt
@@ -2,6 +2,8 @@ include_directories(..)
include_directories(../../src)
include_directories(../../src/tests)
+find_library(JEMALLOC_STATIC_LIBRARY libjemalloc.a)
+
if (BUILD_TESTING)
## reference implementation with simple size-doubling buffer without
## jemalloc size tricks
@@ -24,15 +26,15 @@ if (BUILD_TESTING)
cursor_test
)
set(_testname ${impl}_${test})
- if (with_jemalloc)
+ if (with_jemalloc AND JEMALLOC_STATIC_LIBRARY)
set(_testname ${_testname}_j)
endif ()
add_executable(${_testname} ${test})
- if (with_jemalloc)
+ if (with_jemalloc AND JEMALLOC_STATIC_LIBRARY)
if (APPLE)
- target_link_libraries(${_testname} -Wl,-force_load jemalloc)
+ target_link_libraries(${_testname} -Wl,-force_load ${JEMALLOC_STATIC_LIBRARY})
else ()
- target_link_libraries(${_testname} -Wl,--whole-archive jemalloc -Wl,--no-whole-archive)
+ target_link_libraries(${_testname} -Wl,--whole-archive ${JEMALLOC_STATIC_LIBRARY} -Wl,--no-whole-archive)
endif ()
endif ()
target_link_libraries(${_testname} ${impl})
diff --git a/storage/tokudb/PerconaFT/portability/toku_pthread.h b/storage/tokudb/PerconaFT/portability/toku_pthread.h
index 25cf48dfd8c..84c27736201 100644
--- a/storage/tokudb/PerconaFT/portability/toku_pthread.h
+++ b/storage/tokudb/PerconaFT/portability/toku_pthread.h
@@ -72,15 +72,18 @@ typedef struct toku_mutex_aligned {
toku_mutex_t aligned_mutex __attribute__((__aligned__(64)));
} toku_mutex_aligned_t;
-// Different OSes implement mutexes as different amounts of nested structs.
-// C++ will fill out all missing values with zeroes if you provide at least one zero, but it needs the right amount of nesting.
-#if defined(__FreeBSD__)
-# define ZERO_MUTEX_INITIALIZER {0}
-#elif defined(__APPLE__)
-# define ZERO_MUTEX_INITIALIZER {{0}}
-#else // __linux__, at least
-# define ZERO_MUTEX_INITIALIZER {{{0}}}
-#endif
+// Initializing with {} will fill in a struct with all zeros.
+// But you may also need a pragma to suppress the warnings, as follows:
+//
+// #pragma GCC diagnostic push
+// #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
+// toku_mutex_t foo = ZERO_MUTEX_INITIALIZER;
+// #pragma GCC diagnostic pop
+//
+// In general it will be a lot of busy work to make this codebase compile
+// cleanly with -Wmissing-field-initializers.
+
+# define ZERO_MUTEX_INITIALIZER {}
#if TOKU_PTHREAD_DEBUG
# define TOKU_MUTEX_INITIALIZER { .pmutex = PTHREAD_MUTEX_INITIALIZER, .owner = 0, .locked = false, .valid = true }
@@ -223,15 +226,9 @@ typedef struct toku_cond {
pthread_cond_t pcond;
} toku_cond_t;
-// Different OSes implement mutexes as different amounts of nested structs.
-// C++ will fill out all missing values with zeroes if you provide at least one zero, but it needs the right amount of nesting.
-#if defined(__FreeBSD__)
-# define ZERO_COND_INITIALIZER {0}
-#elif defined(__APPLE__)
-# define ZERO_COND_INITIALIZER {{0}}
-#else // __linux__, at least
-# define ZERO_COND_INITIALIZER {{{0}}}
-#endif
+// The same considerations as for ZERO_MUTEX_INITIALIZER apply.
+#define ZERO_COND_INITIALIZER {}
+
#define TOKU_COND_INITIALIZER {PTHREAD_COND_INITIALIZER}
static inline void
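
An illustrative aside (not part of the patch) on why a single {} now suffices: C++ value-initialization with empty braces zeroes every member recursively, regardless of how deeply the pthread types happen to nest on a given OS, which is what the removed per-OS {0}/{{0}}/{{{0}}} variants were approximating. The stand-in structs below are hypothetical.

#include <cassert>

// Stand-in structs mimicking differently nested mutex representations.
struct inner { int a; int b; };
struct outer { inner i; long c; };

int main() {
    outer o = {};   // value-initialization zeroes o.i.a, o.i.b and o.c
    assert(o.i.a == 0 && o.i.b == 0 && o.c == 0);
    return 0;
}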
diff --git a/storage/tokudb/PerconaFT/portability/toku_time.h b/storage/tokudb/PerconaFT/portability/toku_time.h
index c476b64a212..11a3f3aa2b9 100644
--- a/storage/tokudb/PerconaFT/portability/toku_time.h
+++ b/storage/tokudb/PerconaFT/portability/toku_time.h
@@ -108,3 +108,13 @@ static inline uint64_t toku_current_time_microsec(void) {
gettimeofday(&t, NULL);
return t.tv_sec * (1UL * 1000 * 1000) + t.tv_usec;
}
+
+// Sleep for the given number of microseconds.
+static inline void toku_sleep_microsec(uint64_t ms) {
+ struct timeval t;
+
+ t.tv_sec = ms / 1000000;
+ t.tv_usec = ms % 1000000;
+
+ select(0, NULL, NULL, NULL, &t);
+}
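
A minimal usage sketch (not part of the patch; the include line is illustrative, the real build sets up its own include paths): the new helper sleeps by handing a timeout and no file descriptors to select(), and its argument is in microseconds despite the parameter being named ms.

#include "toku_time.h"   // illustrative include path

static void backoff_briefly(void) {
    // 500 * 1000 microseconds, i.e. roughly half a second.
    toku_sleep_microsec(500 * 1000);
}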
diff --git a/storage/tokudb/PerconaFT/scripts/run.stress-tests.py b/storage/tokudb/PerconaFT/scripts/run.stress-tests.py
index a8df83a3b55..e983fe8ccd9 100644
--- a/storage/tokudb/PerconaFT/scripts/run.stress-tests.py
+++ b/storage/tokudb/PerconaFT/scripts/run.stress-tests.py
@@ -521,14 +521,16 @@ Test output:
}))
def send_mail(toaddrs, subject, body):
- m = MIMEText(body)
- fromaddr = 'tim@tokutek.com'
- m['From'] = fromaddr
- m['To'] = ', '.join(toaddrs)
- m['Subject'] = subject
- s = SMTP('192.168.1.114')
- s.sendmail(fromaddr, toaddrs, str(m))
- s.quit()
+ # m = MIMEText(body)
+ # fromaddr = 'dev-private@percona.com'
+ # m['From'] = fromaddr
+ # m['To'] = ', '.join(toaddrs)
+ # m['Subject'] = subject
+ # s = SMTP('192.168.1.114')
+ # s.sendmail(fromaddr, toaddrs, str(m))
+ # s.quit()
+ info(subject);
+ info(body);
def update(tokudb):
info('Updating from git.')
@@ -554,12 +556,12 @@ def rebuild(tokudb, builddir, tokudb_data, cc, cxx, tests):
env=newenv,
cwd=builddir)
if r != 0:
- send_mail(['leif@tokutek.com'], 'Stress tests on %s failed to build.' % gethostname(), '')
+ send_mail(['dev-private@percona.com'], 'Stress tests on %s failed to build.' % gethostname(), '')
error('Building the tests failed.')
sys.exit(r)
r = call(['make', '-j8'], cwd=builddir)
if r != 0:
- send_mail(['leif@tokutek.com'], 'Stress tests on %s failed to build.' % gethostname(), '')
+ send_mail(['dev-private@percona.com'], 'Stress tests on %s failed to build.' % gethostname(), '')
error('Building the tests failed.')
sys.exit(r)
@@ -671,7 +673,7 @@ def main(opts):
sys.exit(0)
except Exception, e:
exception('Unhandled exception caught in main.')
- send_mail(['leif@tokutek.com'], 'Stress tests caught unhandled exception in main, on %s' % gethostname(), format_exc())
+ send_mail(['dev-private@percona.com'], 'Stress tests caught unhandled exception in main, on %s' % gethostname(), format_exc())
raise e
if __name__ == '__main__':
@@ -786,7 +788,7 @@ if __name__ == '__main__':
if not opts.send_emails:
opts.email = None
elif len(opts.email) == 0:
- opts.email.append('tokueng@tokutek.com')
+ opts.email.append('dev-private@percona.com')
if opts.debug:
logging.basicConfig(level=logging.DEBUG)
diff --git a/storage/tokudb/PerconaFT/src/export.map b/storage/tokudb/PerconaFT/src/export.map
index 3f2c7569ea4..fc2be5f41a5 100644
--- a/storage/tokudb/PerconaFT/src/export.map
+++ b/storage/tokudb/PerconaFT/src/export.map
@@ -82,6 +82,7 @@
toku_test_db_redirect_dictionary;
toku_test_get_latest_lsn;
toku_test_get_checkpointing_user_data_status;
+ toku_set_test_txn_sync_callback;
toku_indexer_set_test_only_flags;
toku_increase_last_xid;
diff --git a/storage/tokudb/PerconaFT/src/indexer-undo-do.cc b/storage/tokudb/PerconaFT/src/indexer-undo-do.cc
index b93429407eb..8d0b080b9fe 100644
--- a/storage/tokudb/PerconaFT/src/indexer-undo-do.cc
+++ b/storage/tokudb/PerconaFT/src/indexer-undo-do.cc
@@ -313,7 +313,7 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info
break;
if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) {
- // if the outermost is not live, then the inner state must be retired. thats the way that the txn API works.
+ // If the outermost is not live, then the inner state must be retired. That's the way that the txn API works.
assert(this_xid_state == TOKUTXN_RETIRED);
}
diff --git a/storage/tokudb/PerconaFT/src/tests/CMakeLists.txt b/storage/tokudb/PerconaFT/src/tests/CMakeLists.txt
index 70977a9dfda..47f6aa44a75 100644
--- a/storage/tokudb/PerconaFT/src/tests/CMakeLists.txt
+++ b/storage/tokudb/PerconaFT/src/tests/CMakeLists.txt
@@ -53,7 +53,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
target_link_libraries(test-5138.tdb ${LIBTOKUDB}_static z ${LIBTOKUPORTABILITY}_static ${CMAKE_THREAD_LIBS_INIT} ${EXTRA_SYSTEM_LIBS})
add_space_separated_property(TARGET test-5138.tdb COMPILE_FLAGS -fvisibility=hidden)
add_ydb_test(test-5138.tdb)
-
+ add_ydb_test(rollback-inconsistency.tdb)
foreach(bin ${tdb_bins})
get_filename_component(base ${bin} NAME_WE)
diff --git a/storage/tokudb/PerconaFT/src/tests/rollback-inconsistency.cc b/storage/tokudb/PerconaFT/src/tests/rollback-inconsistency.cc
new file mode 100644
index 00000000000..f8099c7a639
--- /dev/null
+++ b/storage/tokudb/PerconaFT/src/tests/rollback-inconsistency.cc
@@ -0,0 +1,161 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "test.h"
+
+// Insert enough rows with a child txn and then force an eviction to verify that the
+// rollback log node is in a valid state.
+// The test fails without the fix (and of course passes with it).
+// The test basically simulates George's test script.
+
+
+static void
+populate_table(int start, int end, DB_TXN * parent, DB_ENV * env, DB * db) {
+ DB_TXN *txn = NULL;
+ int r = env->txn_begin(env, parent, &txn, 0); assert_zero(r);
+ for (int i = start; i < end; i++) {
+ int k = htonl(i);
+ char kk[4];
+ char str[220];
+ memset(kk, 0, sizeof kk);
+ memcpy(kk, &k, sizeof k);
+ memset(str,'a', sizeof str);
+ DBT key = { .data = kk, .size = sizeof kk };
+ DBT val = { .data = str, .size = sizeof str };
+ r = db->put(db, txn, &key, &val, 0);
+ assert_zero(r);
+ }
+ r = txn->commit(txn, 0);
+ assert_zero(r);
+}
+
+static void
+populate_and_test(DB_ENV *env, DB *db) {
+ int r;
+ DB_TXN *parent = NULL;
+ r = env->txn_begin(env, NULL, &parent, 0); assert_zero(r);
+
+ populate_table(0, 128, parent, env, db);
+
+    // We know the eviction is going to happen here and the log node of the parent
+    // txn is going to be evicted due to the extremely low cachesize.
+ populate_table(128, 256, parent, env, db);
+
+    // Eviction happens again due to memory pressure. 256 rows is the point at which the
+    // rollback log spills. The spilled node will be written back but will not be dirtied
+    // by including rollback nodes from the child txn (in which case the bug is bypassed).
+ populate_table(256, 512, parent, env, db);
+
+ r = parent->abort(parent); assert_zero(r);
+
+    // Try to search for anything in the lost range.
+ int k = htonl(200);
+ char kk[4];
+ memset(kk, 0, sizeof kk);
+ memcpy(kk, &k, sizeof k);
+ DBT key = { .data = kk, .size = sizeof kk };
+ DBT val;
+ r = db->get(db, NULL, &key, &val, 0);
+ assert(r==DB_NOTFOUND);
+
+}
+
+static void
+run_test(void) {
+ int r;
+ DB_ENV *env = NULL;
+ r = db_env_create(&env, 0);
+ assert_zero(r);
+ env->set_errfile(env, stderr);
+
+    // Set the cachetable size to 64KB.
+ uint32_t cachesize = 64*1024;
+ r = env->set_cachesize(env, 0, cachesize, 1);
+ assert_zero(r);
+
+    // Set the log write block size to 4KB so the rollback log nodes spill in accordance with the node size.
+ r = env->set_lg_bsize(env, 4096);
+ assert_zero(r);
+
+ r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO);
+ assert_zero(r);
+
+ DB *db = NULL;
+ r = db_create(&db, env, 0);
+ assert_zero(r);
+
+ r = db->set_pagesize(db, 4096);
+ assert_zero(r);
+
+ r = db->set_readpagesize(db, 1024);
+ assert_zero(r);
+
+ r = db->open(db, NULL, "test.tdb", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
+ assert_zero(r);
+
+ populate_and_test(env, db);
+
+ r = db->close(db, 0); assert_zero(r);
+
+ r = env->close(env, 0); assert_zero(r);
+}
+
+int
+test_main(int argc, char * const argv[]) {
+ int r;
+
+ // parse_args(argc, argv);
+ for (int i = 1; i < argc; i++) {
+ char * const arg = argv[i];
+ if (strcmp(arg, "-v") == 0) {
+ verbose++;
+ continue;
+ }
+ if (strcmp(arg, "-q") == 0) {
+ verbose = 0;
+ continue;
+ }
+ }
+
+ toku_os_recursive_delete(TOKU_TEST_FILENAME);
+ r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); assert_zero(r);
+
+ run_test();
+
+ return 0;
+}
+
diff --git a/storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc b/storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc
index 0c70b0669ad..a2b48e443cd 100644
--- a/storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc
+++ b/storage/tokudb/PerconaFT/src/tests/stat64-root-changes.cc
@@ -166,7 +166,7 @@ run_test (void) {
DB_BTREE_STAT64 s;
r = db->stat64(db, NULL, &s); CKERR(r);
- assert(s.bt_nkeys == 1 && s.bt_dsize == sizeof key + sizeof val);
+ assert(s.bt_nkeys == 0);
r = db->close(db, 0); CKERR(r);
@@ -176,7 +176,7 @@ run_test (void) {
r = txn->commit(txn, 0); CKERR(r);
r = db->stat64(db, NULL, &s); CKERR(r);
- assert(s.bt_nkeys == 1 && s.bt_dsize == sizeof key + sizeof val);
+ assert(s.bt_nkeys == 0);
}
// verify update callback overwrites the row
diff --git a/storage/tokudb/PerconaFT/src/tests/test_db_rowcount.cc b/storage/tokudb/PerconaFT/src/tests/test_db_rowcount.cc
new file mode 100644
index 00000000000..c440bdc59e7
--- /dev/null
+++ b/storage/tokudb/PerconaFT/src/tests/test_db_rowcount.cc
@@ -0,0 +1,523 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include "test.h"
+#include <stdio.h>
+
+#include <sys/stat.h>
+#include <db.h>
+
+// Tests that the logical row counts are correct and not subject to variance
+// due to normal insert/delete messages within the tree, with a few exceptions:
+// 1) rollback messages not yet applied; 2) insert messages turned into
+// updates on apply; and 3) delete messages applied to missing leafentries.
+
+static DB_TXN* const null_txn = 0;
+static const uint64_t num_records = 4*1024;
+
+#define CHECK_NUM_ROWS(_expected, _stats) assert(_stats.bt_ndata == _expected)
+
+static DB* create_db(const char* fname, DB_ENV* env) {
+ int r;
+ DB* db;
+
+ r = db_create(&db, env, 0);
+ assert(r == 0);
+ db->set_errfile(db, stderr);
+
+ r = db->set_pagesize(db, 8192);
+ assert(r == 0);
+
+ r = db->set_readpagesize(db, 1024);
+ assert(r == 0);
+
+ r = db->set_fanout(db, 4);
+ assert(r == 0);
+
+ r = db->set_compression_method(db, TOKU_NO_COMPRESSION);
+ assert(r == 0);
+
+ r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE,
+ 0666);
+ assert(r == 0);
+
+ return db;
+}
+static void add_records(DB* db, DB_TXN* txn, uint64_t start_id, uint64_t num) {
+ int r;
+ for (uint64_t i = 0, j=start_id; i < num; i++,j++) {
+ char key[100], val[256];
+ DBT k,v;
+ snprintf(key, 100, "%08" PRIu64, j);
+ snprintf(val, 256, "%*s", 200, key);
+ r =
+ db->put(
+ db,
+ txn,
+ dbt_init(&k, key, 1+strlen(key)),
+ dbt_init(&v, val, 1+strlen(val)),
+ 0);
+ assert(r == 0);
+ }
+}
+static void delete_records(
+ DB* db,
+ DB_TXN* txn,
+ uint64_t start_id,
+ uint64_t num) {
+
+ int r;
+ for (uint64_t i = 0, j=start_id; i < num; i++,j++) {
+ char key[100];
+ DBT k;
+ snprintf(key, 100, "%08" PRIu64, j);
+ r =
+ db->del(
+ db,
+ txn,
+ dbt_init(&k, key, 1+strlen(key)),
+ 0);
+ assert(r == 0);
+ }
+}
+static void full_optimize(DB* db) {
+ int r;
+ uint64_t loops_run = 0;
+
+ r = db->optimize(db);
+ assert(r == 0);
+
+ r = db->hot_optimize(db, NULL, NULL, NULL, NULL, &loops_run);
+ assert(r == 0);
+}
+static void test_insert_commit(DB_ENV* env) {
+ int r;
+ DB* db;
+ DB_TXN* txn;
+ DB_BTREE_STAT64 stats;
+
+ db = create_db(__FUNCTION__, env);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ add_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf("%s : before commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ r = txn->commit(txn, 0);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf("%s : after commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ db->close(db, 0);
+}
+static void test_insert_delete_commit(DB_ENV* env) {
+ int r;
+ DB* db;
+ DB_TXN* txn;
+ DB_BTREE_STAT64 stats;
+
+ db = create_db(__FUNCTION__, env);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ add_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf("%s : before delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ delete_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(0, stats);
+ if (verbose)
+ printf("%s : after delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ r = txn->commit(txn, 0);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(0, stats);
+ if (verbose)
+ printf("%s : after commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ db->close(db, 0);
+}
+static void test_insert_commit_delete_commit(DB_ENV* env) {
+ int r;
+ DB* db;
+ DB_TXN* txn;
+ DB_BTREE_STAT64 stats;
+
+ db = create_db(__FUNCTION__, env);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ add_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf(
+ "%s : before insert commit %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ r = txn->commit(txn, 0);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf(
+ "%s : after insert commit %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ delete_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(0, stats);
+ if (verbose)
+ printf("%s : after delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ r = txn->commit(txn, 0);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(0, stats);
+ if (verbose)
+ printf(
+ "%s : after delete commit %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ db->close(db, 0);
+}
+static void test_insert_rollback(DB_ENV* env) {
+ int r;
+ DB* db;
+ DB_TXN* txn;
+ DB_BTREE_STAT64 stats;
+
+ db = create_db(__FUNCTION__, env);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ add_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf("%s : before rollback %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ r = txn->abort(txn);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+    // CANNOT TEST stats HERE AS THEY ARE SOMEWHAT NON-DETERMINISTIC UNTIL
+    // optimize + hot_optimize HAVE BEEN RUN DUE TO THE FACT THAT ROLLBACK
+    // MESSAGES ARE "IN-FLIGHT" IN THE TREE AND MUST BE APPLIED IN ORDER TO
+    // CORRECT THE RUNNING LOGICAL COUNT
+ if (verbose)
+ printf("%s : after rollback %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ full_optimize(db);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(0, stats);
+ if (verbose)
+ printf(
+ "%s : after rollback optimize %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ db->close(db, 0);
+}
+static void test_insert_delete_rollback(DB_ENV* env) {
+ int r;
+ DB* db;
+ DB_TXN* txn;
+ DB_BTREE_STAT64 stats;
+
+ db = create_db(__FUNCTION__, env);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ add_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf("%s : before delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ delete_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(0, stats);
+ if (verbose)
+ printf("%s : after delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ r = txn->abort(txn);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(0, stats);
+ if (verbose)
+ printf("%s : after commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ db->close(db, 0);
+}
+static void test_insert_commit_delete_rollback(DB_ENV* env) {
+ int r;
+ DB* db;
+ DB_TXN* txn;
+ DB_BTREE_STAT64 stats;
+
+ db = create_db(__FUNCTION__, env);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ add_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf(
+ "%s : before insert commit %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ r = txn->commit(txn, 0);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf(
+ "%s : after insert commit %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ delete_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(0, stats);
+ if (verbose)
+ printf("%s : after delete %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+ r = txn->abort(txn);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+    // CANNOT TEST stats HERE AS THEY ARE SOMEWHAT NON-DETERMINISTIC UNTIL
+    // optimize + hot_optimize HAVE BEEN RUN DUE TO THE FACT THAT ROLLBACK
+    // MESSAGES ARE "IN-FLIGHT" IN THE TREE AND MUST BE APPLIED IN ORDER TO
+    // CORRECT THE RUNNING LOGICAL COUNT
+ if (verbose)
+ printf(
+ "%s : after delete rollback %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ full_optimize(db);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf(
+ "%s : after delete rollback optimize %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ db->close(db, 0);
+}
+
+static int test_recount_insert_commit_progress(
+ uint64_t count,
+ uint64_t deleted,
+ void*) {
+
+ if (verbose)
+ printf(
+ "%s : count[%" PRIu64 "] deleted[%" PRIu64 "]\n",
+ __FUNCTION__,
+ count,
+ deleted);
+ return 0;
+}
+static int test_recount_cancel_progress(uint64_t, uint64_t, void*) {
+ return 1;
+}
+
+static void test_recount_insert_commit(DB_ENV* env) {
+ int r;
+ DB* db;
+ DB_TXN* txn;
+ DB_BTREE_STAT64 stats;
+
+ db = create_db(__FUNCTION__, env);
+
+ r = env->txn_begin(env, null_txn, &txn, 0);
+ assert(r == 0);
+
+ add_records(db, txn, 0, num_records);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf(
+ "%s : before commit %" PRIu64 " rows\n",
+ __FUNCTION__,
+ stats.bt_ndata);
+
+ r = txn->commit(txn, 0);
+ assert(r == 0);
+
+ r = db->stat64(db, null_txn, &stats);
+ assert(r == 0);
+
+ CHECK_NUM_ROWS(num_records, stats);
+ if (verbose)
+ printf("%s : after commit %" PRIu64 " rows\n", __FUNCTION__, stats.bt_ndata);
+
+    // Test that recount counted the correct number of rows.
+ r = db->recount_rows(db, test_recount_insert_commit_progress, NULL);
+ assert(r == 0);
+ CHECK_NUM_ROWS(num_records, stats);
+
+    // Test that cancelling from the recount callback propagates its return value.
+ r = db->recount_rows(db, test_recount_cancel_progress, NULL);
+ assert(r == 1);
+ CHECK_NUM_ROWS(num_records, stats);
+
+ db->close(db, 0);
+}
+int test_main(int UU(argc), char UU(*const argv[])) {
+ int r;
+ DB_ENV* env;
+
+ toku_os_recursive_delete(TOKU_TEST_FILENAME);
+ toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU + S_IRWXG + S_IRWXO);
+
+ r = db_env_create(&env, 0);
+ assert(r == 0);
+
+ r =
+ env->open(
+ env,
+ TOKU_TEST_FILENAME,
+ DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_TXN + DB_PRIVATE + DB_CREATE,
+ S_IRWXU + S_IRWXG + S_IRWXO);
+ assert(r == 0);
+
+ test_insert_commit(env);
+ test_insert_delete_commit(env);
+ test_insert_commit_delete_commit(env);
+ test_insert_rollback(env);
+ test_insert_delete_rollback(env);
+ test_insert_commit_delete_rollback(env);
+ test_recount_insert_commit(env);
+
+ r = env->close(env, 0);
+ assert(r == 0);
+
+ return 0;
+}
diff --git a/storage/tokudb/PerconaFT/src/tests/txn_manager_handle_snapshot_atomicity.cc b/storage/tokudb/PerconaFT/src/tests/txn_manager_handle_snapshot_atomicity.cc
new file mode 100644
index 00000000000..30cc16d73a7
--- /dev/null
+++ b/storage/tokudb/PerconaFT/src/tests/txn_manager_handle_snapshot_atomicity.cc
@@ -0,0 +1,217 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+// In response to the read-committed crash bug seen in sysbench, this test was created to
+// test the atomicity of the txn manager when handling the child txn snapshot.
+// The test is supposed to fail without the read-committed fix.
+
+#include "test.h"
+#include "toku_pthread.h"
+#include "ydb.h"
+struct test_sync {
+ int state;
+ toku_mutex_t lock;
+ toku_cond_t cv;
+};
+
+static void test_sync_init(struct test_sync *UU(sync)) {
+#if TOKU_DEBUG_TXN_SYNC
+ sync->state = 0;
+ toku_mutex_init(&sync->lock, NULL);
+ toku_cond_init(&sync->cv, NULL);
+#endif
+}
+
+static void test_sync_destroy(struct test_sync *UU(sync)) {
+#if TOKU_DEBUG_TXN_SYNC
+ toku_mutex_destroy(&sync->lock);
+ toku_cond_destroy(&sync->cv);
+#endif
+}
+
+static void test_sync_sleep(struct test_sync *UU(sync), int UU(new_state)) {
+#if TOKU_DEBUG_TXN_SYNC
+ toku_mutex_lock(&sync->lock);
+ while (sync->state != new_state) {
+ toku_cond_wait(&sync->cv, &sync->lock);
+ }
+ toku_mutex_unlock(&sync->lock);
+#endif
+}
+
+static void test_sync_next_state(struct test_sync *UU(sync)) {
+#if TOKU_DEBUG_TXN_SYNC
+ toku_mutex_lock(&sync->lock);
+ sync->state++;
+ toku_cond_broadcast(&sync->cv);
+ toku_mutex_unlock(&sync->lock);
+#endif
+}
+
+
+struct start_txn_arg {
+ DB_ENV *env;
+ DB *db;
+ DB_TXN * parent;
+};
+
+static struct test_sync sync_s;
+
+static void test_callback(pthread_t self_tid, void * extra) {
+ pthread_t **p = (pthread_t **) extra;
+ pthread_t tid_1 = *p[0];
+ pthread_t tid_2 = *p[1];
+ assert(pthread_equal(self_tid, tid_2));
+ printf("%s: the thread[%" PRIu64 "] is going to wait...\n", __func__, reinterpret_cast<uint64_t>(tid_1));
+ test_sync_next_state(&sync_s);
+ sleep(3);
+ //test_sync_sleep(&sync_s,3);
+    // Using the test_sync_sleep/test_sync_next_state pair can synchronize the threads
+    // more precisely; however, after the fix this might cause a deadlock, so simply use
+    // sleep to do a proof-of-concept test.
+ printf("%s: the thread[%" PRIu64 "] is resuming...\n", __func__, reinterpret_cast<uint64_t>(tid_1));
+ return;
+}
+
+static void * start_txn2(void * extra) {
+ struct start_txn_arg * args = (struct start_txn_arg *) extra;
+ DB_ENV * env = args -> env;
+ DB * db = args->db;
+ DB_TXN * parent = args->parent;
+ test_sync_sleep(&sync_s, 1);
+ printf("start %s [thread %" PRIu64 "]\n", __func__, reinterpret_cast<uint64_t>(pthread_self()));
+ DB_TXN *txn;
+ int r = env->txn_begin(env, parent, &txn, DB_READ_COMMITTED);
+ assert(r == 0);
+ //do some random things...
+ DBT key, data;
+ dbt_init(&key, "hello", 6);
+ dbt_init(&data, "world", 6);
+ db->put(db, txn, &key, &data, 0);
+ db->get(db, txn, &key, &data, 0);
+
+ r = txn->commit(txn, 0);
+ assert(r == 0);
+ printf("%s done[thread %" PRIu64 "]\n", __func__, reinterpret_cast<uint64_t>(pthread_self()));
+ return extra;
+}
+
+static void * start_txn1(void * extra) {
+ struct start_txn_arg * args = (struct start_txn_arg *) extra;
+ DB_ENV * env = args -> env;
+ DB * db = args->db;
+ printf("start %s: [thread %" PRIu64 "]\n", __func__, reinterpret_cast<uint64_t>(pthread_self()));
+ DB_TXN *txn;
+ int r = env->txn_begin(env, NULL, &txn, DB_READ_COMMITTED);
+ assert(r == 0);
+ printf("%s: txn began by [thread %" PRIu64 "], will wait\n", __func__, reinterpret_cast<uint64_t>(pthread_self()));
+ test_sync_next_state(&sync_s);
+ test_sync_sleep(&sync_s,2);
+ printf("%s: [thread %" PRIu64 "] resumed\n", __func__, reinterpret_cast<uint64_t>(pthread_self()));
+ //do some random things...
+ DBT key, data;
+ dbt_init(&key, "hello", 6);
+ dbt_init(&data, "world", 6);
+ db->put(db, txn, &key, &data, 0);
+ db->get(db, txn, &key, &data, 0);
+ r = txn->commit(txn, 0);
+ assert(r == 0);
+ printf("%s: done[thread %" PRIu64 "]\n", __func__, reinterpret_cast<uint64_t>(pthread_self()));
+ //test_sync_next_state(&sync_s);
+ return extra;
+}
+
+int test_main (int UU(argc), char * const UU(argv[])) {
+ int r;
+ toku_os_recursive_delete(TOKU_TEST_FILENAME);
+ r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO);
+ assert(r == 0);
+
+ DB_ENV *env;
+ r = db_env_create(&env, 0);
+ assert(r == 0);
+
+ r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO);
+ assert(r == 0);
+
+ DB *db = NULL;
+ r = db_create(&db, env, 0);
+ assert(r == 0);
+
+ r = db->open(db, NULL, "testit", NULL, DB_BTREE, DB_AUTO_COMMIT+DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
+ assert(r == 0);
+
+ DB_TXN * parent = NULL;
+ r = env->txn_begin(env, 0, &parent, DB_READ_COMMITTED);
+ assert(r == 0);
+
+ ZERO_STRUCT(sync_s);
+ test_sync_init(&sync_s);
+
+ pthread_t tid_1 = 0;
+ pthread_t tid_2 = 0;
+ pthread_t* callback_extra[2] = {&tid_1, &tid_2};
+ toku_set_test_txn_sync_callback(test_callback, callback_extra);
+
+ struct start_txn_arg args = {env, db, parent};
+
+ r = pthread_create(&tid_1, NULL, start_txn1, &args);
+ assert(r==0);
+
+ r= pthread_create(&tid_2, NULL, start_txn2, &args);
+ assert(r==0);
+
+ void * ret;
+ r = pthread_join(tid_1, &ret);
+ assert(r == 0);
+ r = pthread_join(tid_2, &ret);
+ assert(r == 0);
+
+ r = parent->commit(parent, 0);
+ assert(r ==0);
+
+ test_sync_destroy(&sync_s);
+ r = db->close(db, 0);
+ assert(r == 0);
+
+ r = env->close(env, 0);
+ assert(r == 0);
+
+ return 0;
+}
+
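A standalone sketch (not part of the patch) of the state-stepping protocol implemented by the test_sync helpers above, expressed with the standard library instead of the toku_* wrappers and without the TOKU_DEBUG_TXN_SYNC guard: one thread blocks until the shared state reaches a given value, the other advances the state and wakes all waiters.

#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

struct sync_state {
    int state = 0;
    std::mutex lock;
    std::condition_variable cv;

    void sleep_until(int wanted) {           // mirrors test_sync_sleep()
        std::unique_lock<std::mutex> g(lock);
        cv.wait(g, [&] { return state == wanted; });
    }
    void next_state() {                      // mirrors test_sync_next_state()
        std::lock_guard<std::mutex> g(lock);
        ++state;
        cv.notify_all();
    }
};

int main() {
    sync_state s;
    std::thread t([&] {
        s.sleep_until(1);                    // blocks until main() advances the state
        s.next_state();                      // state becomes 2
    });
    s.next_state();                          // state becomes 1, unblocks t
    s.sleep_until(2);                        // waits for t to advance the state
    t.join();
    assert(s.state == 2);
    return 0;
}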
diff --git a/storage/tokudb/PerconaFT/src/ydb.cc b/storage/tokudb/PerconaFT/src/ydb.cc
index 88c6c86f214..55da418a0de 100644
--- a/storage/tokudb/PerconaFT/src/ydb.cc
+++ b/storage/tokudb/PerconaFT/src/ydb.cc
@@ -3148,6 +3148,10 @@ toku_test_get_latest_lsn(DB_ENV *env) {
return rval.lsn;
}
+void toku_set_test_txn_sync_callback(void (* cb) (pthread_t, void *), void * extra) {
+ set_test_txn_sync_callback(cb, extra);
+}
+
int
toku_test_get_checkpointing_user_data_status (void) {
return toku_cachetable_get_checkpointing_user_data_status();
diff --git a/storage/tokudb/PerconaFT/src/ydb.h b/storage/tokudb/PerconaFT/src/ydb.h
index 9d4e94c6f30..facbfdc9252 100644
--- a/storage/tokudb/PerconaFT/src/ydb.h
+++ b/storage/tokudb/PerconaFT/src/ydb.h
@@ -58,3 +58,6 @@ extern "C" uint64_t toku_test_get_latest_lsn(DB_ENV *env) __attribute__((__visib
// test-only function
extern "C" int toku_test_get_checkpointing_user_data_status(void) __attribute__((__visibility__("default")));
+
+// test-only function
+extern "C" void toku_set_test_txn_sync_callback(void (* ) (pthread_t, void *), void * extra) __attribute__((__visibility__("default")));
diff --git a/storage/tokudb/PerconaFT/src/ydb_db.cc b/storage/tokudb/PerconaFT/src/ydb_db.cc
index 25b24467684..e5bd4e7d089 100644
--- a/storage/tokudb/PerconaFT/src/ydb_db.cc
+++ b/storage/tokudb/PerconaFT/src/ydb_db.cc
@@ -1015,6 +1015,25 @@ toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float
return r;
}
+
+static int
+toku_db_recount_rows(DB* db, int (*progress_callback)(uint64_t count,
+ uint64_t deleted,
+ void* progress_extra),
+ void* progress_extra) {
+
+ HANDLE_PANICKED_DB(db);
+ int r = 0;
+ r =
+ toku_ft_recount_rows(
+ db->i->ft_handle,
+ progress_callback,
+ progress_extra);
+
+ return r;
+}
+
+
int toku_setup_db_internal (DB **dbp, DB_ENV *env, uint32_t flags, FT_HANDLE ft_handle, bool is_open) {
if (flags || env == NULL)
return EINVAL;
@@ -1098,6 +1117,7 @@ toku_db_create(DB ** db, DB_ENV * env, uint32_t flags) {
USDB(dbt_pos_infty);
USDB(dbt_neg_infty);
USDB(get_fragmentation);
+ USDB(recount_rows);
#undef USDB
result->get_indexer = db_get_indexer;
result->del = autotxn_db_del;
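
A minimal usage sketch (not part of the patch) for the recount_rows entry point wired up above; it assumes an already-open DB handle obtained through the usual db_env_create/db_create/open sequence (as in test_db_rowcount.cc). The progress callback receives the running row count and deleted count; returning a non-zero value cancels the recount, and that value is passed back by recount_rows.

#include <db.h>
#include <inttypes.h>
#include <stdio.h>

// Progress callback matching the signature used by toku_db_recount_rows above.
static int recount_progress(uint64_t count, uint64_t deleted, void *extra) {
    (void) extra;
    printf("recounted %" PRIu64 " rows, %" PRIu64 " deleted\n", count, deleted);
    return 0;   // keep going; a non-zero return would cancel the recount
}

// Hypothetical caller; `db` is assumed to be a handle opened in the usual way.
static int recount_logical_rows(DB *db) {
    return db->recount_rows(db, recount_progress, NULL);
}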