author     Leif Walsh <leif@tokutek.com>              2012-11-07 21:35:11 +0000
committer  Yoni Fogel <yoni@tokutek.com>              2013-04-17 00:01:14 -0400
commit     10f6b5c79fa7d3f7c49617fcf904efba1513e213 (patch)
tree       1194fa915c5aea8a7a299f24cf91e7afb229389e /ft
parent     3571ca4bb94a7fbdff9688f24b5a8be1e7369500 (diff)
refs #5418 merge promotion to main
git-svn-id: file:///svn/toku/tokudb@49697 c7de825b-a66e-492c-adef-691d508d4ae1
Diffstat (limited to 'ft')
-rw-r--r--  ft/block_table.cc | 136
-rw-r--r--  ft/cachetable-internal.h | 3
-rw-r--r--  ft/cachetable.cc | 151
-rw-r--r--  ft/cachetable.h | 12
-rw-r--r--  ft/checkpoint.cc | 9
-rw-r--r--  ft/fifo.cc | 14
-rw-r--r--  ft/fifo.h | 30
-rw-r--r--  ft/ft-cachetable-wrappers.cc | 13
-rw-r--r--  ft/ft-cachetable-wrappers.h | 4
-rw-r--r--  ft/ft-flusher.cc | 332
-rw-r--r--  ft/ft-flusher.h | 1
-rw-r--r--  ft/ft-hot-flusher.cc | 7
-rw-r--r--  ft/ft-internal.h | 123
-rw-r--r--  ft/ft-ops.cc | 949
-rw-r--r--  ft/ft-serialize.cc | 28
-rw-r--r--  ft/ft-test-helpers.cc | 7
-rw-r--r--  ft/ft-verify.cc | 25
-rw-r--r--  ft/ft.cc | 10
-rw-r--r--  ft/ft_layout_version.h | 1
-rw-r--r--  ft/ft_node-serialize.cc | 109
-rw-r--r--  ft/fttypes.h | 6
-rw-r--r--  ft/leafentry.h | 10
-rw-r--r--  ft/locking-benchmarks/mfence-benchmark.cc | 5
-rw-r--r--  ft/locking-benchmarks/pthread-locks.cc | 5
-rw-r--r--  ft/locking-benchmarks/trylock-rdtsc.cc | 7
-rw-r--r--  ft/rollback-ct-callbacks.h | 1
-rw-r--r--  ft/rollback.cc | 2
-rw-r--r--  ft/tests/cachetable-checkpoint-pending.cc | 7
-rw-r--r--  ft/tests/cachetable-checkpoint-test.cc | 2
-rw-r--r--  ft/tests/cachetable-checkpointer-class.cc | 5
-rw-r--r--  ft/tests/cachetable-count-pinned-test.cc | 3
-rw-r--r--  ft/tests/cachetable-flush-test.cc | 8
-rw-r--r--  ft/tests/cachetable-prefetch-checkpoint-test.cc | 2
-rw-r--r--  ft/tests/cachetable-prefetch-maybegetandpin-test.cc | 2
-rw-r--r--  ft/tests/cachetable-prefetch2-test.cc | 2
-rw-r--r--  ft/tests/cachetable-put-test.cc | 2
-rw-r--r--  ft/tests/cachetable-simple-maybe-get-pin.cc | 18
-rw-r--r--  ft/tests/cachetable-test.cc | 2
-rw-r--r--  ft/tests/cachetable-unpin-and-remove-test.cc | 2
-rw-r--r--  ft/tests/cachetable-unpin-test.cc | 2
-rw-r--r--  ft/tests/cachetable-writer-thread-limit.cc | 5
-rw-r--r--  ft/tests/ftloader-error-injector.h | 9
-rw-r--r--  ft/tests/keyrange.cc | 4
-rw-r--r--  ft/tests/make-tree.cc | 2
-rw-r--r--  ft/tests/msnfilter.cc | 16
-rw-r--r--  ft/tests/orthopush-flush.cc | 8
-rw-r--r--  ft/tests/test.h | 3
-rw-r--r--  ft/tests/test3884.cc | 12
-rw-r--r--  ft/ule-internal.h | 6
-rw-r--r--  ft/ule.cc | 48
-rw-r--r--  ft/ule.h | 14
-rw-r--r--  ft/worker-thread-benchmarks/threadpool.cc | 5
52 files changed, 1412 insertions, 777 deletions
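
The dominant mechanical change in this patch is swapping assert() for paranoid_invariant() (plus assert_zero and paranoid_invariant_notnull) on hot paths. Judging from the #ifdef TOKU_DEBUG_PARANOID guards added in the ft-flusher.cc hunks below, these checks are meant to stay active only in paranoid debug builds and compile away otherwise. A minimal sketch of that kind of macro pair, using invented example_* names rather than the tree's real definitions in toku_assert.h:

/*
 * Illustration only: an always-on invariant next to a "paranoid" variant that
 * vanishes unless TOKU_DEBUG_PARANOID is defined. The example_* names are
 * invented; the real macros live in toku_assert.h and may differ in detail.
 */
#include <stdio.h>
#include <stdlib.h>

#define example_invariant(expr) do { \
    if (!(expr)) { \
        fprintf(stderr, "invariant failed: %s (%s:%d)\n", #expr, __FILE__, __LINE__); \
        abort(); \
    } \
} while (0)

#ifdef TOKU_DEBUG_PARANOID
#define example_paranoid_invariant(expr) example_invariant(expr)  /* checked in paranoid builds */
#else
#define example_paranoid_invariant(expr) ((void) 0)               /* compiled away otherwise */
#endif

This also explains the UU() annotations that appear below: parameters consumed only by a paranoid_invariant become unused in non-paranoid builds and need the unused-attribute wrapper to keep the build warning-clean.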
diff --git a/ft/block_table.cc b/ft/block_table.cc
index f9583902357..377283ab958 100644
--- a/ft/block_table.cc
+++ b/ft/block_table.cc
@@ -89,9 +89,9 @@ static inline void unlock_for_blocktable (BLOCK_TABLE bt);
static void
ft_set_dirty(FT ft, bool for_checkpoint){
toku_mutex_assert_locked(&ft->blocktable->mutex);
- assert(ft->h->type == FT_CURRENT);
+ paranoid_invariant(ft->h->type == FT_CURRENT);
if (for_checkpoint) {
- assert(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
+ paranoid_invariant(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
ft->checkpoint_header->dirty = 1;
}
else {
@@ -134,10 +134,10 @@ toku_maybe_truncate_file_on_open(BLOCK_TABLE bt, int fd) {
static void
copy_translation(struct translation * dst, struct translation * src, enum translation_type newtype) {
- assert(src->length_of_array >= src->smallest_never_used_blocknum.b); //verify invariant
- assert(newtype==TRANSLATION_DEBUG ||
- (src->type == TRANSLATION_CURRENT && newtype == TRANSLATION_INPROGRESS) ||
- (src->type == TRANSLATION_CHECKPOINTED && newtype == TRANSLATION_CURRENT));
+ paranoid_invariant(src->length_of_array >= src->smallest_never_used_blocknum.b); //verify invariant
+ paranoid_invariant(newtype==TRANSLATION_DEBUG ||
+ (src->type == TRANSLATION_CURRENT && newtype == TRANSLATION_INPROGRESS) ||
+ (src->type == TRANSLATION_CHECKPOINTED && newtype == TRANSLATION_CURRENT));
dst->type = newtype;
dst->smallest_never_used_blocknum = src->smallest_never_used_blocknum;
dst->blocknum_freelist_head = src->blocknum_freelist_head;
@@ -175,7 +175,7 @@ maybe_optimize_translation(struct translation *t) {
//This is O(n) work, so do it only if you're already doing that.
BLOCKNUM b;
- assert(t->smallest_never_used_blocknum.b >= RESERVED_BLOCKNUMS);
+ paranoid_invariant(t->smallest_never_used_blocknum.b >= RESERVED_BLOCKNUMS);
//Calculate how large the free suffix is.
int64_t freed;
{
@@ -212,7 +212,7 @@ void
toku_block_translation_note_start_checkpoint_unlocked (BLOCK_TABLE bt) {
toku_mutex_assert_locked(&bt->mutex);
// Copy current translation to inprogress translation.
- assert(bt->inprogress.block_translation == NULL);
+ paranoid_invariant(bt->inprogress.block_translation == NULL);
//We're going to do O(n) work to copy the translation, so we
//can afford to do O(n) work by optimizing the translation
maybe_optimize_translation(&bt->current);
@@ -229,7 +229,7 @@ toku_block_translation_note_start_checkpoint_unlocked (BLOCK_TABLE bt) {
void toku_block_translation_note_skipped_checkpoint (BLOCK_TABLE bt) {
//Purpose, alert block translation that the checkpoint was skipped, e.x. for a non-dirty header
lock_for_blocktable(bt);
- assert(bt->inprogress.block_translation);
+ paranoid_invariant_notnull(bt->inprogress.block_translation);
bt->checkpoint_skipped = true;
unlock_for_blocktable(bt);
}
@@ -267,7 +267,7 @@ toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt, int fd) {
// Free unused blocks
lock_for_blocktable(bt);
uint64_t allocated_limit_at_start = block_allocator_allocated_limit(bt->block_allocator);
- assert(bt->inprogress.block_translation);
+ paranoid_invariant_notnull(bt->inprogress.block_translation);
if (bt->checkpoint_skipped || bt->checkpoint_failed) {
cleanup_failed_checkpoint(bt);
goto end;
@@ -299,25 +299,31 @@ end:
unlock_for_blocktable(bt);
}
-
+__attribute__((nonnull,const))
+static inline bool
+is_valid_blocknum(struct translation *t, BLOCKNUM b) {
+ //Sanity check: Verify invariant
+ paranoid_invariant(t->length_of_array >= t->smallest_never_used_blocknum.b);
+ return b.b >= 0 && b.b < t->smallest_never_used_blocknum.b;
+}
static inline void
-verify_valid_blocknum (struct translation *t, BLOCKNUM b) {
- assert(b.b >= 0);
- assert(b.b < t->smallest_never_used_blocknum.b);
+verify_valid_blocknum (struct translation *UU(t), BLOCKNUM UU(b)) {
+ paranoid_invariant(is_valid_blocknum(t, b));
+}
+__attribute__((nonnull,const))
+static inline bool
+is_valid_freeable_blocknum(struct translation *t, BLOCKNUM b) {
//Sanity check: Verify invariant
- assert(t->length_of_array >= t->smallest_never_used_blocknum.b);
+ paranoid_invariant(t->length_of_array >= t->smallest_never_used_blocknum.b);
+ return b.b >= RESERVED_BLOCKNUMS && b.b < t->smallest_never_used_blocknum.b;
}
//Can be freed
static inline void
-verify_valid_freeable_blocknum (struct translation *t, BLOCKNUM b) {
- assert(b.b >= RESERVED_BLOCKNUMS);
- assert(b.b < t->smallest_never_used_blocknum.b);
-
- //Sanity check: Verify invariant
- assert(t->length_of_array >= t->smallest_never_used_blocknum.b);
+verify_valid_freeable_blocknum (struct translation *UU(t), BLOCKNUM UU(b)) {
+ paranoid_invariant(is_valid_freeable_blocknum(t, b));
}
static void
@@ -376,11 +382,9 @@ calculate_size_on_disk (struct translation *t) {
// We cannot free the disk space allocated to this blocknum if it is still in use by the given translation table.
static inline bool
translation_prevents_freeing(struct translation *t, BLOCKNUM b, struct block_translation_pair *old_pair) {
- bool r = (bool)
- (t->block_translation &&
+ return (t->block_translation &&
b.b < t->smallest_never_used_blocknum.b &&
old_pair->u.diskoff == t->block_translation[b.b].u.diskoff);
- return r;
}
static void
@@ -413,7 +417,7 @@ PRNTF("Freed", b.b, old_pair.size, old_pair.u.diskoff, bt);
PRNTF("New", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.diskoff, bt);
//Update inprogress btt if appropriate (if called because Pending bit is set).
if (for_checkpoint) {
- assert(b.b < bt->inprogress.length_of_array);
+ paranoid_invariant(b.b < bt->inprogress.length_of_array);
bt->inprogress.block_translation[b.b] = t->block_translation[b.b];
}
}
@@ -449,17 +453,22 @@ toku_blocknum_realloc_on_disk (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF
unlock_for_blocktable(bt);
}
+__attribute__((nonnull,const))
+static inline bool
+pair_is_unallocated(struct block_translation_pair *pair) {
+ return pair->size == 0 && pair->u.diskoff == diskoff_unused;
+}
+
// Purpose of this function is to figure out where to put the inprogress btt on disk, allocate space for it there.
static void
blocknum_alloc_translation_on_disk_unlocked (BLOCK_TABLE bt) {
toku_mutex_assert_locked(&bt->mutex);
struct translation *t = &bt->inprogress;
- assert(t->block_translation);
+ paranoid_invariant_notnull(t->block_translation);
BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION);
- struct block_translation_pair old_pair = t->block_translation[b.b];
//Each inprogress is allocated only once
- assert(old_pair.size == 0 && old_pair.u.diskoff == diskoff_unused);
+ paranoid_invariant(pair_is_unallocated(&t->block_translation[b.b]));
//Allocate a new block
int64_t size = calculate_size_on_disk(t);
@@ -560,7 +569,7 @@ toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, FT ft) {
t->blocknum_freelist_head = next;
}
//Verify the blocknum is free
- assert(t->block_translation[result.b].size == size_is_free);
+ paranoid_invariant(t->block_translation[result.b].size == size_is_free);
//blocknum is not free anymore
t->block_translation[result.b].u.diskoff = diskoff_unused;
t->block_translation[result.b].size = 0;
@@ -580,9 +589,8 @@ static void
free_blocknum_in_translation(struct translation *t, BLOCKNUM b)
{
verify_valid_freeable_blocknum(t, b);
- struct block_translation_pair old_pair = t->block_translation[b.b];
- assert(old_pair.size != size_is_free);
-
+ paranoid_invariant(t->block_translation[b.b].size != size_is_free);
+
PRNTF("free_blocknum", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.diskoff, bt);
t->block_translation[b.b].size = size_is_free;
t->block_translation[b.b].u.next_free_blocknum = t->blocknum_freelist_head;
@@ -601,8 +609,8 @@ free_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *bp, FT ft, bool for_checkpoint)
free_blocknum_in_translation(&bt->current, b);
if (for_checkpoint) {
- assert(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
- free_blocknum_in_translation(&bt->inprogress, b);
+ paranoid_invariant(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS);
+ free_blocknum_in_translation(&bt->inprogress, b);
}
//If the size is 0, no disk block has ever been assigned to this blocknum.
@@ -616,7 +624,10 @@ PRNTF("free_blocknum_free", b.b, old_pair.size, old_pair.u.diskoff, bt);
block_allocator_free_block(bt->block_allocator, old_pair.u.diskoff);
}
}
- else assert(old_pair.size==0 && old_pair.u.diskoff == diskoff_unused);
+ else {
+ paranoid_invariant(old_pair.size==0);
+ paranoid_invariant(old_pair.u.diskoff == diskoff_unused);
+ }
ft_set_dirty(ft, for_checkpoint);
}
@@ -626,11 +637,11 @@ toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, FT ft, bool for_checkpoint) {
free_blocknum_unlocked(bt, bp, ft, for_checkpoint);
unlock_for_blocktable(bt);
}
-
+
//Verify there are no free blocks.
void
-toku_block_verify_no_free_blocknums(BLOCK_TABLE bt) {
- assert(bt->current.blocknum_freelist_head.b == freelist_null.b);
+toku_block_verify_no_free_blocknums(BLOCK_TABLE UU(bt)) {
+ paranoid_invariant(bt->current.blocknum_freelist_head.b == freelist_null.b);
}
// Frees blocknums that have a size of 0 and unused diskoff
@@ -652,31 +663,54 @@ toku_free_unused_blocknums(BLOCK_TABLE bt, BLOCKNUM root) {
unlock_for_blocktable(bt);
}
-
-//Verify there are no data blocks except root.
-void
-toku_block_verify_no_data_blocks_except_root(BLOCK_TABLE bt, BLOCKNUM root) {
+__attribute__((nonnull,const,unused))
+static inline bool
+no_data_blocks_except_root(BLOCK_TABLE bt, BLOCKNUM root) {
+ bool ok = true;
lock_for_blocktable(bt);
- assert(root.b >= RESERVED_BLOCKNUMS);
int64_t smallest = bt->current.smallest_never_used_blocknum.b;
- for (int64_t i=RESERVED_BLOCKNUMS; i < smallest; i++) {
+ if (root.b < RESERVED_BLOCKNUMS) {
+ ok = false;
+ goto cleanup;
+ }
+ int64_t i;
+ for (i=RESERVED_BLOCKNUMS; i < smallest; i++) {
if (i == root.b) {
continue;
}
BLOCKNUM b = make_blocknum(i);
- assert(bt->current.block_translation[b.b].size == size_is_free);
+ if (bt->current.block_translation[b.b].size != size_is_free) {
+ ok = false;
+ goto cleanup;
+ }
}
+ cleanup:
unlock_for_blocktable(bt);
+ return ok;
}
-//Verify a blocknum is currently allocated.
+//Verify there are no data blocks except root.
+// TODO(leif): This actually takes a lock, but I don't want to fix all the callers right now.
void
-toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b) {
+toku_block_verify_no_data_blocks_except_root(BLOCK_TABLE UU(bt), BLOCKNUM UU(root)) {
+ paranoid_invariant(no_data_blocks_except_root(bt, root));
+}
+
+__attribute__((nonnull,const,unused))
+static inline bool
+blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b) {
lock_for_blocktable(bt);
struct translation *t = &bt->current;
verify_valid_blocknum(t, b);
- assert(t->block_translation[b.b].size != size_is_free);
+ bool ok = t->block_translation[b.b].size != size_is_free;
unlock_for_blocktable(bt);
+ return ok;
+}
+
+//Verify a blocknum is currently allocated.
+void
+toku_verify_blocknum_allocated(BLOCK_TABLE UU(bt), BLOCKNUM UU(b)) {
+ paranoid_invariant(blocknum_allocated(bt, b));
}
//Only used by toku_dump_translation table (debug info)
@@ -834,12 +868,12 @@ blocktable_note_translation (BLOCK_ALLOCATOR allocator, struct translation *t) {
//See RESERVED_BLOCKNUMS
// Previously this added blocks one at a time. Now we make an array and pass it in so it can be sorted and merged. See #3218.
- struct block_allocator_blockpair *MALLOC_N(t->smallest_never_used_blocknum.b, pairs);
+ struct block_allocator_blockpair *XMALLOC_N(t->smallest_never_used_blocknum.b, pairs);
uint64_t n_pairs = 0;
for (int64_t i=0; i<t->smallest_never_used_blocknum.b; i++) {
struct block_translation_pair pair = t->block_translation[i];
if (pair.size > 0) {
- assert(pair.u.diskoff != diskoff_unused);
+ paranoid_invariant(pair.u.diskoff != diskoff_unused);
int cur_pair = n_pairs++;
pairs[cur_pair] = (struct block_allocator_blockpair) { .offset = (uint64_t) pair.u.diskoff,
.size = (uint64_t) pair.size };
@@ -943,7 +977,7 @@ void
toku_blocktable_internal_fragmentation (BLOCK_TABLE bt, int64_t *total_sizep, int64_t *used_sizep) {
frag_extra info = {0,0};
int r = toku_blocktable_iterate(bt, TRANSLATION_CHECKPOINTED, frag_helper, &info, false, true);
- assert(r==0);
+ assert_zero(r);
if (total_sizep) *total_sizep = info.total_space;
if (used_sizep) *used_sizep = info.used_space;
diff --git a/ft/cachetable-internal.h b/ft/cachetable-internal.h
index b46173b93a2..52b21bac2b7 100644
--- a/ft/cachetable-internal.h
+++ b/ft/cachetable-internal.h
@@ -134,6 +134,7 @@ struct ctpair {
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback;
CACHETABLE_CLEANER_CALLBACK cleaner_callback;
CACHETABLE_CLONE_CALLBACK clone_callback;
+ CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK checkpoint_complete_callback;
void *write_extraargs;
// access to these fields are protected by disk_nb_mutex
@@ -384,7 +385,7 @@ public:
uint64_t reserve_memory(double fraction);
void release_reserved_memory(uint64_t reserved_memory);
void run_eviction_thread();
- void do_partial_eviction(PAIR p);
+ void do_partial_eviction(PAIR p, bool pair_mutex_held);
void evict_pair(PAIR p, bool checkpoint_pending);
void wait_for_cache_pressure_to_subside();
void signal_eviction_thread();
diff --git a/ft/cachetable.cc b/ft/cachetable.cc
index cc174830eec..a97a275a26f 100644
--- a/ft/cachetable.cc
+++ b/ft/cachetable.cc
@@ -16,6 +16,7 @@
#include "cachetable-internal.h"
#include <memory.h>
#include <toku_race_tools.h>
+#include <portability/toku_atomic.h>
#include <portability/toku_pthread.h>
#include <portability/toku_time.h>
#include <util/rwlock.h>
@@ -97,6 +98,10 @@ static inline void pair_unlock(PAIR p) {
toku_mutex_unlock(p->mutex);
}
+bool toku_ctpair_is_write_locked(PAIR pair) {
+ return pair->value_rwlock.writers() == 1;
+}
+
void
toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
if (!ct_status.initialized) {
@@ -706,7 +711,7 @@ static void cachetable_evicter(void* extra) {
static void cachetable_partial_eviction(void* extra) {
PAIR p = (PAIR)extra;
CACHEFILE cf = p->cachefile;
- p->ev->do_partial_eviction(p);
+ p->ev->do_partial_eviction(p, false);
bjm_remove_background_job(cf->bjm);
}
@@ -750,6 +755,7 @@ void pair_init(PAIR p,
p->pe_est_callback = write_callback.pe_est_callback;
p->cleaner_callback = write_callback.cleaner_callback;
p->clone_callback = write_callback.clone_callback;
+ p->checkpoint_complete_callback = write_callback.checkpoint_complete_callback;
p->write_extraargs = write_callback.write_extraargs;
p->count = 0; // <CER> Is zero the correct init value?
@@ -915,6 +921,9 @@ checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) {
static void
write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending)
{
+ if (checkpoint_pending && p->checkpoint_complete_callback) {
+ p->checkpoint_complete_callback(p->value_data);
+ }
if (p->dirty && checkpoint_pending) {
if (p->clone_callback) {
pair_lock(p);
@@ -952,6 +961,9 @@ write_pair_for_checkpoint_thread (evictor* ev, PAIR p)
// will be cheap. Also, much of the time we'll just be clearing
// pending bits and that's definitely cheap. (see #5427)
p->value_rwlock.write_lock(false);
+ if (p->checkpoint_pending && p->checkpoint_complete_callback) {
+ p->checkpoint_complete_callback(p->value_data);
+ }
if (p->dirty && p->checkpoint_pending) {
if (p->clone_callback) {
nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
@@ -1726,73 +1738,100 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
// For example, imagine that we can modify a bit in a dirty parent, or modify a bit in a clean child, then we should modify
// the dirty parent (which will have to do I/O eventually anyway) rather than incur a full block write to modify one bit.
// Similarly, if the checkpoint is actually pending, we don't want to block on it.
-int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void**value) {
+int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) {
CACHETABLE ct = cachefile->cachetable;
int r = -1;
ct->list.pair_lock_by_fullhash(fullhash);
PAIR p = ct->list.find_pair(cachefile, key, fullhash);
- if (p && p->value_rwlock.try_write_lock(true)) {
- // we got the write lock fast, so continue
- ct->list.read_pending_cheap_lock();
- //
- // if pending a checkpoint, then we don't want to return
- // the value to the user, because we are responsible for
- // handling the checkpointing, which we do not want to do,
- // because it is expensive
- //
- if (!p->dirty || p->checkpoint_pending) {
- p->value_rwlock.write_unlock();
- r = -1;
+ if (p) {
+ const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
+ bool got_lock = false;
+ switch (lock_type) {
+ case PL_READ:
+ if (p->value_rwlock.try_read_lock()) {
+ got_lock = p->dirty;
+
+ if (!got_lock) {
+ p->value_rwlock.read_unlock();
+ }
+ }
+ break;
+ case PL_WRITE_CHEAP:
+ case PL_WRITE_EXPENSIVE:
+ if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
+ // we got the lock fast, so continue
+ ct->list.read_pending_cheap_lock();
+
+ // if pending a checkpoint, then we don't want to return
+ // the value to the user, because we are responsible for
+ // handling the checkpointing, which we do not want to do,
+ // because it is expensive
+ got_lock = p->dirty && !p->checkpoint_pending;
+
+ ct->list.read_pending_cheap_unlock();
+ if (!got_lock) {
+ p->value_rwlock.write_unlock();
+ }
+ }
+ break;
}
- else {
+ if (got_lock) {
+ pair_touch(p);
*value = p->value_data;
r = 0;
}
- ct->list.read_pending_cheap_unlock();
- pair_unlock(p);
- }
- else {
- ct->list.pair_unlock_by_fullhash(fullhash);
}
+ ct->list.pair_unlock_by_fullhash(fullhash);
return r;
}
//Used by flusher threads to possibly pin child on client thread if pinning is cheap
//Same as toku_cachetable_maybe_get_and_pin except that we don't care if the node is clean or dirty (return the node regardless).
//All other conditions remain the same.
-int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void**value) {
+int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) {
CACHETABLE ct = cachefile->cachetable;
int r = -1;
ct->list.pair_lock_by_fullhash(fullhash);
PAIR p = ct->list.find_pair(cachefile, key, fullhash);
- if (p && p->value_rwlock.try_write_lock(true)) {
- // got the write lock fast, so continue
- ct->list.read_pending_cheap_lock();
- //
- // if pending a checkpoint, then we don't want to return
- // the value to the user, because we are responsible for
- // handling the checkpointing, which we do not want to do,
- // because it is expensive
- //
- if (p->checkpoint_pending) {
- if (p->dirty) {
- p->value_rwlock.write_unlock();
- r = -1;
+ if (p) {
+ const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
+ bool got_lock = false;
+ switch (lock_type) {
+ case PL_READ:
+ if (p->value_rwlock.try_read_lock()) {
+ got_lock = true;
+ } else if (!p->value_rwlock.read_lock_is_expensive()) {
+ p->value_rwlock.write_lock(lock_is_expensive);
+ got_lock = true;
}
- else {
- p->checkpoint_pending = false;
- *value = p->value_data;
- r = 0;
+ if (got_lock) {
+ pair_touch(p);
+ }
+ pair_unlock(p);
+ break;
+ case PL_WRITE_CHEAP:
+ case PL_WRITE_EXPENSIVE:
+ if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
+ got_lock = true;
+ } else if (!p->value_rwlock.write_lock_is_expensive()) {
+ p->value_rwlock.write_lock(lock_is_expensive);
+ got_lock = true;
}
+ if (got_lock) {
+ pair_touch(p);
+ }
+ pair_unlock(p);
+ if (got_lock) {
+ bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
+ write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
+ }
+ break;
}
- else {
+ if (got_lock) {
*value = p->value_data;
r = 0;
}
- ct->list.read_pending_cheap_unlock();
- pair_unlock(p);
- }
- else {
+ } else {
ct->list.pair_unlock_by_fullhash(fullhash);
}
return r;
@@ -3524,7 +3563,7 @@ void evictor::change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr) {
// the size of the cachetable.
//
void evictor::add_to_size_current(long size) {
- (void) __sync_fetch_and_add(&m_size_current, size);
+ (void) toku_sync_fetch_and_add(&m_size_current, size);
}
//
@@ -3532,7 +3571,7 @@ void evictor::add_to_size_current(long size) {
// approximation of the cachetable size.
//
void evictor::remove_from_size_current(long size) {
- (void) __sync_fetch_and_sub(&m_size_current, size);
+ (void) toku_sync_fetch_and_sub(&m_size_current, size);
}
//
@@ -3543,7 +3582,7 @@ uint64_t evictor::reserve_memory(double fraction) {
toku_mutex_lock(&m_ev_thread_lock);
reserved_memory = fraction * (m_low_size_watermark - m_size_reserved);
m_size_reserved += reserved_memory;
- (void) __sync_fetch_and_add(&m_size_current, reserved_memory);
+ (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory);
this->signal_eviction_thread();
toku_mutex_unlock(&m_ev_thread_lock);
@@ -3557,7 +3596,7 @@ uint64_t evictor::reserve_memory(double fraction) {
// TODO: (Zardosht) comment this function
//
void evictor::release_reserved_memory(uint64_t reserved_memory){
- (void) __sync_fetch_and_sub(&m_size_current, reserved_memory);
+ (void) toku_sync_fetch_and_sub(&m_size_current, reserved_memory);
toku_mutex_lock(&m_ev_thread_lock);
m_size_reserved -= reserved_memory;
// signal the eviction thread in order to possibly wake up sleeping clients
@@ -3710,7 +3749,6 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
curr_in_clock->count--;
// call the partial eviction callback
curr_in_clock->value_rwlock.write_lock(true);
- pair_unlock(curr_in_clock);
void *value = curr_in_clock->value_data;
void* disk_data = curr_in_clock->disk_data;
@@ -3726,13 +3764,15 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
);
if (cost == PE_CHEAP) {
curr_in_clock->size_evicting_estimate = 0;
- this->do_partial_eviction(curr_in_clock);
+ this->do_partial_eviction(curr_in_clock, true);
bjm_remove_background_job(cf->bjm);
+ pair_unlock(curr_in_clock);
}
else if (cost == PE_EXPENSIVE) {
// only bother running an expensive partial eviction
// if it is expected to free space
if (bytes_freed_estimate > 0) {
+ pair_unlock(curr_in_clock);
curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
toku_mutex_lock(&m_ev_thread_lock);
m_size_evicting += bytes_freed_estimate;
@@ -3744,7 +3784,6 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
);
}
else {
- pair_lock(curr_in_clock);
curr_in_clock->value_rwlock.write_unlock();
pair_unlock(curr_in_clock);
bjm_remove_background_job(cf->bjm);
@@ -3767,10 +3806,10 @@ exit:
}
//
-// on entry, pair's mutex is not held, but pair is pinned
+// on entry and exit, pair's mutex is held if pair_mutex_held is true
// on exit, PAIR is unpinned
//
-void evictor::do_partial_eviction(PAIR p) {
+void evictor::do_partial_eviction(PAIR p, bool pair_mutex_held) {
PAIR_ATTR new_attr;
PAIR_ATTR old_attr = p->attr;
@@ -3779,9 +3818,13 @@ void evictor::do_partial_eviction(PAIR p) {
this->change_pair_attr(old_attr, new_attr);
p->attr = new_attr;
this->decrease_size_evicting(p->size_evicting_estimate);
- pair_lock(p);
+ if (!pair_mutex_held) {
+ pair_lock(p);
+ }
p->value_rwlock.write_unlock();
- pair_unlock(p);
+ if (!pair_mutex_held) {
+ pair_unlock(p);
+ }
}
//
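
The rewritten toku_cachetable_maybe_get_and_pin and toku_cachetable_maybe_get_and_pin_clean now take a pair_lock_type, so callers can ask for a cheap read pin instead of always contending for the write lock. With PL_READ, maybe_get_and_pin only hands back a dirty pair (a read pin cannot take responsibility for pending checkpoint work), while the PL_WRITE_* paths keep the original dirty-and-not-checkpoint-pending contract. A hypothetical caller sketch, with only the example_* name invented:

/*
 * Sketch only: opportunistically read-pin a cached, dirty value without
 * blocking. A nonzero return means "not cheap right now"; the caller should
 * fall back to the regular get_and_pin path. Unpinning still goes through the
 * usual toku_cachetable_unpin path, elided here.
 */
#include "cachetable.h"

static void *example_peek_dirty_value(CACHEFILE cf, CACHEKEY key, uint32_t fullhash) {
    void *value = NULL;
    int r = toku_cachetable_maybe_get_and_pin(cf, key, fullhash, PL_READ, &value);
    return (r == 0) ? value : NULL;
}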
diff --git a/ft/cachetable.h b/ft/cachetable.h
index 186916f7ab8..95e7f1e091a 100644
--- a/ft/cachetable.h
+++ b/ft/cachetable.h
@@ -173,12 +173,15 @@ typedef int (*CACHETABLE_CLEANER_CALLBACK)(void *ftnode_pv, BLOCKNUM blocknum, u
typedef void (*CACHETABLE_CLONE_CALLBACK)(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
+typedef void (*CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK)(void *value_data);
+
typedef struct {
CACHETABLE_FLUSH_CALLBACK flush_callback;
CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback;
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback;
CACHETABLE_CLEANER_CALLBACK cleaner_callback;
CACHETABLE_CLONE_CALLBACK clone_callback;
+ CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK checkpoint_complete_callback;
void* write_extraargs; // parameter for flush_callback, pe_est_callback, pe_callback, and cleaner_callback
} CACHETABLE_WRITE_CALLBACK;
@@ -366,14 +369,14 @@ int toku_cachetable_get_and_pin_nonblocking (
UNLOCKERS unlockers
);
-int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, void**);
+int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, pair_lock_type, void**);
// Effect: Maybe get and pin a memory object.
// This function is similar to the get_and_pin function except that it
// will not attempt to fetch a memory object that is not in the cachetable or requires any kind of blocking to get it.
// Returns: If the the item is already in memory, then return 0 and store it in the
// void**. If the item is not in memory, then return a nonzero error number.
-int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, void**);
+int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, pair_lock_type, void**);
// Effect: Like maybe get and pin, but may pin a clean pair.
int toku_cachetable_unpin(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
@@ -556,5 +559,10 @@ int toku_cleaner_thread(void *cleaner_v);
// The default of 1M is too high for drd tests, so this is a mechanism to set a smaller number.
void toku_pair_list_set_lock_size(uint32_t num_locks);
+// Used by ft-ops.cc to figure out if it has the write lock on a pair.
+// Pretty hacky and not accurate enough, should be improved at the frwlock
+// layer.
+__attribute__((const,nonnull))
+bool toku_ctpair_is_write_locked(PAIR pair);
#endif /* CACHETABLE_H */
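
cachetable.h gains CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK and a matching checkpoint_complete_callback slot in CACHETABLE_WRITE_CALLBACK. Per the cachetable.cc hunks above, the callback fires while the pair is write-locked and its checkpoint_pending bit is being resolved, before any clone or write for the checkpoint (or before the pending bit is simply cleared for a clean pair). A hedged sketch of how a client might wire it up; the example_* names are invented and the other callbacks are left NULL only to keep the sketch short:

/* Sketch only: filling in the write-callback bundle with the new member. */
#include "cachetable.h"

static void example_checkpoint_complete(void *value_data) {
    // Runs under the pair's write lock when its pending bit is set, before the
    // checkpoint clones or writes the pair.
    (void) value_data;
}

static CACHETABLE_WRITE_CALLBACK example_write_callback(void *extra) {
    CACHETABLE_WRITE_CALLBACK wc;
    wc.flush_callback = NULL;        // a real client supplies all of these
    wc.pe_est_callback = NULL;
    wc.pe_callback = NULL;
    wc.cleaner_callback = NULL;
    wc.clone_callback = NULL;
    wc.checkpoint_complete_callback = example_checkpoint_complete;  // new in this patch
    wc.write_extraargs = extra;
    return wc;
}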
diff --git a/ft/checkpoint.cc b/ft/checkpoint.cc
index 9aaf1692572..52b4d741329 100644
--- a/ft/checkpoint.cc
+++ b/ft/checkpoint.cc
@@ -49,6 +49,7 @@
#include "log-internal.h"
#include "logger.h"
#include "checkpoint.h"
+#include <portability/toku_atomic.h>
///////////////////////////////////////////////////////////////////////////////////
// Engine status
@@ -173,7 +174,7 @@ checkpoint_safe_checkpoint_unlock(void) {
void
toku_multi_operation_client_lock(void) {
if (locked_mo)
- (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1);
toku_pthread_rwlock_rdlock(&multi_operation_lock);
}
@@ -185,7 +186,7 @@ toku_multi_operation_client_unlock(void) {
void
toku_checkpoint_safe_client_lock(void) {
if (locked_cs)
- (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1);
toku_pthread_rwlock_rdlock(&checkpoint_safe_lock);
toku_multi_operation_client_lock();
}
@@ -227,9 +228,9 @@ toku_checkpoint(CHECKPOINTER cp, TOKULOGGER logger,
assert(initialized);
- (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAITERS_NOW), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(CP_WAITERS_NOW), 1);
checkpoint_safe_checkpoint_lock();
- (void) __sync_fetch_and_sub(&STATUS_VALUE(CP_WAITERS_NOW), 1);
+ (void) toku_sync_fetch_and_sub(&STATUS_VALUE(CP_WAITERS_NOW), 1);
if (STATUS_VALUE(CP_WAITERS_NOW) > STATUS_VALUE(CP_WAITERS_MAX))
STATUS_VALUE(CP_WAITERS_MAX) = STATUS_VALUE(CP_WAITERS_NOW); // threadsafe, within checkpoint_safe lock
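
checkpoint.cc (like several other files in this patch) switches from the raw __sync_fetch_and_add/__sync_fetch_and_sub builtins to toku_sync_fetch_and_add/_sub from the newly included portability/toku_atomic.h. The wrappers themselves are not shown in this diff; presumably they are thin portability shims over the GCC builtins, along these lines (example_* names invented):

/* Sketch under that assumption -- not the real portability/toku_atomic.h. */
#include <stdint.h>

static inline uint64_t example_sync_fetch_and_add(volatile uint64_t *dest, uint64_t delta) {
    return __sync_fetch_and_add(dest, delta);   // returns the value *before* the add
}

static inline uint64_t example_sync_fetch_and_sub(volatile uint64_t *dest, uint64_t delta) {
    return __sync_fetch_and_sub(dest, delta);
}

Funneling every caller through one header makes it possible to swap in a different atomics implementation, or add race-checker annotations, without touching call sites.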
diff --git a/ft/fifo.cc b/ft/fifo.cc
index 30b961c6908..f7b6b871967 100644
--- a/ft/fifo.cc
+++ b/ft/fifo.cc
@@ -25,12 +25,23 @@ static void fifo_init(struct fifo *fifo) {
fifo->memory_used = 0;
}
+__attribute__((const,nonnull))
static int fifo_entry_size(struct fifo_entry *entry) {
return sizeof (struct fifo_entry) + entry->keylen + entry->vallen
+ xids_get_size(&entry->xids_s)
- sizeof(XIDS_S); //Prevent double counting from fifo_entry+xids_get_size
}
+__attribute__((const,nonnull))
+size_t toku_ft_msg_memsize_in_fifo(FT_MSG cmd) {
+ // This must stay in sync with fifo_entry_size because that's what we
+ // really trust. But sometimes we only have an in-memory FT_MSG, not
+ // a serialized fifo_entry so we have to fake it.
+ return sizeof (struct fifo_entry) + cmd->u.id.key->size + cmd->u.id.val->size
+ + xids_get_size(cmd->xids)
+ - sizeof(XIDS_S);
+}
+
int toku_fifo_create(FIFO *ptr) {
struct fifo *XMALLOC(fifo);
if (fifo == 0) return ENOMEM;
@@ -112,6 +123,9 @@ int toku_fifo_iterate_internal_next(FIFO fifo, int off) {
struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off) {
return (struct fifo_entry *)(fifo->memory + off);
}
+size_t toku_fifo_internal_entry_memsize(struct fifo_entry *e) {
+ return fifo_entry_size(e);
+}
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, void*), void *arg) {
FIFO_ITERATE(fifo,
diff --git a/ft/fifo.h b/ft/fifo.h
index 9af8191bc3f..75eb14a737c 100644
--- a/ft/fifo.h
+++ b/ft/fifo.h
@@ -68,26 +68,30 @@ unsigned long toku_fifo_memory_footprint(FIFO fifo); // return how much memory
void toku_fifo_iterate(FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, void*), void*);
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,msnvar,xidsvar,is_freshvar,body) ({ \
- for (int fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \
- toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \
- fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \
- struct fifo_entry *e = toku_fifo_iterate_internal_get_entry(fifo, fifo_iterate_off); \
- ITEMLEN keylenvar = e->keylen; \
- ITEMLEN datalenvar = e->vallen; \
- enum ft_msg_type typevar = fifo_entry_get_msg_type(e); \
- MSN msnvar = e->msn; \
- XIDS xidsvar = &e->xids_s; \
- bytevec keyvar = xids_get_end_of_array(xidsvar); \
- bytevec datavar = (const uint8_t*)keyvar + e->keylen; \
- bool is_freshvar = e->is_fresh; \
- body; \
+ for (int fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \
+ toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \
+ fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \
+ struct fifo_entry *e = toku_fifo_iterate_internal_get_entry(fifo, fifo_iterate_off); \
+ ITEMLEN keylenvar = e->keylen; \
+ ITEMLEN datalenvar = e->vallen; \
+ enum ft_msg_type typevar = fifo_entry_get_msg_type(e); \
+ MSN msnvar = e->msn; \
+ XIDS xidsvar = &e->xids_s; \
+ bytevec keyvar = xids_get_end_of_array(xidsvar); \
+ bytevec datavar = (const uint8_t*)keyvar + e->keylen; \
+ bool is_freshvar = e->is_fresh; \
+ body; \
} })
+#define FIFO_CURRENT_ENTRY_MEMSIZE toku_fifo_internal_entry_memsize(e)
+
// Internal functions for the iterator.
int toku_fifo_iterate_internal_start(FIFO fifo);
int toku_fifo_iterate_internal_has_more(FIFO fifo, int off);
int toku_fifo_iterate_internal_next(FIFO fifo, int off);
struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off);
+size_t toku_fifo_internal_entry_memsize(struct fifo_entry *e) __attribute__((const,nonnull));
+size_t toku_ft_msg_memsize_in_fifo(FT_MSG cmd) __attribute__((const,nonnull));
DBT *fill_dbt_for_fifo_entry(DBT *dbt, const struct fifo_entry *entry);
struct fifo_entry *toku_fifo_get_entry(FIFO fifo, int off);
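
Besides re-indenting FIFO_ITERATE, fifo.h adds FIFO_CURRENT_ENTRY_MEMSIZE (which expands to toku_fifo_internal_entry_memsize(e) on the entry the macro is currently visiting) and toku_ft_msg_memsize_in_fifo, so callers can account for message memory while iterating. A hypothetical use, with only the example_* name invented:

/* Sketch only: total the in-memory size of every message in a buffer. */
#include <stddef.h>
#include "fifo.h"

static size_t example_fifo_total_memsize(FIFO fifo) {
    size_t total = 0;
    FIFO_ITERATE(fifo, key, keylen, val, vallen, type, msn, xids, is_fresh, {
        // The macro binds all of these names; this sketch only needs the size.
        (void) key; (void) keylen; (void) val; (void) vallen;
        (void) type; (void) msn; (void) xids; (void) is_fresh;
        total += FIFO_CURRENT_ENTRY_MEMSIZE;
    });
    return total;
}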
diff --git a/ft/ft-cachetable-wrappers.cc b/ft/ft-cachetable-wrappers.cc
index 9ab4fb3ec91..a0789ea50b5 100644
--- a/ft/ft-cachetable-wrappers.cc
+++ b/ft/ft-cachetable-wrappers.cc
@@ -147,7 +147,7 @@ try_again_for_write_lock:
if (apply_ancestor_messages && node->height == 0) {
needs_ancestors_messages = toku_ft_leaf_needs_ancestors_messages(brt->ft, node, ancestors, bounds, &max_msn_in_path);
if (needs_ancestors_messages && needed_lock_type == PL_READ) {
- toku_unpin_ftnode_read_only(brt, node);
+ toku_unpin_ftnode_read_only(brt->ft, node);
needed_lock_type = PL_WRITE_CHEAP;
goto try_again_for_write_lock;
}
@@ -296,14 +296,14 @@ toku_pin_ftnode_off_client_thread_batched(
h, blocknum, fullhash, bfe, lock_type, num_dependent_nodes, dependent_nodes, node_p, true);
}
-int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep) {
+int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, pair_lock_type lock_type, FTNODE *nodep) {
void *node_v;
- int r = toku_cachetable_maybe_get_and_pin_clean(ft->cf, blocknum, fullhash, &node_v);
+ int r = toku_cachetable_maybe_get_and_pin_clean(ft->cf, blocknum, fullhash, lock_type, &node_v);
if (r != 0) {
goto cleanup;
}
CAST_FROM_VOIDP(*nodep, node_v);
- if ((*nodep)->height > 0) {
+ if ((*nodep)->height > 0 && lock_type != PL_READ) {
toku_move_ftnode_messages_to_stale(ft, *nodep);
}
cleanup:
@@ -331,14 +331,13 @@ toku_unpin_ftnode(FT ft, FTNODE node)
}
void
-toku_unpin_ftnode_read_only(FT_HANDLE brt, FTNODE node)
+toku_unpin_ftnode_read_only(FT ft, FTNODE node)
{
int r = toku_cachetable_unpin(
- brt->ft->cf,
+ ft->cf,
node->ct_pair,
(enum cachetable_dirty) node->dirty,
make_invalid_pair_attr()
);
assert(r==0);
}
-
diff --git a/ft/ft-cachetable-wrappers.h b/ft/ft-cachetable-wrappers.h
index bcfbe86a6f8..b9a5d3f6451 100644
--- a/ft/ft-cachetable-wrappers.h
+++ b/ft/ft-cachetable-wrappers.h
@@ -108,7 +108,7 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages(
* This function may return a pinned ftnode to the caller, if pinning is cheap.
* If the node is already locked, or is pending a checkpoint, the node is not pinned and -1 is returned.
*/
-int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep);
+int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, pair_lock_type lock_type, FTNODE *nodep);
/**
* Batched version of toku_pin_ftnode_off_client_thread, see cachetable
@@ -158,6 +158,6 @@ void
toku_unpin_ftnode(FT h, FTNODE node);
void
-toku_unpin_ftnode_read_only(FT_HANDLE brt, FTNODE node);
+toku_unpin_ftnode_read_only(FT ft, FTNODE node);
#endif
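
toku_maybe_pin_ftnode_clean now forwards a pair_lock_type, and when the caller asks for PL_READ it deliberately skips toku_move_ftnode_messages_to_stale, since that mutation needs a write-locked node. A hypothetical caller, with only the example_* name invented:

/*
 * Sketch only: try to pin an already-cached child read-only without blocking.
 * A nonzero return means pinning was not cheap; the caller should take the
 * normal blocking pin path instead. On success, release the pin with
 * toku_unpin_ftnode_read_only(ft, *node).
 */
#include "ft-cachetable-wrappers.h"

static bool example_try_read_pin(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *node) {
    int r = toku_maybe_pin_ftnode_clean(ft, blocknum, fullhash, PL_READ, node);
    return (r == 0);
}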
diff --git a/ft/ft-flusher.cc b/ft/ft-flusher.cc
index 0cadf62699f..81801cdaec3 100644
--- a/ft/ft-flusher.cc
+++ b/ft/ft-flusher.cc
@@ -9,6 +9,8 @@
#include <ft-flusher-internal.h>
#include <ft-cachetable-wrappers.h>
#include <ft.h>
+#include <toku_assert.h>
+#include <portability/toku_atomic.h>
/* Status is intended for display to humans to help understand system behavior.
* It does not need to be perfectly thread-safe.
@@ -98,11 +100,13 @@ find_heaviest_child(FTNODE node)
int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
- assert(node->n_children>0);
+ paranoid_invariant(node->n_children>0);
for (i=1; i<node->n_children; i++) {
+#ifdef TOKU_DEBUG_PARANOID
if (BP_WORKDONE(node,i)) {
assert(toku_bnc_nbytesinbuf(BNC(node,i)) > 0);
}
+#endif
int this_weight = toku_bnc_nbytesinbuf(BNC(node,i)) + BP_WORKDONE(node,i);;
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
@@ -180,7 +184,7 @@ pick_heaviest_child(FT UU(h),
void* UU(extra))
{
int childnum = find_heaviest_child(parent);
- assert(toku_bnc_n_entries(BNC(parent, childnum))>0);
+ paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
return childnum;
}
@@ -348,7 +352,7 @@ ctm_maybe_merge_child(struct flusher_advice *fa,
void *extra)
{
if (child->height == 0) {
- (void) __sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1);
}
default_merge_child(fa, h, parent, childnum, child, extra);
}
@@ -366,7 +370,7 @@ ct_maybe_merge_child(struct flusher_advice *fa,
}
else {
struct ctm_extra ctme;
- assert(parent->n_children > 1);
+ paranoid_invariant(parent->n_children > 1);
int pivot_to_save;
//
// we have two cases, one where the childnum
@@ -413,12 +417,12 @@ ct_maybe_merge_child(struct flusher_advice *fa,
toku_assert_entire_node_in_memory(root_node);
}
- (void) __sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
- (void) __sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
flush_some_child(h, root_node, &new_fa);
- (void) __sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
+ (void) toku_sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
toku_free(ctme.target_key.data);
}
@@ -483,13 +487,14 @@ handle_split_of_child(
DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. */
)
{
- assert(node->height>0);
- assert(0 <= childnum && childnum < node->n_children);
+ paranoid_invariant(node->height>0);
+ paranoid_invariant(0 <= childnum);
+ paranoid_invariant(childnum < node->n_children);
toku_assert_entire_node_in_memory(node);
toku_assert_entire_node_in_memory(childa);
toku_assert_entire_node_in_memory(childb);
- int old_count = toku_bnc_nbytesinbuf(BNC(node, childnum));
- assert(old_count==0);
+ NONLEAF_CHILDINFO old_bnc = BNC(node, childnum);
+ paranoid_invariant(toku_bnc_nbytesinbuf(old_bnc)==0);
int cnum;
WHEN_NOT_GCOV(
if (toku_ft_debug_mode) {
@@ -515,13 +520,20 @@ handle_split_of_child(
memset(&node->bp[childnum+1],0,sizeof(node->bp[0]));
node->n_children++;
- assert(BP_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child
+ paranoid_invariant(BP_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child
BP_BLOCKNUM(node, childnum+1) = childb->thisnodename;
BP_WORKDONE(node, childnum+1) = 0;
BP_STATE(node,childnum+1) = PT_AVAIL;
- set_BNC(node, childnum+1, toku_create_empty_nl());
+ NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
+ for (unsigned int i = 0; i < (sizeof new_bnc->flow) / (sizeof new_bnc->flow[0]); ++i) {
+ // just split the flows in half for now, can't guess much better
+ // at the moment
+ new_bnc->flow[i] = old_bnc->flow[i] / 2;
+ old_bnc->flow[i] = (old_bnc->flow[i] + 1) / 2;
+ }
+ set_BNC(node, childnum+1, new_bnc);
// Slide the keys over
{
@@ -553,7 +565,7 @@ handle_split_of_child(
}
static int
-verify_in_mempool(OMTVALUE lev, uint32_t UU(idx), void *mpv)
+UU() verify_in_mempool(OMTVALUE lev, uint32_t UU(idx), void *mpv)
{
LEAFENTRY CAST_FROM_VOIDP(le, lev);
struct mempool *CAST_FROM_VOIDP(mp, mpv);
@@ -563,8 +575,9 @@ verify_in_mempool(OMTVALUE lev, uint32_t UU(idx), void *mpv)
}
static void
-verify_all_in_mempool(FTNODE node)
+verify_all_in_mempool(FTNODE UU() node)
{
+#ifdef TOKU_DEBUG_PARANOID
if (node->height==0) {
for (int i = 0; i < node->n_children; i++) {
invariant(BP_STATE(node,i) == PT_AVAIL);
@@ -572,13 +585,14 @@ verify_all_in_mempool(FTNODE node)
toku_omt_iterate(bn->buffer, verify_in_mempool, &bn->buffer_mempool);
}
}
+#endif
}
static uint64_t
ftleaf_disk_size(FTNODE node)
// Effect: get the disk size of a leafentry
{
- assert(node->height == 0);
+ paranoid_invariant(node->height == 0);
toku_assert_entire_node_in_memory(node);
uint64_t retval = 0;
for (int i = 0; i < node->n_children; i++) {
@@ -587,8 +601,8 @@ ftleaf_disk_size(FTNODE node)
for (uint32_t j=0; j < n_leafentries; j++) {
OMTVALUE v;
int r = toku_omt_fetch(curr_buffer, j, &v);
- LEAFENTRY CAST_FROM_VOIDP(curr_le, v);
assert_zero(r);
+ LEAFENTRY CAST_FROM_VOIDP(curr_le, v);
retval += leafentry_disksize(curr_le);
}
}
@@ -598,47 +612,69 @@ ftleaf_disk_size(FTNODE node)
static void
ftleaf_get_split_loc(
FTNODE node,
- uint64_t sumlesizes,
- int* bn_index, // which basement within leaf
- int* le_index // which key within basement
+ enum split_mode split_mode,
+ int *num_left_bns, // which basement within leaf
+ int *num_left_les // which key within basement
)
// Effect: Find the location within a leaf node where we want to perform a split
-// bn_index is which basement node (which OMT) should be split.
-// le_index is index into OMT of the last key that should be on the left side of the split.
+// num_left_bns is how many basement nodes (which OMT) should be split to the left.
+// num_left_les is how many leafentries in OMT of the last bn should be on the left side of the split.
{
- assert(node->height == 0);
- uint32_t size_so_far = 0;
- for (int i = 0; i < node->n_children; i++) {
- OMT curr_buffer = BLB_BUFFER(node, i);
- uint32_t n_leafentries = toku_omt_size(curr_buffer);
- for (uint32_t j=0; j < n_leafentries; j++) {
- OMTVALUE lev;
- int r = toku_omt_fetch(curr_buffer, j, &lev);
- assert_zero(r);
- LEAFENTRY CAST_FROM_VOIDP(curr_le, lev);
- size_so_far += leafentry_disksize(curr_le);
- if (size_so_far >= sumlesizes/2) {
- *bn_index = i;
- *le_index = j;
- if ((*bn_index == node->n_children - 1) &&
- ((unsigned int) *le_index == n_leafentries - 1)) {
- // need to correct for when we're splitting after the
- // last element, that makes no sense
- if (*le_index > 0) {
- (*le_index)--;
- } else if (*bn_index > 0) {
- (*bn_index)--;
- *le_index = toku_omt_size(BLB_BUFFER(node, *bn_index)) - 1;
- } else {
- // we are trying to split a leaf with only one
- // leafentry in it
- abort();
+ switch (split_mode) {
+ case SPLIT_LEFT_HEAVY: {
+ *num_left_bns = node->n_children;
+ *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1));
+ if (*num_left_les == 0) {
+ *num_left_bns = node->n_children - 1;
+ *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1));
+ invariant(*num_left_les > 0);
+ }
+ goto exit;
+ }
+ case SPLIT_RIGHT_HEAVY: {
+ *num_left_bns = 1;
+ *num_left_les = 1;
+ goto exit;
+ }
+ case SPLIT_EVENLY: {
+ paranoid_invariant(node->height == 0);
+ // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice
+ uint64_t sumlesizes = ftleaf_disk_size(node);
+ uint32_t size_so_far = 0;
+ for (int i = 0; i < node->n_children; i++) {
+ OMT curr_buffer = BLB_BUFFER(node, i);
+ uint32_t n_leafentries = toku_omt_size(curr_buffer);
+ for (uint32_t j=0; j < n_leafentries; j++) {
+ OMTVALUE lev;
+ int r = toku_omt_fetch(curr_buffer, j, &lev);
+ assert_zero(r);
+ LEAFENTRY CAST_FROM_VOIDP(curr_le, lev);
+ size_so_far += leafentry_disksize(curr_le);
+ if (size_so_far >= sumlesizes/2) {
+ *num_left_bns = i + 1;
+ *num_left_les = j + 1;
+ if (*num_left_bns == node->n_children &&
+ (unsigned int) *num_left_les == n_leafentries) {
+ // need to correct for when we're splitting after the
+ // last element, that makes no sense
+ if (*num_left_les > 1) {
+ (*num_left_les)--;
+ } else if (*num_left_bns > 1) {
+ (*num_left_bns)--;
+ *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1));
+ } else {
+ // we are trying to split a leaf with only one
+ // leafentry in it
+ abort();
+ }
}
+ goto exit;
}
- goto exit;
}
}
}
+ }
+ abort();
exit:
return;
}
@@ -655,7 +691,7 @@ move_leafentries(
)
//Effect: move leafentries in the range [lbi, upe) from src_omt to newly created dest_omt
{
- assert(lbi < ube);
+ paranoid_invariant(lbi < ube);
OMTVALUE *XMALLOC_N(ube-lbi, newleafpointers); // create new omt
size_t mpsize = toku_mempool_get_used_space(&src_bn->buffer_mempool); // overkill, but safe
@@ -692,6 +728,7 @@ ftleaf_split(
FTNODE *nodeb,
DBT *splitk,
bool create_new_node,
+ enum split_mode split_mode,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes)
// Effect: Split a leaf node.
@@ -702,7 +739,7 @@ ftleaf_split(
// splitk is the right-most key of nodea
{
- invariant(node->height == 0);
+ paranoid_invariant(node->height == 0);
STATUS_VALUE(FT_FLUSHER_SPLIT_LEAF)++;
if (node->n_children) {
// First move all the accumulated stat64info deltas into the first basement.
@@ -744,31 +781,20 @@ ftleaf_split(
}
- assert(node->height==0);
+ paranoid_invariant(node->height==0);
toku_assert_entire_node_in_memory(node);
verify_all_in_mempool(node);
MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
- // variables that say where we will do the split. We do it in the basement node indexed at
- // at last_bn_on_left and at the index last_le_on_left_within_bn within that basement node.
- int last_bn_on_left = 0; // last_bn_on_left may or may not be fully included
- int last_le_on_left_within_bn = 0;
+ // variables that say where we will do the split.
+ // After the split, there will be num_left_bns basement nodes in the left node,
+ // and the last basement node in the left node will have num_left_les leafentries.
+ int num_left_bns;
+ int num_left_les;
+ ftleaf_get_split_loc(node, split_mode, &num_left_bns, &num_left_les);
{
- {
- // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice
- uint64_t sumlesizes=0;
- sumlesizes = ftleaf_disk_size(node);
- // TODO: (Zardosht) #3537, figure out serial insertion optimization again later
- // split in half
- ftleaf_get_split_loc(
- node,
- sumlesizes,
- &last_bn_on_left,
- &last_le_on_left_within_bn
- );
- }
// did we split right on the boundary between basement nodes?
- const bool split_on_boundary = (last_le_on_left_within_bn == (int) toku_omt_size(BLB_BUFFER(node, last_bn_on_left)) - 1);
+ const bool split_on_boundary = (num_left_les == 0) || (num_left_les == (int) toku_omt_size(BLB_BUFFER(node, num_left_bns - 1)));
// Now we know where we are going to break it
// the two nodes will have a total of n_children+1 basement nodes
// and n_children-1 pivots
@@ -781,8 +807,17 @@ ftleaf_split(
// (if split_on_boundary is false) will be affected. All other mempools will remain intact. ???
//set up the basement nodes in the new node
- int num_children_in_node = last_bn_on_left + 1;
- int num_children_in_b = node->n_children - last_bn_on_left - (split_on_boundary ? 1 : 0);
+ int num_children_in_node = num_left_bns;
+ // In the SPLIT_RIGHT_HEAVY case, we need to add 1 back because
+ // while it's not on the boundary, we do need node->n_children
+ // children in B.
+ int num_children_in_b = node->n_children - num_left_bns + (!split_on_boundary ? 1 : 0);
+ if (num_children_in_b == 0) {
+ // for uneven split, make sure we have at least 1 bn
+ paranoid_invariant(split_mode == SPLIT_LEFT_HEAVY);
+ num_children_in_b = 1;
+ }
+ paranoid_invariant(num_children_in_node > 0);
if (create_new_node) {
toku_initialize_empty_ftnode(
B,
@@ -808,19 +843,19 @@ ftleaf_split(
// now move all the data
- int curr_src_bn_index = last_bn_on_left;
+ int curr_src_bn_index = num_left_bns - 1;
int curr_dest_bn_index = 0;
// handle the move of a subset of data in last_bn_on_left from node to B
if (!split_on_boundary) {
BP_STATE(B,curr_dest_bn_index) = PT_AVAIL;
uint32_t diff_size = 0;
- destroy_basement_node (BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
+ destroy_basement_node(BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
set_BNULL(B, curr_dest_bn_index);
set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer());
move_leafentries(BLB(B, curr_dest_bn_index),
BLB(node, curr_src_bn_index),
- last_le_on_left_within_bn+1, // first row to be moved to B
+ num_left_les, // first row to be moved to B
toku_omt_size(BLB_BUFFER(node, curr_src_bn_index)), // number of rows in basement to be split
&diff_size);
BLB_MAX_MSN_APPLIED(B, curr_dest_bn_index) = BLB_MAX_MSN_APPLIED(node, curr_src_bn_index);
@@ -830,15 +865,20 @@ ftleaf_split(
}
curr_src_bn_index++;
- invariant(B->n_children >= curr_dest_bn_index);
- invariant(node->n_children >= curr_src_bn_index);
- invariant(B->n_children - curr_dest_bn_index == node->n_children - curr_src_bn_index);
+ paranoid_invariant(B->n_children >= curr_dest_bn_index);
+ paranoid_invariant(node->n_children >= curr_src_bn_index);
+
// move the rest of the basement nodes
for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
destroy_basement_node(BLB(B, curr_dest_bn_index));
set_BNULL(B, curr_dest_bn_index);
B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index];
}
+ if (curr_dest_bn_index < B->n_children) {
+ // B already has an empty basement node here.
+ BP_STATE(B, curr_dest_bn_index) = PT_AVAIL;
+ }
+
node->n_children = num_children_in_node;
//
@@ -847,7 +887,7 @@ ftleaf_split(
// the child index in the original node that corresponds to the
// first node in the right node of the split
- int base_index = (split_on_boundary ? last_bn_on_left + 1 : last_bn_on_left);
+ int base_index = num_left_bns - (split_on_boundary ? 0 : 1);
// make pivots in B
for (int i=0; i < num_children_in_b-1; i++) {
toku_copyref_dbt(&B->childkeys[i], node->childkeys[i+base_index]);
@@ -855,10 +895,10 @@ ftleaf_split(
node->totalchildkeylens -= node->childkeys[i+base_index].size;
toku_init_dbt(&node->childkeys[i+base_index]);
}
- if (split_on_boundary) {
+ if (split_on_boundary && split_mode != SPLIT_LEFT_HEAVY) {
// destroy the extra childkey between the nodes, we'll
// recreate it in splitk below
- toku_free(node->childkeys[last_bn_on_left].data);
+ toku_free(node->childkeys[num_left_bns - 1].data);
}
REALLOC_N(num_children_in_node, node->bp);
REALLOC_N(num_children_in_node-1, node->childkeys);
@@ -867,7 +907,7 @@ ftleaf_split(
if (splitk) {
memset(splitk, 0, sizeof *splitk);
OMTVALUE lev;
- OMT buffer = BLB_BUFFER(node, last_bn_on_left);
+ OMT buffer = BLB_BUFFER(node, num_left_bns - 1);
int r = toku_omt_fetch(buffer, toku_omt_size(buffer) - 1, &lev);
assert_zero(r); // that fetch should have worked.
LEAFENTRY CAST_FROM_VOIDP(le, lev);
@@ -908,8 +948,8 @@ ft_nonleaf_split(
int n_children_in_b = old_n_children-n_children_in_a;
MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
FTNODE B;
- assert(node->height>0);
- assert(node->n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
+ paranoid_invariant(node->height>0);
+ paranoid_invariant(node->n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
create_new_ftnode_with_dep_nodes(h, &B, node->height, n_children_in_b, num_dependent_nodes, dependent_nodes);
{
/* The first n_children_in_a go into node a.
@@ -932,7 +972,7 @@ ft_nonleaf_split(
// Delete a child, removing the preceeding pivot key. The child number must be > 0
{
- assert(i>0);
+ paranoid_invariant(i>0);
if (i>n_children_in_a) {
toku_copyref_dbt(&B->childkeys[targchild-1], node->childkeys[i-1]);
B->totalchildkeylens += node->childkeys[i-1].size;
@@ -978,10 +1018,11 @@ ft_split_child(
FTNODE node,
int childnum,
FTNODE child,
+ enum split_mode split_mode,
struct flusher_advice *fa)
{
- assert(node->height>0);
- assert(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
+ paranoid_invariant(node->height>0);
+ paranoid_invariant(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
FTNODE nodea, nodeb;
DBT splitk;
@@ -992,7 +1033,7 @@ ft_split_child(
dep_nodes[0] = node;
dep_nodes[1] = child;
if (child->height==0) {
- ftleaf_split(h, child, &nodea, &nodeb, &splitk, true, 2, dep_nodes);
+ ftleaf_split(h, child, &nodea, &nodeb, &splitk, true, split_mode, 2, dep_nodes);
} else {
ft_nonleaf_split(h, child, &nodea, &nodeb, &splitk, 2, dep_nodes);
}
@@ -1040,8 +1081,8 @@ flush_this_child(
}
bring_node_fully_into_memory(child, h);
toku_assert_entire_node_in_memory(child);
- assert(node->height>0);
- assert(child->thisnodename.b!=0);
+ paranoid_invariant(node->height>0);
+ paranoid_invariant(child->thisnodename.b!=0);
// VERIFY_NODE does not work off client thread as of now
//VERIFY_NODE(t, child);
node->dirty = 1;
@@ -1062,10 +1103,10 @@ merge_leaf_nodes(FTNODE a, FTNODE b)
STATUS_VALUE(FT_FLUSHER_MERGE_LEAF)++;
toku_assert_entire_node_in_memory(a);
toku_assert_entire_node_in_memory(b);
- assert(a->height == 0);
- assert(b->height == 0);
- assert(a->n_children > 0);
- assert(b->n_children > 0);
+ paranoid_invariant(a->height == 0);
+ paranoid_invariant(b->height == 0);
+ paranoid_invariant(a->n_children > 0);
+ paranoid_invariant(b->n_children > 0);
// Mark nodes as dirty before moving basements from b to a.
// This way, whatever deltas are accumulated in the basements are
@@ -1148,7 +1189,7 @@ static void balance_leaf_nodes(
merge_leaf_nodes(a,b);
// now split them
// because we are not creating a new node, we can pass in no dependent nodes
- ftleaf_split(NULL, a, &a, &b, splitk, false, 0, NULL);
+ ftleaf_split(NULL, a, &a, &b, splitk, false, SPLIT_EVENLY, 0, NULL);
}
static void
@@ -1202,7 +1243,7 @@ maybe_merge_pinned_nonleaf_nodes(
{
toku_assert_entire_node_in_memory(a);
toku_assert_entire_node_in_memory(b);
- assert(parent_splitk->data);
+ paranoid_invariant(parent_splitk->data);
int old_n_children = a->n_children;
int new_n_children = old_n_children + b->n_children;
XREALLOC_N(new_n_children, a->bp);
@@ -1262,7 +1303,7 @@ maybe_merge_pinned_nodes(
// splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes.
{
MSN msn_max;
- assert(a->height == b->height);
+ paranoid_invariant(a->height == b->height);
toku_assert_entire_node_in_memory(parent);
toku_assert_entire_node_in_memory(a);
toku_assert_entire_node_in_memory(b);
@@ -1271,9 +1312,6 @@ maybe_merge_pinned_nodes(
MSN msna = a->max_msn_applied_to_node_on_disk;
MSN msnb = b->max_msn_applied_to_node_on_disk;
msn_max = (msna.msn > msnb.msn) ? msna : msnb;
- if (a->height > 0) {
- invariant(msn_max.msn <= parent->max_msn_applied_to_node_on_disk.msn); // parent msn must be >= children's msn
- }
}
if (a->height == 0) {
maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, did_rebalance, splitk, nodesize);
@@ -1312,7 +1350,7 @@ ft_merge_child(
{
// this function should not be called
// if the child is not mergable
- assert(node->n_children > 1);
+ paranoid_invariant(node->n_children > 1);
toku_assert_entire_node_in_memory(node);
int childnuma,childnumb;
@@ -1323,11 +1361,11 @@ ft_merge_child(
childnuma = childnum_to_merge;
childnumb = childnum_to_merge+1;
}
- assert(0 <= childnuma);
- assert(childnuma+1 == childnumb);
- assert(childnumb < node->n_children);
+ paranoid_invariant(0 <= childnuma);
+ paranoid_invariant(childnuma+1 == childnumb);
+ paranoid_invariant(childnumb < node->n_children);
- assert(node->height>0);
+ paranoid_invariant(node->height>0);
// We suspect that at least one of the children is fusible, but they might not be.
// for test
@@ -1371,22 +1409,27 @@ ft_merge_child(
maybe_merge_pinned_nodes(node, &node->childkeys[childnuma], childa, childb, &did_merge, &did_rebalance, &splitk, h->h->nodesize);
if (childa->height>0) {
for (int i=0; i+1<childa->n_children; i++) {
- assert(childa->childkeys[i].data);
+ paranoid_invariant(childa->childkeys[i].data);
}
}
//toku_verify_estimates(t,childa);
    // the tree did react if a merge (did_merge) or rebalance (new split key) occurred
*did_react = (bool)(did_merge || did_rebalance);
if (did_merge) {
- assert(!splitk.data);
+ paranoid_invariant(!splitk.data);
} else {
- assert(splitk.data);
+ paranoid_invariant(splitk.data);
}
node->totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes.
if (did_merge) {
- destroy_nonleaf_childinfo(BNC(node, childnumb));
+ NONLEAF_CHILDINFO remaining_bnc = BNC(node, childnuma);
+ NONLEAF_CHILDINFO merged_bnc = BNC(node, childnumb);
+ for (unsigned int i = 0; i < (sizeof remaining_bnc->flow) / (sizeof remaining_bnc->flow[0]); ++i) {
+ remaining_bnc->flow[i] += merged_bnc->flow[i];
+ }
+ destroy_nonleaf_childinfo(merged_bnc);
set_BNULL(node, childnumb);
node->n_children--;
memmove(&node->bp[childnumb],
@@ -1397,10 +1440,14 @@ ft_merge_child(
&node->childkeys[childnuma+1],
(node->n_children-childnumb)*sizeof(node->childkeys[0]));
REALLOC_N(node->n_children-1, node->childkeys);
- assert(BP_BLOCKNUM(node, childnuma).b == childa->thisnodename.b);
+ paranoid_invariant(BP_BLOCKNUM(node, childnuma).b == childa->thisnodename.b);
childa->dirty = 1; // just to make sure
childb->dirty = 1; // just to make sure
} else {
+ // flow will be inaccurate for a while, oh well. the children
+ // are leaves in this case so it's not a huge deal (we're
+ // pretty far down the tree)
+
// If we didn't merge the nodes, then we need the correct pivot.
toku_copyref_dbt(&node->childkeys[childnuma], splitk);
node->totalchildkeylens += node->childkeys[childnuma].size;
@@ -1421,13 +1468,13 @@ ft_merge_child(
merge_remove_key_callback,
h
);
- assert(rrb==0);
+ assert_zero(rrb);
// for test
call_flusher_thread_callback(ft_flush_aflter_merge);
// unlock the parent
- assert(node->dirty);
+ paranoid_invariant(node->dirty);
toku_unpin_ftnode_off_client_thread(h, node);
}
else {
@@ -1435,7 +1482,7 @@ ft_merge_child(
call_flusher_thread_callback(ft_flush_aflter_rebalance);
// unlock the parent
- assert(node->dirty);
+ paranoid_invariant(node->dirty);
toku_unpin_ftnode_off_client_thread(h, node);
toku_unpin_ftnode_off_client_thread(h, childb);
}
@@ -1463,7 +1510,7 @@ flush_some_child(
{
int dirtied = 0;
NONLEAF_CHILDINFO bnc = NULL;
- assert(parent->height>0);
+ paranoid_invariant(parent->height>0);
toku_assert_entire_node_in_memory(parent);
// pick the child we want to flush to
@@ -1496,7 +1543,7 @@ flush_some_child(
// the parent before finishing reading in the entire child node.
bool may_child_be_reactive = may_node_be_reactive(child);
- assert(child->thisnodename.b!=0);
+ paranoid_invariant(child->thisnodename.b!=0);
//VERIFY_NODE(brt, child);
// only do the following work if there is a flush to perform
@@ -1508,7 +1555,9 @@ flush_some_child(
// detach buffer
BP_WORKDONE(parent, childnum) = 0; // this buffer is drained, no work has been done by its contents
bnc = BNC(parent, childnum);
- set_BNC(parent, childnum, toku_create_empty_nl());
+ NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
+ memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
+ set_BNC(parent, childnum, new_bnc);
}
//
@@ -1592,19 +1641,19 @@ flush_some_child(
        // it is the responsibility of `ft_split_child` to unlock nodes of
// parent and child as it sees fit
//
- assert(parent); // just make sure we have not accidentally unpinned parent
- ft_split_child(h, parent, childnum, child, fa);
+ paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
+ ft_split_child(h, parent, childnum, child, SPLIT_EVENLY, fa);
}
else if (child_re == RE_FUSIBLE) {
//
        // it is the responsibility of `maybe_merge_child` to unlock nodes of
// parent and child as it sees fit
//
- assert(parent); // just make sure we have not accidentally unpinned parent
+ paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
fa->maybe_merge_child(fa, h, parent, childnum, child, fa->extra);
}
else {
- assert(false);
+ abort();
}
}
@@ -1657,7 +1706,7 @@ dummy_pick_heaviest_child(FT UU(h),
FTNODE UU(parent),
void* UU(extra))
{
- assert(false);
+ abort();
return -1;
}
@@ -1665,7 +1714,8 @@ void toku_ft_split_child(
FT ft,
FTNODE node,
int childnum,
- FTNODE child
+ FTNODE child,
+ enum split_mode split_mode
)
{
struct flusher_advice fa;
@@ -1684,6 +1734,34 @@ void toku_ft_split_child(
node,
childnum, // childnum to split
child,
+ split_mode,
+ &fa
+ );
+}
+
+void toku_ft_merge_child(
+ FT ft,
+ FTNODE node,
+ int childnum
+ )
+{
+ struct flusher_advice fa;
+ flusher_advice_init(
+ &fa,
+ dummy_pick_heaviest_child,
+ dont_destroy_basement_nodes,
+ never_recursively_flush,
+ default_merge_child,
+ dummy_update_status,
+ default_pick_child_after_split,
+ NULL
+ );
+ bool did_react;
+ ft_merge_child(
+ ft,
+ node,
+ childnum, // childnum to merge
+ &did_react,
&fa
);
}
@@ -1816,13 +1894,13 @@ flush_node_on_background_thread(FT h, FTNODE parent)
// and pick the child we want to flush to
//
int childnum = find_heaviest_child(parent);
- assert(toku_bnc_n_entries(BNC(parent, childnum))>0);
+ paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
//
// see if we can pin the child
//
FTNODE child;
uint32_t childfullhash = compute_child_fullhash(h->cf, parent, childnum);
- int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, &child);
+ int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
if (r != 0) {
// In this case, we could not lock the child, so just place the parent on the background thread
// In the callback, we will use flush_some_child, which checks to
@@ -1846,7 +1924,9 @@ flush_node_on_background_thread(FT h, FTNODE parent)
parent->dirty = 1;
BP_WORKDONE(parent, childnum) = 0; // this buffer is drained, no work has been done by its contents
NONLEAF_CHILDINFO bnc = BNC(parent, childnum);
- set_BNC(parent, childnum, toku_create_empty_nl());
+ NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
+ memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
+ set_BNC(parent, childnum, new_bnc);
//
// at this point, the buffer has been detached from the parent
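
A note on the two buffer-detach hunks above (flush_some_child and flush_node_on_background_thread): both now copy the drained childinfo's flow counters into the freshly created replacement, so emptying a buffer does not erase the evidence that this child has been receiving a lot of message bytes. A minimal self-contained sketch of the pattern, using a simplified stand-in for NONLEAF_CHILDINFO rather than the real type:

#include <cstdint>
#include <cstring>

// Stand-in for ftnode_nonleaf_childinfo: only the flow counters matter here.
struct childinfo_sketch {
    uint64_t flow[2];   // [0] = bytes since the last checkpoint, [1] = previous checkpoint period
    // ... message fifo omitted ...
};

// Detach a drained buffer but keep its flow history, so the promotion
// heuristic (toku_bnc_should_promote, added later in this diff) still sees
// that the child has been hot.
static childinfo_sketch *swap_in_empty_buffer(childinfo_sketch *old_bnc) {
    childinfo_sketch *new_bnc = new childinfo_sketch();   // plays the role of toku_create_empty_nl()
    std::memcpy(new_bnc->flow, old_bnc->flow, sizeof old_bnc->flow);
    return new_bnc;   // the real code then installs it with set_BNC(parent, childnum, new_bnc)
}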
diff --git a/ft/ft-flusher.h b/ft/ft-flusher.h
index 36370d65a64..7f0a71ed24a 100644
--- a/ft/ft-flusher.h
+++ b/ft/ft-flusher.h
@@ -89,6 +89,7 @@ ftleaf_split(
FTNODE *nodeb,
DBT *splitk,
bool create_new_node,
+ enum split_mode split_mode,
uint32_t num_dependent_nodes,
FTNODE* dependent_nodes
);
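
The new split_mode parameter lets callers of ftleaf_split ask for an uneven split. Later in this diff, process_maybe_reactive_child chooses SPLIT_RIGHT_HEAVY or SPLIT_LEFT_HEAVY for the edge children of a height-1 parent and SPLIT_EVENLY everywhere else. A hedged sketch of that selection in isolation (the enum below is a stand-in; the real declaration is outside this excerpt, in the fttypes.h change listed in the diffstat):

// Stand-in for the real split_mode enum.
enum split_mode_sketch { SPLIT_EVENLY, SPLIT_LEFT_HEAVY, SPLIT_RIGHT_HEAVY };

// Mirrors the choice in process_maybe_reactive_child (which additionally
// requires the parent to be at height 1): sequential inserts at either edge
// of the tree get an uneven split so the side still receiving inserts stays
// small and cheap to split again.
static split_mode_sketch choose_split_mode(bool left_extreme, bool right_extreme,
                                           int childnum, int n_children) {
    if (left_extreme && childnum == 0) {
        return SPLIT_RIGHT_HEAVY;
    }
    if (right_extreme && childnum == n_children - 1) {
        return SPLIT_LEFT_HEAVY;
    }
    return SPLIT_EVENLY;
}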
diff --git a/ft/ft-hot-flusher.cc b/ft/ft-hot-flusher.cc
index a3fa072d0e0..fbc45b9eac2 100644
--- a/ft/ft-hot-flusher.cc
+++ b/ft/ft-hot-flusher.cc
@@ -9,6 +9,7 @@
#include <ft-cachetable-wrappers.h>
#include <ft-internal.h>
#include <ft.h>
+#include <portability/toku_atomic.h>
// Member Description:
// 1. highest_pivot_key - this is the key that corresponds to the
@@ -251,7 +252,7 @@ toku_ft_hot_optimize(FT_HANDLE brt,
uint64_t loop_count = 0;
MSN msn_at_start_of_hot = ZERO_MSN; // capture msn from root at
// start of HOT operation
- (void) __sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_STARTED), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_STARTED), 1);
{
toku_ft_note_hot_begin(brt);
@@ -353,9 +354,9 @@ toku_ft_hot_optimize(FT_HANDLE brt,
}
if (success) {
- (void) __sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_COMPLETED), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_COMPLETED), 1);
} else {
- (void) __sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_ABORTED), 1);
+ (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_ABORTED), 1);
}
}
return r;
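
The status counters above now go through the toku_sync_* wrappers from portability/toku_atomic.h rather than calling the GCC builtins directly. A minimal sketch of what such a wrapper can look like, assuming it simply forwards to the builtin (the real header may add alignment checks or platform-specific variants):

#include <stdint.h>

// Hypothetical stand-in for the wrapper included from portability/toku_atomic.h.
template <typename T, typename U>
static inline T toku_sync_fetch_and_add_sketch(T *dest, U delta) {
    return __sync_fetch_and_add(dest, delta);   // atomically add delta, return the old value
}

// Usage in the spirit of the counters above:
//   (void) toku_sync_fetch_and_add_sketch(&num_started, 1);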
diff --git a/ft/ft-internal.h b/ft/ft-internal.h
index e72f2bcab51..269c1c8ba6e 100644
--- a/ft/ft-internal.h
+++ b/ft/ft-internal.h
@@ -124,6 +124,7 @@ struct ftnode_nonleaf_childinfo {
off_omt_t broadcast_list;
marked_off_omt_t fresh_message_tree;
off_omt_t stale_message_tree;
+ uint64_t flow[2]; // current and last checkpoint
};
unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc);
@@ -133,6 +134,7 @@ long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc);
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp);
void toku_bnc_empty(NONLEAF_CHILDINFO bnc);
void toku_bnc_flush_to_child(FT h, NONLEAF_CHILDINFO bnc, FTNODE child);
+bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) __attribute__((const, nonnull));
bool toku_ft_nonleaf_is_gorged(FTNODE node, uint32_t nodesize);
@@ -152,13 +154,13 @@ struct ftnode_leaf_basement_node {
STAT64INFO_S stat64_delta; // change in stat64 counters since basement was last written to disk
};
-enum __attribute__((__packed__)) pt_state { // declare this to be packed so that when used below it will only take 1 byte.
+enum pt_state { // declare this to be packed so that when used below it will only take 1 byte.
PT_INVALID = 0,
PT_ON_DISK = 1,
PT_COMPRESSED = 2,
PT_AVAIL = 3};
-enum __attribute__((__packed__)) ftnode_child_tag {
+enum ftnode_child_tag {
BCT_INVALID = 0,
BCT_NULL,
BCT_SUBBLOCK,
@@ -166,7 +168,7 @@ enum __attribute__((__packed__)) ftnode_child_tag {
BCT_NONLEAF
};
-typedef struct __attribute__((__packed__)) ftnode_child_pointer {
+typedef struct ftnode_child_pointer {
union {
struct sub_block *subblock;
struct ftnode_nonleaf_childinfo *nonleaf;
@@ -264,7 +266,13 @@ struct ftnode {
// that have a read lock on an internal node may try to touch the clock
// simultaneously
//
-#define BP_TOUCH_CLOCK(node, i) ((void) __sync_val_compare_and_swap(&(node)->bp[i].clock_count, 0, 1))
+#define BP_TOUCH_CLOCK(node, i) do { \
+ TOKU_VALGRIND_HG_DISABLE_CHECKING(&(node)->bp[i].clock_count, sizeof (node)->bp[i].clock_count); \
+ TOKU_DRD_IGNORE_VAR((node)->bp[i].clock_count); \
+ (node)->bp[i].clock_count = 1; \
+ TOKU_DRD_STOP_IGNORING_VAR((node)->bp[i].clock_count); \
+ TOKU_VALGRIND_HG_ENABLE_CHECKING(&(node)->bp[i].clock_count, sizeof (node)->bp[i].clock_count); \
+ } while (0)
#define BP_SWEEP_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
#define BP_SHOULD_EVICT(node, i) ((node)->bp[i].clock_count == 0)
// not crazy about having these two here, one is for the case where we create new
@@ -275,47 +283,54 @@ struct ftnode {
// internal node macros
static inline void set_BNULL(FTNODE node, int i) {
- assert(0<=i && i<node->n_children);
+ paranoid_invariant(0<=i);
+ paranoid_invariant(i<node->n_children);
node->bp[i].ptr.tag = BCT_NULL;
}
static inline bool is_BNULL (FTNODE node, int i) {
- assert(0<=i && i<node->n_children);
+ paranoid_invariant(0<=i);
+ paranoid_invariant(i<node->n_children);
return node->bp[i].ptr.tag == BCT_NULL;
}
static inline NONLEAF_CHILDINFO BNC(FTNODE node, int i) {
- assert(0<=i && i<node->n_children);
+ paranoid_invariant(0<=i);
+ paranoid_invariant(i<node->n_children);
FTNODE_CHILD_POINTER p = node->bp[i].ptr;
- assert(p.tag==BCT_NONLEAF);
+ paranoid_invariant(p.tag==BCT_NONLEAF);
return p.u.nonleaf;
}
static inline void set_BNC(FTNODE node, int i, NONLEAF_CHILDINFO nl) {
- assert(0<=i && i<node->n_children);
+ paranoid_invariant(0<=i);
+ paranoid_invariant(i<node->n_children);
FTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_NONLEAF;
p->u.nonleaf = nl;
}
static inline BASEMENTNODE BLB(FTNODE node, int i) {
- assert(i<node->n_children);
- assert(0<=i);
+ paranoid_invariant(0<=i);
+ paranoid_invariant(i<node->n_children);
FTNODE_CHILD_POINTER p = node->bp[i].ptr;
- assert(p.tag==BCT_LEAF);
+ paranoid_invariant(p.tag==BCT_LEAF);
return p.u.leaf;
}
static inline void set_BLB(FTNODE node, int i, BASEMENTNODE bn) {
- assert(0<=i && i<node->n_children);
+ paranoid_invariant(0<=i);
+ paranoid_invariant(i<node->n_children);
FTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_LEAF;
p->u.leaf = bn;
}
static inline SUB_BLOCK BSB(FTNODE node, int i) {
- assert(0<=i && i<node->n_children);
+ paranoid_invariant(0<=i);
+ paranoid_invariant(i<node->n_children);
FTNODE_CHILD_POINTER p = node->bp[i].ptr;
- assert(p.tag==BCT_SUBBLOCK);
+ paranoid_invariant(p.tag==BCT_SUBBLOCK);
return p.u.subblock;
}
static inline void set_BSB(FTNODE node, int i, SUB_BLOCK sb) {
- assert(0<=i && i<node->n_children);
+ paranoid_invariant(0<=i);
+ paranoid_invariant(i<node->n_children);
FTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_SUBBLOCK;
p->u.subblock = sb;
@@ -390,6 +405,9 @@ struct ft_header {
    // This is decremented from our current MIN_MSN so as not to clash
// with any existing 'normal' MSN's.
MSN highest_unused_msn_for_upgrade;
+ // Largest MSN ever injected into the tree. Used to set the MSN for
+ // messages as they get injected.
+ MSN max_msn_in_ft;
// last time that a hot optimize operation was begun
uint64_t time_of_last_optimize_begin;
@@ -605,13 +623,6 @@ void toku_ft_append_to_child_buffer(ft_compare_func compare_fun, DESCRIPTOR desc
STAT64INFO_S toku_get_and_clear_basement_stats(FTNODE leafnode);
-
-#if 1
-#define DEADBEEF ((void*)0xDEADBEEF)
-#else
-#define DEADBEEF ((void*)0xDEADBEEFDEADBEEF)
-#endif
-
//#define SLOW
#ifdef SLOW
#define VERIFY_NODE(t,n) (toku_verify_or_set_counts(n), toku_verify_estimates(t,n))
@@ -629,6 +640,7 @@ STAT64INFO_S toku_get_and_clear_basement_stats(FTNODE leafnode);
void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h);
void toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe);
extern void toku_ftnode_clone_callback(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
+extern void toku_ftnode_checkpoint_complete_callback(void *value_data);
extern void toku_ftnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *ftnode_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone);
extern int toku_ftnode_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int*dirty, void*extraargs);
extern void toku_ftnode_pe_est_callback(void* ftnode_pv, void* disk_data, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
@@ -643,7 +655,15 @@ void toku_ft_split_child(
FT h,
FTNODE node,
int childnum,
- FTNODE child
+ FTNODE child,
+ enum split_mode split_mode
+ );
+// Given pinned node, merge childnum with a neighbor and update node with
+// information about the change
+void toku_ft_merge_child(
+ FT ft,
+ FTNODE node,
+ int childnum
);
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT h) {
CACHETABLE_WRITE_CALLBACK wc;
@@ -652,6 +672,7 @@ static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT h) {
wc.pe_callback = toku_ftnode_pe_callback;
wc.cleaner_callback = toku_ftnode_cleaner_callback;
wc.clone_callback = toku_ftnode_clone_callback;
+ wc.checkpoint_complete_callback = toku_ftnode_checkpoint_complete_callback;
wc.write_extraargs = h;
return wc;
}
@@ -720,7 +741,7 @@ static inline void fill_bfe_for_subset_read(
bool disable_prefetching
)
{
- invariant(h->h->type == FT_CURRENT);
+ paranoid_invariant(h->h->type == FT_CURRENT);
bfe->type = ftnode_fetch_subset;
bfe->h = h;
bfe->search = search;
@@ -739,7 +760,7 @@ static inline void fill_bfe_for_subset_read(
// Currently used for stat64.
//
static inline void fill_bfe_for_min_read(struct ftnode_fetch_extra *bfe, FT h) {
- invariant(h->h->type == FT_CURRENT);
+ paranoid_invariant(h->h->type == FT_CURRENT);
bfe->type = ftnode_fetch_none;
bfe->h = h;
bfe->search = NULL;
@@ -752,7 +773,7 @@ static inline void fill_bfe_for_min_read(struct ftnode_fetch_extra *bfe, FT h) {
}
static inline void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe) {
- assert(bfe->type == ftnode_fetch_prefetch);
+ paranoid_invariant(bfe->type == ftnode_fetch_prefetch);
if (bfe->range_lock_left_key != NULL) {
toku_free(bfe->range_lock_left_key->data);
toku_destroy_dbt(bfe->range_lock_left_key);
@@ -771,7 +792,7 @@ static inline void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe) {
static inline void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe,
FT h,
FT_CURSOR c) {
- invariant(h->h->type == FT_CURRENT);
+ paranoid_invariant(h->h->type == FT_CURRENT);
bfe->type = ftnode_fetch_prefetch;
bfe->h = h;
bfe->search = NULL;
@@ -779,13 +800,13 @@ static inline void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe,
const DBT *left = &c->range_lock_left_key;
const DBT *right = &c->range_lock_right_key;
if (left->data) {
- MALLOC(bfe->range_lock_left_key); resource_assert(bfe->range_lock_left_key);
+ XMALLOC(bfe->range_lock_left_key);
toku_fill_dbt(bfe->range_lock_left_key, toku_xmemdup(left->data, left->size), left->size);
} else {
bfe->range_lock_left_key = NULL;
}
if (right->data) {
- MALLOC(bfe->range_lock_right_key); resource_assert(bfe->range_lock_right_key);
+ XMALLOC(bfe->range_lock_right_key);
toku_fill_dbt(bfe->range_lock_right_key, toku_xmemdup(right->data, right->size), right->size);
} else {
bfe->range_lock_right_key = NULL;
@@ -815,6 +836,9 @@ bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancesto
__attribute__((nonnull))
void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied);
+__attribute__((const,nonnull))
+size_t toku_ft_msg_memsize_in_fifo(FT_MSG cmd);
+
int
toku_ft_search_which_child(
DESCRIPTOR desc,
@@ -840,8 +864,8 @@ void toku_create_new_ftnode (FT_HANDLE t, FTNODE *result, int height, int n_chil
void toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_children,
int layout_version, unsigned int flags);
-unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k,
- DESCRIPTOR desc, ft_compare_func cmp)
+int toku_ftnode_which_child(FTNODE node, const DBT *k,
+ DESCRIPTOR desc, ft_compare_func cmp)
__attribute__((__warn_unused_result__));
/**
@@ -854,10 +878,10 @@ unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k,
* If k is equal to some pivot, then we return the next (to the right)
* childnum.
*/
-unsigned int toku_ftnode_hot_next_child(FTNODE node,
- const DBT *k,
- DESCRIPTOR desc,
- ft_compare_func cmp);
+int toku_ftnode_hot_next_child(FTNODE node,
+ const DBT *k,
+ DESCRIPTOR desc,
+ ft_compare_func cmp);
/* Stuff for testing */
// toku_testsetup_initialize() must be called before any other test_setup_xxx() functions are called.
@@ -882,7 +906,7 @@ int toku_cmd_leafval_heaviside (OMTVALUE leafentry, void *extra)
// toku_ft_root_put_cmd() accepts non-constant cmd because this is where we set the msn
void toku_ft_root_put_cmd(FT h, FT_MSG_S * cmd);
-void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **maybe_free);
+void *mempool_malloc_from_omt(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free);
// Effect: Allocate a new object of size SIZE in MP. If MP runs out of space, allocate new a new mempool space, and copy all the items
// from the OMT (which items refer to items in the old mempool) into the new mempool.
// If MAYBE_FREE is NULL then free the old mempool's space.
@@ -896,7 +920,7 @@ toku_get_node_for_verify(
int
toku_verify_ftnode (FT_HANDLE brt,
- MSN rootmsn, MSN parentmsn,
+ MSN rootmsn, MSN parentmsn, bool messages_exist_above,
FTNODE node, int height,
const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
@@ -978,6 +1002,19 @@ typedef enum {
FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_NUM_MSG_BUFFER_FETCHED_PREFETCH,
FT_NUM_MSG_BUFFER_FETCHED_WRITE,
+ FT_PRO_NUM_ROOT_SPLIT,
+ FT_PRO_NUM_ROOT_H0_INJECT,
+ FT_PRO_NUM_ROOT_H1_INJECT,
+ FT_PRO_NUM_INJECT_DEPTH_0,
+ FT_PRO_NUM_INJECT_DEPTH_1,
+ FT_PRO_NUM_INJECT_DEPTH_2,
+ FT_PRO_NUM_INJECT_DEPTH_3,
+ FT_PRO_NUM_INJECT_DEPTH_GT3,
+ FT_PRO_NUM_STOP_NONEMPTY_BUF,
+ FT_PRO_NUM_STOP_H1,
+ FT_PRO_NUM_STOP_LOCK_CHILD,
+ FT_PRO_NUM_STOP_CHILD_INMEM,
+ FT_PRO_NUM_DIDNT_WANT_PROMOTE,
FT_STATUS_NUM_ROWS
} ft_status_entry;
@@ -1015,6 +1052,7 @@ toku_ft_leaf_apply_cmd (
ft_update_func update_fun,
DESCRIPTOR desc,
FTNODE node,
+ int target_childnum,
FT_MSG cmd,
uint64_t *workdone,
STAT64INFO stats_to_update
@@ -1025,14 +1063,17 @@ toku_ft_node_put_cmd (
ft_compare_func compare_fun,
ft_update_func update_fun,
DESCRIPTOR desc,
- FTNODE node,
- FT_MSG cmd,
+ FTNODE node,
+ int target_childnum,
+ FT_MSG cmd,
bool is_fresh,
+ size_t flow_deltas[],
STAT64INFO stats_to_update
);
void toku_flusher_thread_set_callback(void (*callback_f)(int, void*), void* extra);
-int toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h);
+int toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h) __attribute__((nonnull));
+int toku_upgrade_msn_from_root_to_header(int fd, FT h) __attribute__((nonnull));
#endif
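
Among the header changes above, max_msn_in_ft is the piece that makes promotion workable: instead of stamping each message with an MSN at the root (the push_something_at_root path removed later in this diff), inject_message_in_locked_node draws the next MSN from the header with an atomic increment while holding a write lock on the node it injects into. A self-contained sketch of that allocation, with a plain struct standing in for the FT header:

#include <cstdint>

struct ft_header_sketch {
    uint64_t max_msn_in_ft;   // largest MSN ever injected into the tree
};

// Mirrors `cmd->msn.msn = toku_sync_add_and_fetch(&ft->h->max_msn_in_ft.msn, 1)`
// from inject_message_in_locked_node later in this diff. The result is unique
// and monotonically increasing, and because the caller already holds a write
// lock on the injection target, no later MSN can reach that subtree first.
static uint64_t allocate_msn_sketch(ft_header_sketch *h) {
    return __sync_add_and_fetch(&h->max_msn_in_ft, 1);
}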
diff --git a/ft/ft-ops.cc b/ft/ft-ops.cc
index a92318e6b13..db11b8ab443 100644
--- a/ft/ft-ops.cc
+++ b/ft/ft-ops.cc
@@ -129,6 +129,7 @@ basement nodes, bulk fetch, and partial fetch:
#include "xids.h"
#include <toku_race_tools.h>
+#include <portability/toku_atomic.h>
#include <util/mempool.h>
#include <util/partitioned_counter.h>
#include <util/rwlock.h>
@@ -214,6 +215,20 @@ status_init(void)
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, PARCOUNT, "buffers fetched for prefetch");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write");
+ STATUS_INIT(FT_PRO_NUM_ROOT_SPLIT, PARCOUNT, "promotion: roots split");
+ STATUS_INIT(FT_PRO_NUM_ROOT_H0_INJECT, PARCOUNT, "promotion: leaf roots injected into");
+ STATUS_INIT(FT_PRO_NUM_ROOT_H1_INJECT, PARCOUNT, "promotion: h1 roots injected into");
+ STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_0, PARCOUNT, "promotion: injections at depth 0");
+ STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_1, PARCOUNT, "promotion: injections at depth 1");
+ STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_2, PARCOUNT, "promotion: injections at depth 2");
+ STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_3, PARCOUNT, "promotion: injections at depth 3");
+ STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_GT3, PARCOUNT, "promotion: injections lower than depth 3");
+ STATUS_INIT(FT_PRO_NUM_STOP_NONEMPTY_BUF, PARCOUNT, "promotion: stopped because of a nonempty buffer");
+ STATUS_INIT(FT_PRO_NUM_STOP_H1, PARCOUNT, "promotion: stopped at height 1");
+ STATUS_INIT(FT_PRO_NUM_STOP_LOCK_CHILD, PARCOUNT, "promotion: stopped because the child was locked or not at all in memory");
+ STATUS_INIT(FT_PRO_NUM_STOP_CHILD_INMEM, PARCOUNT, "promotion: stopped because the child was not fully in memory");
+ STATUS_INIT(FT_PRO_NUM_DIDNT_WANT_PROMOTE, PARCOUNT, "promotion: stopped anyway, after locking the child");
+
ft_status.initialized = true;
}
static void status_destroy(void) {
@@ -242,8 +257,8 @@ bool is_entire_node_in_memory(FTNODE node) {
}
void
-toku_assert_entire_node_in_memory(FTNODE node) {
- assert(is_entire_node_in_memory(node));
+toku_assert_entire_node_in_memory(FTNODE UU() node) {
+ paranoid_invariant(is_entire_node_in_memory(node));
}
static uint32_t
@@ -261,7 +276,7 @@ static enum reactivity
get_leaf_reactivity (FTNODE node, uint32_t nodesize) {
enum reactivity re = RE_STABLE;
toku_assert_entire_node_in_memory(node);
- assert(node->height==0);
+ paranoid_invariant(node->height==0);
unsigned int size = toku_serialize_ftnode_size(node);
if (size > nodesize && get_leaf_num_entries(node) > 1) {
re = RE_FISSIBLE;
@@ -274,7 +289,7 @@ get_leaf_reactivity (FTNODE node, uint32_t nodesize) {
enum reactivity
get_nonleaf_reactivity (FTNODE node) {
- assert(node->height>0);
+ paranoid_invariant(node->height>0);
int n_children = node->n_children;
if (n_children > TREE_FANOUT) return RE_FISSIBLE;
if (n_children*4 < TREE_FANOUT) return RE_FUSIBLE;
@@ -310,7 +325,7 @@ toku_ft_nonleaf_is_gorged (FTNODE node, uint32_t nodesize) {
// is greater than nodesize (which as of Maxwell should be
// 4MB)
//
- assert(node->height > 0);
+ paranoid_invariant(node->height > 0);
for (int child = 0; child < node->n_children; ++child) {
size += BP_WORKDONE(node, child);
}
@@ -325,14 +340,15 @@ toku_ft_nonleaf_is_gorged (FTNODE node, uint32_t nodesize) {
(!buffers_are_empty));
}
-static void ft_verify_flags(FT ft, FTNODE node) {
- assert(ft->h->flags == node->flags);
+static void ft_verify_flags(FT UU(ft), FTNODE UU(node)) {
+ paranoid_invariant(ft->h->flags == node->flags);
}
int toku_ft_debug_mode = 0;
uint32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum) {
- assert(node->height>0 && childnum<node->n_children);
+ paranoid_invariant(node->height>0);
+ paranoid_invariant(childnum<node->n_children);
return toku_cachetable_hash(cf, BP_BLOCKNUM(node, childnum));
}
@@ -389,7 +405,7 @@ toku_bnc_memory_used(NONLEAF_CHILDINFO bnc)
static long
get_avail_internal_node_partition_size(FTNODE node, int i)
{
- assert(node->height > 0);
+ paranoid_invariant(node->height > 0);
return toku_bnc_memory_size(BNC(node, i));
}
@@ -418,7 +434,7 @@ ftnode_cachepressure_size(FTNODE node)
retval += BP_WORKDONE(node, i);
}
else {
- assert(false);
+ abort();
}
}
}
@@ -468,7 +484,7 @@ ftnode_memory_size (FTNODE node)
}
}
else {
- assert(false);
+ abort();
}
}
return retval;
@@ -505,7 +521,7 @@ PAIR_ATTR make_invalid_pair_attr(void) {
static uint64_t dict_id_serial = 1;
static DICTIONARY_ID
next_dict_id(void) {
- uint64_t i = __sync_fetch_and_add(&dict_id_serial, 1);
+ uint64_t i = toku_sync_fetch_and_add(&dict_id_serial, 1);
assert(i); // guarantee unique dictionary id by asserting 64-bit counter never wraps
DICTIONARY_ID d = {.dictid = i};
return d;
@@ -532,7 +548,7 @@ toku_bfe_wants_child_available (struct ftnode_fetch_extra* bfe, int childnum)
int
toku_bfe_leftmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node)
{
- lazy_assert(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch);
+ paranoid_invariant(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch);
if (bfe->left_is_neg_infty) {
return 0;
} else if (bfe->range_lock_left_key == NULL) {
@@ -545,7 +561,7 @@ toku_bfe_leftmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node)
int
toku_bfe_rightmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node)
{
- lazy_assert(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch);
+ paranoid_invariant(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch);
if (bfe->right_is_pos_infty) {
return node->n_children - 1;
} else if (bfe->range_lock_right_key == NULL) {
@@ -627,7 +643,7 @@ static void ftnode_update_disk_stats(
static void ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) {
for (int i = 0; i < node->n_children; i++) {
BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i);
- assert(BP_STATE(node,i) == PT_AVAIL);
+ paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
BP_STATE(cloned_node,i) = PT_AVAIL;
BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i);
if (node->height == 0) {
@@ -639,6 +655,19 @@ static void ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) {
}
}
+void toku_ftnode_checkpoint_complete_callback(void *value_data) {
+ FTNODE node = static_cast<FTNODE>(value_data);
+ if (node->height > 0) {
+ for (int i = 0; i < node->n_children; ++i) {
+ if (BP_STATE(node, i) == PT_AVAIL) {
+ NONLEAF_CHILDINFO bnc = BNC(node, i);
+ bnc->flow[1] = bnc->flow[0];
+ bnc->flow[0] = 0;
+ }
+ }
+ }
+}
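+
+The checkpoint-complete callback above is what gives the flow counters their meaning as per-checkpoint-period byte counts: when a checkpoint finishes, the current period's total rolls into the "previous" slot and the current slot restarts at zero. In isolation the rotation is simply:
+
+// flow[0]: bytes injected toward this child since the last completed checkpoint.
+// flow[1]: the previous period's total, kept around for one more cycle.
+static inline void rotate_flow_sketch(uint64_t flow[2]) {
+    flow[1] = flow[0];
+    flow[0] = 0;
+}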
+
void toku_ftnode_clone_callback(
void* value_data,
void** cloned_value_data,
@@ -647,12 +676,10 @@ void toku_ftnode_clone_callback(
void* write_extraargs
)
{
- FTNODE node = (FTNODE) value_data;
+ FTNODE node = static_cast<FTNODE>(value_data);
toku_assert_entire_node_in_memory(node);
- FT ft = (FT) write_extraargs;
- FTNODE XMALLOC(cloned_node);
- //FTNODE cloned_node = (FTNODE)toku_xmalloc(sizeof(*FTNODE));
- memset(cloned_node, 0, sizeof(*cloned_node));
+ FT ft = static_cast<FT>(write_extraargs);
+ FTNODE XCALLOC(cloned_node);
if (node->height == 0) {
// set header stats, must be done before rebalancing
ftnode_update_disk_stats(node, ft, for_checkpoint);
@@ -780,7 +807,7 @@ int toku_ftnode_fetch_callback (CACHEFILE UU(cachefile), PAIR p, int fd, BLOCKNU
fprintf(stderr, "Error deserializing node, errno = %d", r);
}
// make absolutely sure we crash before doing anything else.
- assert(false);
+ abort();
}
if (r == 0) {
@@ -799,9 +826,9 @@ void toku_ftnode_pe_est_callback(
void* UU(write_extraargs)
)
{
- assert(ftnode_pv != NULL);
+ paranoid_invariant(ftnode_pv != NULL);
long bytes_to_free = 0;
- FTNODE node = (FTNODE)ftnode_pv;
+ FTNODE node = static_cast<FTNODE>(ftnode_pv);
if (node->dirty || node->height == 0 ||
node->layout_version_read_from_disk < FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
*bytes_freed_estimate = 0;
@@ -931,7 +958,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
continue;
}
else {
- assert(false);
+ abort();
}
}
}
@@ -985,8 +1012,8 @@ bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) {
// we can possibly require is a single basement node
// we find out what basement node the query cares about
// and check if it is available
- assert(bfe->h->compare_fun);
- assert(bfe->search);
+ paranoid_invariant(bfe->h->compare_fun);
+ paranoid_invariant(bfe->search);
bfe->child_to_read = toku_ft_search_which_child(
&bfe->h->cmp_descriptor,
bfe->h->compare_fun,
@@ -1000,7 +1027,7 @@ bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) {
else if (bfe->type == ftnode_fetch_prefetch) {
// makes no sense to have prefetching disabled
// and still call this function
- assert(!bfe->disable_prefetching);
+ paranoid_invariant(!bfe->disable_prefetching);
int lc = toku_bfe_leftmost_child_wanted(bfe, node);
int rc = toku_bfe_rightmost_child_wanted(bfe, node);
for (int i = lc; i <= rc; ++i) {
@@ -1011,7 +1038,7 @@ bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) {
}
else {
// we have a bug. The type should be known
- assert(false);
+ abort();
}
return retval;
}
@@ -1118,7 +1145,7 @@ int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraar
r = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe);
}
else {
- assert(false);
+ abort();
}
}
@@ -1132,7 +1159,7 @@ int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraar
"Error while reading node partition %d\n",
get_maybe_error_errno());
}
- assert(false);
+ abort();
}
}
@@ -1197,7 +1224,7 @@ void toku_destroy_ftnode_internals(FTNODE node)
toku_free(sb->compressed_ptr);
toku_free(sb);
} else {
- assert(is_BNULL(node, i));
+ paranoid_invariant(is_BNULL(node, i));
}
set_BNULL(node, i);
}
@@ -1233,8 +1260,8 @@ void
toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_children, int layout_version, unsigned int flags)
// Effect: Fill in N as an empty ftnode.
{
- assert(layout_version != 0);
- assert(height >= 0);
+ paranoid_invariant(layout_version != 0);
+ paranoid_invariant(height >= 0);
if (height == 0)
STATUS_INC(FT_CREATE_LEAF, 1);
@@ -1330,7 +1357,8 @@ ft_init_new_root(FT ft, FTNODE oldroot, FTNODE *newrootp)
ft,
newroot,
0, // childnum to split
- oldroot
+ oldroot,
+ SPLIT_EVENLY
);
// ft_split_child released locks on newroot
@@ -1393,7 +1421,7 @@ ft_leaf_delete_leafentry (
{
int r = toku_omt_delete_at(bn->buffer, idx);
- assert(r==0);
+ assert_zero(r);
}
bn->n_bytes_in_buffer -= leafentry_disksize(le);
@@ -1428,11 +1456,13 @@ toku_ft_bn_apply_cmd_once (
// That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is
// no longer in use. We'll have to release the old mempool later.
{
- int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
+ int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, &bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
invariant(r==0);
}
- if (new_le) assert(newsize == leafentry_disksize(new_le));
+ if (new_le) {
+ paranoid_invariant(newsize == leafentry_disksize(new_le));
+ }
if (le && new_le) {
bn->n_bytes_in_buffer -= oldsize;
bn->n_bytes_in_buffer += newsize;
@@ -1468,10 +1498,7 @@ toku_ft_bn_apply_cmd_once (
}
}
if (workdone) { // test programs may call with NULL
- //uint64_t new_workdone =
- (void) __sync_add_and_fetch(workdone, workdone_this_le);
- //if (new_workdone > STATUS_VALUE(FT_MAX_WORKDONE))
- // STATUS_VALUE(FT_MAX_WORKDONE) = new_workdone;
+ *workdone += workdone_this_le;
}
// if we created a new mempool buffer, free the old one
@@ -1512,8 +1539,8 @@ struct setval_extra_s {
*/
static void setval_fun (const DBT *new_val, void *svextra_v) {
struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v);
- assert(svextra->tag==setval_tag);
- assert(!svextra->did_set_val);
+ paranoid_invariant(svextra->tag==setval_tag);
+ paranoid_invariant(!svextra->did_set_val);
svextra->did_set_val = true;
{
@@ -1531,8 +1558,8 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
msg.u.id.val = &val;
}
toku_ft_bn_apply_cmd_once(svextra->bn, &msg,
- svextra->idx, svextra->le,
- svextra->workdone, svextra->stats_to_update);
+ svextra->idx, svextra->le,
+ svextra->workdone, svextra->stats_to_update);
svextra->setval_r = 0;
}
}
@@ -1563,14 +1590,14 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
} else if (cmd->type == FT_UPDATE_BROADCAST_ALL) {
// key is not passed in with broadcast, it comes from le
// update function extra is passed in with command
- assert(le); // for broadcast updates, we just hit all leafentries
+ paranoid_invariant(le); // for broadcast updates, we just hit all leafentries
// so this cannot be null
- assert(cmd->u.id.key->size == 0);
+ paranoid_invariant(cmd->u.id.key->size == 0);
STATUS_INC(FT_UPDATES_BROADCAST, 1);
keyp = toku_fill_dbt(&key, le_key(le), le_keylen(le));
update_function_extra = cmd->u.id.val;
} else {
- assert(false);
+ abort();
}
if (le && !le_latest_is_del(le)) {
@@ -1647,7 +1674,7 @@ toku_ft_bn_apply_cmd (
if (r==DB_NOTFOUND) {
storeddata = 0;
} else {
- assert(r==0);
+ assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav);
}
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update);
@@ -1676,7 +1703,7 @@ toku_ft_bn_apply_cmd (
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx);
if (r == DB_NOTFOUND) break;
- assert(r==0);
+ assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav);
while (1) {
@@ -1692,11 +1719,11 @@ toku_ft_bn_apply_cmd (
//the omt than we started with and the next leafentry will be at the
//same index as the deleted one. Otherwise, the next leafentry will
//be at the next index (+1).
- assert(num_leafentries_before == num_leafentries_after ||
- num_leafentries_before-1 == num_leafentries_after);
+ paranoid_invariant(num_leafentries_before == num_leafentries_after ||
+ num_leafentries_before-1 == num_leafentries_after);
if (num_leafentries_after==num_leafentries_before) idx++; //Not deleted, advance index.
- assert(idx <= num_leafentries_after);
+ paranoid_invariant(idx <= num_leafentries_after);
if (idx == num_leafentries_after) break; //Reached the end of the leaf
r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r);
@@ -1731,7 +1758,7 @@ toku_ft_bn_apply_cmd (
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update);
uint32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) {
- assert(new_omt_size+1 == omt_size);
+ paranoid_invariant(new_omt_size+1 == omt_size);
//Item was deleted.
deleted = 1;
}
@@ -1741,7 +1768,7 @@ toku_ft_bn_apply_cmd (
else
idx++;
}
- assert(toku_omt_size(bn->buffer) == omt_size);
+ paranoid_invariant(toku_omt_size(bn->buffer) == omt_size);
break;
case FT_COMMIT_BROADCAST_TXN:
@@ -1757,7 +1784,7 @@ toku_ft_bn_apply_cmd (
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update);
uint32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) {
- assert(new_omt_size+1 == omt_size);
+ paranoid_invariant(new_omt_size+1 == omt_size);
//Item was deleted.
deleted = 1;
}
@@ -1767,7 +1794,7 @@ toku_ft_bn_apply_cmd (
else
idx++;
}
- assert(toku_omt_size(bn->buffer) == omt_size);
+ paranoid_invariant(toku_omt_size(bn->buffer) == omt_size);
break;
case FT_UPDATE: {
@@ -1788,12 +1815,10 @@ toku_ft_bn_apply_cmd (
uint32_t num_leafentries_before;
while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) {
r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
- assert(r==0);
+ assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, workdone, stats_to_update);
- // TODO(leif): This early return means get_leaf_reactivity()
- // and VERIFY_NODE() never get called. Is this a problem?
- assert(r==0);
+ assert_zero(r);
if (num_leafentries_before == toku_omt_size(bn->buffer)) {
// we didn't delete something, so increment the index.
@@ -1880,24 +1905,30 @@ void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen,
// append a cmd to a nonleaf node's child buffer
// should be static, but used by test programs
void toku_ft_append_to_child_buffer(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
- assert(BP_STATE(node,childnum) == PT_AVAIL);
+ paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL);
toku_bnc_insert_msg(BNC(node, childnum), key->data, key->size, val->data, val->size, type, msn, xids, is_fresh, desc, compare_fun);
node->dirty = 1;
}
-static void ft_nonleaf_cmd_once_to_child (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, unsigned int childnum, FT_MSG cmd, bool is_fresh)
+static void ft_nonleaf_cmd_once_to_child(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int target_childnum, FT_MSG cmd, bool is_fresh, size_t flow_deltas[])
// Previously we had passive-aggressive promotion, but that causes a lot of I/O at the checkpoint. So now we are just putting it in the buffer here.
// Also we don't worry about the node getting overfull here. It's the caller's problem.
{
+ unsigned int childnum = (target_childnum >= 0
+ ? target_childnum
+ : toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun));
toku_ft_append_to_child_buffer(compare_fun, desc, node, childnum, cmd->type, cmd->msn, cmd->xids, is_fresh, cmd->u.id.key, cmd->u.id.val);
+ NONLEAF_CHILDINFO bnc = BNC(node, childnum);
+ bnc->flow[0] += flow_deltas[0];
+ bnc->flow[1] += flow_deltas[1];
}
/* Find the leftmost child that may contain the key.
* If the key exists it will be in the child whose number
* is the return value of this function.
*/
-unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k,
- DESCRIPTOR desc, ft_compare_func cmp) {
+int toku_ftnode_which_child(FTNODE node, const DBT *k,
+ DESCRIPTOR desc, ft_compare_func cmp) {
#define DO_PIVOT_SEARCH_LR 0
#if DO_PIVOT_SEARCH_LR
int i;
@@ -1953,11 +1984,11 @@ unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k,
}
// Used for HOT.
-unsigned int
+int
toku_ftnode_hot_next_child(FTNODE node,
- const DBT *k,
- DESCRIPTOR desc,
- ft_compare_func cmp) {
+ const DBT *k,
+ DESCRIPTOR desc,
+ ft_compare_func cmp) {
int low = 0;
int hi = node->n_children - 1;
int mi;
@@ -1990,27 +2021,14 @@ ft_msg_size(FT_MSG msg) {
}
-static void ft_nonleaf_cmd_once(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh)
-// Effect: Insert a message into a nonleaf. We may put it into a child, possibly causing the child to become reactive.
-// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
-// The re_array[i] gets set to reactivity of any modified child.
-{
- /* find the right subtree */
- //TODO: accesses key, val directly
- unsigned int childnum = toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun);
-
- ft_nonleaf_cmd_once_to_child (compare_fun, desc, node, childnum, cmd, is_fresh);
-}
-
static void
-ft_nonleaf_cmd_all (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh)
+ft_nonleaf_cmd_all (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh, size_t flow_deltas[])
// Effect: Put the cmd into a nonleaf node. We put it into all children, possibly causing the children to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.)
{
- int i;
- for (i = 0; i < node->n_children; i++) {
- ft_nonleaf_cmd_once_to_child(compare_fun, desc, node, i, cmd, is_fresh);
+ for (int i = 0; i < node->n_children; i++) {
+ ft_nonleaf_cmd_once_to_child(compare_fun, desc, node, i, cmd, is_fresh, flow_deltas);
}
}
@@ -2033,7 +2051,7 @@ ft_msg_does_nothing(FT_MSG cmd)
}
static void
-ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh)
+ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int target_childnum, FT_MSG cmd, bool is_fresh, size_t flow_deltas[])
// Effect: Put the cmd into a nonleaf node. We may put it into a child, possibly causing the child to become reactive.
// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.)
@@ -2050,52 +2068,15 @@ ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, F
invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
node->max_msn_applied_to_node_on_disk = cmd_msn;
- //TODO: Accessing type directly
- switch (cmd->type) {
- case FT_INSERT_NO_OVERWRITE:
- case FT_INSERT:
- case FT_DELETE_ANY:
- case FT_ABORT_ANY:
- case FT_COMMIT_ANY:
- case FT_UPDATE:
- ft_nonleaf_cmd_once(compare_fun, desc, node, cmd, is_fresh);
- return;
- case FT_COMMIT_BROADCAST_ALL:
- case FT_COMMIT_BROADCAST_TXN:
- case FT_ABORT_BROADCAST_TXN:
- case FT_OPTIMIZE:
- case FT_OPTIMIZE_FOR_UPGRADE:
- case FT_UPDATE_BROADCAST_ALL:
- ft_nonleaf_cmd_all (compare_fun, desc, node, cmd, is_fresh); // send message to all children
- return;
- case FT_NONE:
- return;
- }
- abort(); // cannot happen
-}
-
-
-// return true if root changed, false otherwise
-static void
-ft_process_maybe_reactive_root (FT ft, FTNODE *nodep) {
- FTNODE node = *nodep;
- toku_assert_entire_node_in_memory(node);
- enum reactivity re = get_node_reactivity(node, ft->h->nodesize);
- switch (re) {
- case RE_STABLE:
- return;
- case RE_FISSIBLE:
- {
- ft_init_new_root(ft, node, nodep);
- return;
- }
- case RE_FUSIBLE:
- return; // Cannot merge anything at the root, so return happy.
+ if (ft_msg_applies_once(cmd)) {
+ ft_nonleaf_cmd_once_to_child(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas);
+ } else if (ft_msg_applies_all(cmd)) {
+ ft_nonleaf_cmd_all(compare_fun, desc, node, cmd, is_fresh, flow_deltas);
+ } else {
+ paranoid_invariant(ft_msg_applies_none(cmd));
}
- abort(); // cannot happen
}
-
// Garbage collect one leaf entry.
static void
ft_basement_node_gc_once(BASEMENTNODE bn,
@@ -2106,7 +2087,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
const xid_omt_t &live_root_txns,
STAT64INFO_S * delta)
{
- assert(leaf_entry);
+ paranoid_invariant(leaf_entry);
// There is no point in running GC if there is only one committed
// leaf entry.
@@ -2132,7 +2113,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
garbage_collect_leafentry(leaf_entry,
&new_leaf_entry,
&newsize,
- bn->buffer,
+ &bn->buffer,
&bn->buffer_mempool,
&maybe_free,
snapshot_xids,
@@ -2191,7 +2172,7 @@ basement_node_gc_all_les(BASEMENTNODE bn,
OMTVALUE storedatav = NULL;
LEAFENTRY leaf_entry;
r = toku_omt_fetch(bn->buffer, index, &storedatav);
- assert(r == 0);
+ assert_zero(r);
CAST_FROM_VOIDP(leaf_entry, storedatav);
ft_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, referenced_xids, live_root_txns, delta);
// Check if the leaf entry was deleted or not.
@@ -2210,7 +2191,7 @@ ft_leaf_gc_all_les(FTNODE node,
const xid_omt_t &live_root_txns)
{
toku_assert_entire_node_in_memory(node);
- assert(node->height == 0);
+ paranoid_invariant_zero(node->height);
// Loop through each leaf entry, garbage collecting as we go.
for (int i = 0; i < node->n_children; ++i) {
// Perform the garbage collection.
@@ -2229,25 +2210,40 @@ void toku_bnc_flush_to_child(
FTNODE child
)
{
- assert(bnc);
- assert(toku_fifo_n_entries(bnc->buffer)>0);
+ paranoid_invariant(bnc);
+ paranoid_invariant(toku_fifo_n_entries(bnc->buffer)>0);
STAT64INFO_S stats_delta = {0,0};
+ size_t remaining_memsize = toku_fifo_buffer_size_in_use(bnc->buffer);
FIFO_ITERATE(
bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
({
DBT hk,hv;
FT_MSG_S ftcmd = { type, msn, xids, .u = { .id = { toku_fill_dbt(&hk, key, keylen),
toku_fill_dbt(&hv, val, vallen) } } };
+ size_t flow_deltas[] = { 0, 0 };
+ if (remaining_memsize <= bnc->flow[0]) {
+ // this message is in the current checkpoint's worth of
+ // the end of the fifo
+ flow_deltas[0] = FIFO_CURRENT_ENTRY_MEMSIZE;
+ } else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) {
+ // this message is in the last checkpoint's worth of the
+ // end of the fifo
+ flow_deltas[1] = FIFO_CURRENT_ENTRY_MEMSIZE;
+ }
toku_ft_node_put_cmd(
h->compare_fun,
h->update_fun,
&h->cmp_descriptor,
child,
+ -1,
&ftcmd,
is_fresh,
+ flow_deltas,
&stats_delta
);
+ remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE;
}));
+ invariant(remaining_memsize == 0);
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&h->in_memory_stats, stats_delta);
}
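
When a buffer is flushed into its child, the loop above attributes each message's memory to the checkpoint window it arrived in: walking the fifo, the last flow[0] bytes belong to the current period and the flow[1] bytes before them to the previous one, and those amounts ride along as flow_deltas when the message is re-injected into the child. A standalone sketch of the same classification (the real code resets flow_deltas per message and hands them to toku_ft_node_put_cmd; this version just totals the bytes per window):

#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for one buffered message: only its memory footprint matters here.
struct msg_sketch { size_t memsize; };

static void classify_flow_sketch(const std::vector<msg_sketch> &fifo,
                                 uint64_t flow0, uint64_t flow1,
                                 size_t window_bytes[2] /* out, caller zero-initializes */) {
    size_t remaining = 0;
    for (const msg_sketch &m : fifo) remaining += m.memsize;
    for (const msg_sketch &m : fifo) {
        if (remaining <= flow0) {
            window_bytes[0] += m.memsize;      // current checkpoint's worth (end of the fifo)
        } else if (remaining <= flow0 + flow1) {
            window_bytes[1] += m.memsize;      // previous checkpoint's worth
        }                                      // anything older is not counted
        remaining -= m.memsize;
    }
}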
@@ -2277,6 +2273,12 @@ void toku_bnc_flush_to_child(
}
}
+bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) {
+ static const double factor = 0.125;
+ const uint64_t flow_threshold = ft->h->nodesize * factor;
+ return bnc->flow[0] >= flow_threshold || bnc->flow[1] >= flow_threshold;
+}
+
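+
+For scale, and assuming the 4MB default nodesize mentioned in the gorged-node comment earlier in this diff: flow_threshold = 0.125 * 4 MiB = 512 KiB, so a child counts as hot, and promotion keeps descending toward it, once at least 512 KiB of message bytes have flowed its way in either the current or the previous checkpoint period.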
void bring_node_fully_into_memory(FTNODE node, FT h)
{
if (!is_entire_node_in_memory(node)) {
@@ -2299,8 +2301,10 @@ toku_ft_node_put_cmd (
ft_update_func update_fun,
DESCRIPTOR desc,
FTNODE node,
+ int target_childnum,
FT_MSG cmd,
bool is_fresh,
+ size_t flow_deltas[],
STAT64INFO stats_to_update
)
// Effect: Push CMD into the subtree rooted at NODE.
@@ -2317,18 +2321,9 @@ toku_ft_node_put_cmd (
// and instead defer to these functions
//
if (node->height==0) {
- uint64_t workdone = 0;
- toku_ft_leaf_apply_cmd(
- compare_fun,
- update_fun,
- desc,
- node,
- cmd,
- &workdone,
- stats_to_update
- );
+ toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, nullptr, stats_to_update);
} else {
- ft_nonleaf_put_cmd(compare_fun, desc, node, cmd, is_fresh);
+ ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas);
}
}
@@ -2344,6 +2339,7 @@ void toku_ft_leaf_apply_cmd(
ft_update_func update_fun,
DESCRIPTOR desc,
FTNODE node,
+ int target_childnum, // which child to inject to, or -1 if unknown
FT_MSG cmd,
uint64_t *workdone,
STAT64INFO stats_to_update
@@ -2378,16 +2374,19 @@ void toku_ft_leaf_apply_cmd(
}
if (ft_msg_applies_once(cmd)) {
- unsigned int childnum = toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun);
- if (cmd->msn.msn > BLB(node, childnum)->max_msn_applied.msn) {
- BLB(node, childnum)->max_msn_applied = cmd->msn;
+ unsigned int childnum = (target_childnum >= 0
+ ? target_childnum
+ : toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun));
+ BASEMENTNODE bn = BLB(node, childnum);
+ if (cmd->msn.msn > bn->max_msn_applied.msn) {
+ bn->max_msn_applied = cmd->msn;
toku_ft_bn_apply_cmd(compare_fun,
- update_fun,
- desc,
- BLB(node, childnum),
- cmd,
- workdone,
- stats_to_update);
+ update_fun,
+ desc,
+ bn,
+ cmd,
+ workdone,
+ stats_to_update);
} else {
STATUS_INC(FT_MSN_DISCARDS, 1);
}
@@ -2409,38 +2408,44 @@ void toku_ft_leaf_apply_cmd(
}
}
else if (!ft_msg_does_nothing(cmd)) {
- assert(false);
+ abort();
}
VERIFY_NODE(t, node);
}
-static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd)
-// Effect: Put CMD into brt's root node, and update
-// the value of root's max_msn_applied_to_node_on_disk
-{
- FTNODE node = *nodep;
+static void inject_message_in_locked_node(FT ft, FTNODE node, int childnum, FT_MSG_S *cmd, size_t flow_deltas[]) {
+ // No guarantee that we're the writer, but oh well.
+ // TODO(leif): Implement "do I have the lock or is it someone else?"
+ // check in frwlock. Should be possible with TOKU_PTHREAD_DEBUG, nop
+ // otherwise.
+ invariant(toku_ctpair_is_write_locked(node->ct_pair));
toku_assert_entire_node_in_memory(node);
- MSN cmd_msn = cmd->msn;
- invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
+ // Get the MSN from the header. Now that we have a write lock on the
+ // node we're injecting into, we know no other thread will get an MSN
+ // after us and get that message into our subtree before us.
+ cmd->msn.msn = toku_sync_add_and_fetch(&ft->h->max_msn_in_ft.msn, 1);
+ paranoid_invariant(cmd->msn.msn > node->max_msn_applied_to_node_on_disk.msn);
STAT64INFO_S stats_delta = {0,0};
toku_ft_node_put_cmd(
- h->compare_fun,
- h->update_fun,
- &h->cmp_descriptor,
+ ft->compare_fun,
+ ft->update_fun,
+ &ft->cmp_descriptor,
node,
+ childnum,
cmd,
true,
+ flow_deltas,
&stats_delta
);
if (stats_delta.numbytes || stats_delta.numrows) {
- toku_ft_update_stats(&h->in_memory_stats, stats_delta);
+ toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
}
//
// assumption is that toku_ft_node_put_cmd will
// mark the node as dirty.
// enforcing invariant here.
//
- invariant(node->dirty != 0);
+ paranoid_invariant(node->dirty != 0);
// update some status variables
if (node->height != 0) {
@@ -2455,96 +2460,456 @@ static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd)
STATUS_INC(FT_MSG_NUM_BROADCAST, 1);
}
}
+
+ // verify that msn of latest message was captured in root node
+ paranoid_invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn);
+
+ // if we call flush_some_child, then that function unpins the root
+ // otherwise, we unpin ourselves
+ if (node->height > 0 && toku_ft_nonleaf_is_gorged(node, ft->h->nodesize)) {
+ flush_node_on_background_thread(ft, node);
+ }
+ else {
+ toku_unpin_ftnode(ft, node);
+ }
}
-void toku_ft_root_put_cmd(FT ft, FT_MSG_S * cmd)
+// seqinsert_loc is a bitmask.
+// The root counts as being both on the "left extreme" and on the "right extreme".
+// Therefore, at the root, you're at LEFT_EXTREME | RIGHT_EXTREME.
+typedef char seqinsert_loc;
+static const seqinsert_loc NEITHER_EXTREME = 0;
+static const seqinsert_loc LEFT_EXTREME = 1;
+static const seqinsert_loc RIGHT_EXTREME = 2;
+
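+
+Because seqinsert_loc is a bitmask, a node can be on both extremes at once; as the comment above says, the root is LEFT_EXTREME | RIGHT_EXTREME. One plausible way the flags would be narrowed while descending (a hypothetical helper for illustration only; the real propagation happens further down push_something_in_subtree than this excerpt shows):
+
+typedef char seqinsert_loc_sketch;
+static const seqinsert_loc_sketch NEITHER_EXTREME_S = 0;
+static const seqinsert_loc_sketch LEFT_EXTREME_S = 1;
+static const seqinsert_loc_sketch RIGHT_EXTREME_S = 2;
+
+// A child keeps the LEFT flag only if it is the leftmost child of a
+// left-extreme parent, and the RIGHT flag only if it is the rightmost child
+// of a right-extreme parent; interior children become NEITHER_EXTREME_S.
+static seqinsert_loc_sketch child_loc_sketch(seqinsert_loc_sketch parent_loc,
+                                             int childnum, int n_children) {
+    seqinsert_loc_sketch loc = NEITHER_EXTREME_S;
+    if ((parent_loc & LEFT_EXTREME_S) && childnum == 0) {
+        loc |= LEFT_EXTREME_S;
+    }
+    if ((parent_loc & RIGHT_EXTREME_S) && childnum == n_children - 1) {
+        loc |= RIGHT_EXTREME_S;
+    }
+    return loc;
+}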
+static bool process_maybe_reactive_child(FT ft, FTNODE parent, FTNODE child, int childnum, seqinsert_loc loc)
// Effect:
-// - assign msn to cmd
-// - push the cmd into the brt
-// - cmd will set new msn in tree
+// If child needs to be split or merged, do that.
+// parent and child will be unlocked if this happens
+// also, the batched pin will have ended if this happens
+// Requires: parent and child are read locked
+// Returns:
+// true if relocking is needed
+// false otherwise
{
- // blackhole fractal trees drop all messages, so do nothing.
- if (ft->blackhole) {
- return;
+ enum reactivity re = get_node_reactivity(child, ft->h->nodesize);
+ enum reactivity newre;
+ BLOCKNUM child_blocknum;
+ uint32_t child_fullhash;
+ switch (re) {
+ case RE_STABLE:
+ return false;
+ case RE_FISSIBLE:
+ {
+ // We only have a read lock on the parent. We need to drop both locks, and get write locks.
+ BLOCKNUM parent_blocknum = parent->thisnodename;
+ uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum);
+ int parent_height = parent->height;
+ int parent_n_children = parent->n_children;
+ toku_unpin_ftnode_read_only(ft, child);
+ toku_unpin_ftnode_read_only(ft, parent);
+ struct ftnode_fetch_extra bfe;
+ fill_bfe_for_full_read(&bfe, ft);
+ FTNODE newparent, newchild;
+ toku_pin_ftnode_off_client_thread_batched(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, 0, nullptr, &newparent);
+ if (newparent->height != parent_height || newparent->n_children != parent_n_children ||
+ childnum >= newparent->n_children || toku_bnc_n_entries(BNC(newparent, childnum))) {
+ // If the height changed or childnum is now off the end, something clearly got split or merged out from under us.
+ // If something got injected in this node, then it got split or merged and we shouldn't be splitting it.
+ // But we already unpinned the child so we need to have the caller re-try the pins.
+ toku_unpin_ftnode_read_only(ft, newparent);
+ return true;
+ }
+ // It's ok to reuse the same childnum because if we get something
+ // else we need to split, well, that's crazy, but let's go ahead
+ // and split it.
+ child_blocknum = BP_BLOCKNUM(newparent, childnum);
+ child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum);
+ toku_pin_ftnode_off_client_thread_batched(ft, child_blocknum, child_fullhash, &bfe, PL_WRITE_CHEAP, 1, &newparent, &newchild);
+ newre = get_node_reactivity(newchild, ft->h->nodesize);
+ if (newre == RE_FISSIBLE) {
+ enum split_mode split_mode;
+ if (newparent->height == 1 && (loc & LEFT_EXTREME) && childnum == 0) {
+ split_mode = SPLIT_RIGHT_HEAVY;
+ } else if (newparent->height == 1 && (loc & RIGHT_EXTREME) && childnum == newparent->n_children - 1) {
+ split_mode = SPLIT_LEFT_HEAVY;
+ } else {
+ split_mode = SPLIT_EVENLY;
+ }
+ toku_ft_split_child(ft, newparent, childnum, newchild, split_mode);
+ } else {
+ // some other thread already got it, just unpin and tell the
+ // caller to retry
+ toku_unpin_ftnode_read_only(ft, newchild);
+ toku_unpin_ftnode_read_only(ft, newparent);
+ }
+ return true;
+ }
+ case RE_FUSIBLE:
+ {
+ if (parent->height == 1) {
+ // prevent re-merging of recently unevenly-split nodes
+ if (((loc & LEFT_EXTREME) && childnum <= 1) ||
+ ((loc & RIGHT_EXTREME) && childnum >= parent->n_children - 2)) {
+ return false;
+ }
+ }
+
+ int parent_height = parent->height;
+ BLOCKNUM parent_blocknum = parent->thisnodename;
+ uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum);
+ toku_unpin_ftnode_read_only(ft, child);
+ toku_unpin_ftnode_read_only(ft, parent);
+ struct ftnode_fetch_extra bfe;
+ fill_bfe_for_full_read(&bfe, ft);
+ FTNODE newparent, newchild;
+ toku_pin_ftnode_off_client_thread_batched(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, 0, nullptr, &newparent);
+ if (newparent->height != parent_height || childnum >= newparent->n_children) {
+ // looks like this is the root and it got merged, let's just start over (like in the split case above)
+ toku_unpin_ftnode_read_only(ft, newparent);
+ return true;
+ }
+ child_blocknum = BP_BLOCKNUM(newparent, childnum);
+ child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum);
+ toku_pin_ftnode_off_client_thread_batched(ft, child_blocknum, child_fullhash, &bfe, PL_READ, 1, &newparent, &newchild);
+ newre = get_node_reactivity(newchild, ft->h->nodesize);
+ if (newre == RE_FUSIBLE) {
+ if (newparent->n_children < 2) {
+ // weird case where newparent is the root node and
+ // can't merge, so it might only have one child. In
+ // this case, just return false so we can insert
+ // anyway.
+ return false;
+ }
+ toku_unpin_ftnode_read_only(ft, newchild);
+ toku_ft_merge_child(ft, newparent, childnum);
+ } else {
+ // some other thread already got it, just unpin and tell the
+ // caller to retry
+ toku_unpin_ftnode_read_only(ft, newchild);
+ toku_unpin_ftnode_read_only(ft, newparent);
+ }
+ return true;
+ }
}
+ abort();
+}
+static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, FT_MSG_S *cmd, size_t flow_deltas[])
+// Effect:
+// Inject cmd into the node at this blocknum (cachekey).
+// Gets a write lock on the node for you.
+{
FTNODE node;
- CACHEKEY root_key;
- //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
- assert(ft);
- //
- // As of Dr. Noga, the following code is currently protected by two locks:
- // - the ydb lock
- // - header's tree lock
- //
- // We hold the header's tree lock to stop other threads from
- // descending down the tree while the root node may change.
- // The root node may change when ft_process_maybe_reactive_root is called.
- // Other threads (such as the cleaner thread or hot optimize) that want to
- // descend down the tree must grab the header's tree lock, so they are
- // ensured that what they think is the root's blocknum is actually
- // the root's blocknum.
- //
- // We also hold the ydb lock for a number of reasons, but an important
- // one is to make sure that a begin_checkpoint may not start while
- // this code is executing. A begin_checkpoint does (at least) two things
- // that can interfere with the operations here:
- // - copies the header to a checkpoint header. Because we may change
- // the root blocknum below, we don't want the header to be copied in
- // the middle of these operations.
- // - Takes note of the log's LSN. Because this put operation has
- // already been logged, this message injection must be included
- // in any checkpoint that contains this put's logentry.
- // Holding the ydb lock throughout this function ensures that fact.
- // As of Dr. Noga, I (Zardosht) THINK these are the only reasons why
- // the ydb lock must be held for this function, but there may be
- // others
- //
- {
- uint32_t fullhash;
- toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
+ struct ftnode_fetch_extra bfe;
+ fill_bfe_for_full_read(&bfe, ft);
+ toku_pin_ftnode_off_client_thread_batched(ft, cachekey, fullhash, &bfe, PL_WRITE_CHEAP, 0, NULL, &node);
+ toku_assert_entire_node_in_memory(node);
+ paranoid_invariant(node->fullhash==fullhash);
+ ft_verify_flags(ft, node);
+ inject_message_in_locked_node(ft, node, -1, cmd, flow_deltas);
+}
- // get the root node
- struct ftnode_fetch_extra bfe;
- fill_bfe_for_full_read(&bfe, ft);
- toku_pin_ftnode_off_client_thread(
- ft,
- root_key,
- fullhash,
- &bfe,
- PL_WRITE_EXPENSIVE, // may_modify_node
- 0,
- NULL,
- &node
- );
- toku_assert_entire_node_in_memory(node);
+__attribute__((const))
+static inline bool should_inject_in_node(seqinsert_loc loc, int height, int depth)
+// We should inject directly in a node if:
+// - it's a leaf, or
+// - it's a height 1 node not at either extreme, or
+// - it's a depth 2 node not at either extreme
+{
+ return (height == 0 || (loc == NEITHER_EXTREME && (height <= 1 || depth >= 2)));
+}
- //VERIFY_NODE(brt, node);
- assert(node->fullhash==fullhash);
- ft_verify_flags(ft, node);
+static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_childnum, FT_MSG_S *cmd, size_t flow_deltas[], int depth, seqinsert_loc loc, bool just_did_split_or_merge)
+// Effects:
+// Assign cmd an MSN from ft->h.
+//   Put cmd in the subtree rooted at subtree_root. Due to promotion the message may not be injected directly in this node.
+//   Unlock subtree_root or schedule it to be unlocked (after a background flush).
+//   Either way, the caller is not responsible for unlocking subtree_root.
+// Requires:
+// subtree_root is read locked and fully in memory.
+// Notes:
+// In Ming, the basic rules of promotion are as follows:
+// Don't promote broadcast messages.
+// Don't promote past non-empty buffers.
+// Otherwise, promote at most to height 1 or depth 2 (whichever is highest), as far as the birdie asks you to promote.
+// We don't promote to leaves because injecting into leaves is expensive, mostly because of #5605 and some of #5552.
+// We don't promote past depth 2 because we found that gives us enough parallelism without costing us too much pinning work.
+//
+// This is true with the following caveats:
+// We always promote all the way to the leaves on the rightmost and leftmost edges of the tree, for sequential insertions.
+// (That means we can promote past depth 2 near the edges of the tree.)
+//
+// When the birdie is still saying we should promote, we use get_and_pin so that we wait to get the node.
+// If the birdie doesn't say to promote, we try maybe_get_and_pin. If we get the node cheaply, and it's dirty, we promote anyway.
+{
+ toku_assert_entire_node_in_memory(subtree_root);
+ if (should_inject_in_node(loc, subtree_root->height, depth)) {
+ switch (depth) {
+ case 0:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break;
+ case 1:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break;
+ case 2:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break;
+ case 3:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break;
+ default:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
+ }
+ inject_message_in_locked_node(ft, subtree_root, target_childnum, cmd, flow_deltas);
+ } else {
+ int r;
+ int childnum;
+ NONLEAF_CHILDINFO bnc;
+
+ // toku_ft_root_put_cmd should not have called us otherwise.
+ paranoid_invariant(ft_msg_applies_once(cmd));
+
+ childnum = (target_childnum >= 0 ? target_childnum
+ : toku_ftnode_which_child(subtree_root, cmd->u.id.key, &ft->cmp_descriptor, ft->compare_fun));
+ bnc = BNC(subtree_root, childnum);
+
+ if (toku_bnc_n_entries(bnc) > 0) {
+ // The buffer is non-empty, give up on promoting.
+ STATUS_INC(FT_PRO_NUM_STOP_NONEMPTY_BUF, 1);
+ goto relock_and_push_here;
+ }
+
+ seqinsert_loc next_loc;
+ if ((loc & LEFT_EXTREME) && childnum == 0) {
+ next_loc = LEFT_EXTREME;
+ } else if ((loc & RIGHT_EXTREME) && childnum == subtree_root->n_children - 1) {
+ next_loc = RIGHT_EXTREME;
+ } else {
+ next_loc = NEITHER_EXTREME;
+ }
+
+ if (next_loc == NEITHER_EXTREME && subtree_root->height <= 1) {
+ // Never promote to leaf nodes except on the edges
+ STATUS_INC(FT_PRO_NUM_STOP_H1, 1);
+ goto relock_and_push_here;
+ }
- // first handle a reactive root, then put in the message
- // ft_process_maybe_reactive_root may release and re-pin the root
- // so we cannot set the msn yet.
- ft_process_maybe_reactive_root(ft, &node);
+ {
+ const BLOCKNUM child_blocknum = BP_BLOCKNUM(subtree_root, childnum);
+ toku_verify_blocknum_allocated(ft->blocktable, child_blocknum);
+ const uint32_t child_fullhash = toku_cachetable_hash(ft->cf, child_blocknum);
- cmd->msn.msn = node->max_msn_applied_to_node_on_disk.msn + 1;
- // Note, the lower level function that filters messages based on
- // msn, (toku_ft_bn_apply_cmd() or ft_nonleaf_put_cmd()) will capture
- // the msn and store it in the relevant node, including the root
- // node. This is how the new msn is set in the root.
+ FTNODE child;
+ {
+ const int child_height = subtree_root->height - 1;
+ const int child_depth = depth + 1;
+ // If we're locking a leaf, or a height 1 node or depth 2
+ // node in the middle, we know we won't promote further
+ // than that, so just get a write lock now.
+ const pair_lock_type lock_type = (should_inject_in_node(next_loc, child_height, child_depth)
+ ? PL_WRITE_CHEAP
+ : PL_READ);
+ if (next_loc != NEITHER_EXTREME || (toku_bnc_should_promote(ft, bnc) && depth <= 1)) {
+ // If we're on either extreme, or the birdie wants to
+ // promote and we're in the top two levels of the
+ // tree, don't stop just because someone else has the
+ // node locked.
+ struct ftnode_fetch_extra bfe;
+ fill_bfe_for_full_read(&bfe, ft);
+ toku_pin_ftnode_off_client_thread_batched(ft, child_blocknum, child_fullhash, &bfe, lock_type, 0, nullptr, &child);
+ } else {
+ r = toku_maybe_pin_ftnode_clean(ft, child_blocknum, child_fullhash, lock_type, &child);
+ if (r != 0) {
+ // We couldn't get the child cheaply, so give up on promoting.
+ STATUS_INC(FT_PRO_NUM_STOP_LOCK_CHILD, 1);
+ goto relock_and_push_here;
+ }
+ if (is_entire_node_in_memory(child)) {
+ // toku_pin_ftnode... touches the clock but toku_maybe_pin_ftnode... doesn't.
+ // This prevents partial eviction.
+ for (int i = 0; i < child->n_children; ++i) {
+ BP_TOUCH_CLOCK(child, i);
+ }
+ } else {
+ // We got the child, but it's not fully in memory. Give up on promoting.
+ STATUS_INC(FT_PRO_NUM_STOP_CHILD_INMEM, 1);
+ goto unlock_child_and_push_here;
+ }
+ }
+ }
+ paranoid_invariant_notnull(child);
+
+ if (!just_did_split_or_merge) {
+ BLOCKNUM subtree_root_blocknum = subtree_root->thisnodename;
+ uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum);
+ const bool did_split_or_merge = process_maybe_reactive_child(ft, subtree_root, child, childnum, loc);
+ if (did_split_or_merge) {
+ // Need to re-pin this node and try at this level again.
+ FTNODE newparent;
+ struct ftnode_fetch_extra bfe;
+ fill_bfe_for_full_read(&bfe, ft); // should be fully in memory, we just split it
+ toku_pin_ftnode_off_client_thread_batched(ft, subtree_root_blocknum, subtree_root_fullhash, &bfe, PL_READ, 0, nullptr, &newparent);
+ push_something_in_subtree(ft, newparent, -1, cmd, flow_deltas, depth, loc, true);
+ return;
+ }
+ }
+
+ if (next_loc != NEITHER_EXTREME || child->dirty || toku_bnc_should_promote(ft, bnc)) {
+ push_something_in_subtree(ft, child, -1, cmd, flow_deltas, depth + 1, next_loc, false);
+ toku_sync_fetch_and_add(&bnc->flow[0], flow_deltas[0]);
+ // The recursive call unpinned the child, but
+ // we're responsible for unpinning subtree_root.
+ toku_unpin_ftnode_read_only(ft, subtree_root);
+ return;
+ }
+
+ STATUS_INC(FT_PRO_NUM_DIDNT_WANT_PROMOTE, 1);
+ unlock_child_and_push_here:
+ // We locked the child, but we decided not to promote.
+ // Unlock the child, and fall through to the next case.
+ toku_unpin_ftnode_read_only(ft, child);
+ }
+ relock_and_push_here:
+ // Give up on promoting.
+ // We have subtree_root read-locked and we don't have a child locked.
+ // Drop the read lock, grab a write lock, and inject here.
+ {
+ // Right now we have a read lock on subtree_root, but we want
+ // to inject into it so we get a write lock instead.
+ BLOCKNUM subtree_root_blocknum = subtree_root->thisnodename;
+ uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum);
+ toku_unpin_ftnode_read_only(ft, subtree_root);
+ switch (depth) {
+ case 0:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break;
+ case 1:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break;
+ case 2:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break;
+ case 3:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break;
+ default:
+ STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
+ }
+ inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, cmd, flow_deltas);
+ }
}
- push_something_at_root(ft, &node, cmd);
- // verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock)
- invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn);
+}
- // if we call flush_some_child, then that function unpins the root
- // otherwise, we unpin ourselves
- if (node->height > 0 && toku_ft_nonleaf_is_gorged(node, ft->h->nodesize)) {
- flush_node_on_background_thread(ft, node);
+void toku_ft_root_put_cmd(FT ft, FT_MSG_S *cmd)
+// Effect:
+// - assign msn to cmd and update msn in the header
+// - push the cmd into the ft
+
+// As of Clayface, the root blocknum is a constant, so preventing a
+// race between message injection and the split of a root is the job
+// of the cachetable's locking rules.
+//
+// We also hold the MO lock for a number of reasons, but an important
+// one is to make sure that a begin_checkpoint may not start while
+// this code is executing. A begin_checkpoint does (at least) two things
+// that can interfere with the operations here:
+// - Copies the header to a checkpoint header. Because we may change
+// the max_msn_in_ft below, we don't want the header to be copied in
+// the middle of these operations.
+// - Takes note of the log's LSN. Because this put operation has
+// already been logged, this message injection must be included
+// in any checkpoint that contains this put's logentry.
+// Holding the mo lock throughout this function ensures that fact.
+{
+ // blackhole fractal trees drop all messages, so do nothing.
+ if (ft->blackhole) {
+ return;
}
- else {
- toku_unpin_ftnode(ft, node); // unpin root
+
+ FTNODE node;
+
+ uint32_t fullhash;
+ CACHEKEY root_key;
+ toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
+ struct ftnode_fetch_extra bfe;
+ fill_bfe_for_full_read(&bfe, ft);
+
+ size_t flow_deltas[] = { toku_ft_msg_memsize_in_fifo(cmd), 0 };
+
+ pair_lock_type lock_type;
+ lock_type = PL_READ; // try first for a read lock
+ // If we need to split the root, we'll have to change from a read lock
+ // to a write lock and check again. We change the variable lock_type
+ // and jump back to here.
+ change_lock_type:
+ // get the root node
+ toku_pin_ftnode_off_client_thread_batched(ft, root_key, fullhash, &bfe, lock_type, 0, NULL, &node);
+ toku_assert_entire_node_in_memory(node);
+ paranoid_invariant(node->fullhash==fullhash);
+ ft_verify_flags(ft, node);
+
+ // First handle a reactive root.
+ // This relocking for split algorithm will cause every message
+ // injection thread to change lock type back and forth, when only one
+ // of them needs to in order to handle the split. That's not great,
+ // but root splits are incredibly rare.
+ enum reactivity re = get_node_reactivity(node, ft->h->nodesize);
+ switch (re) {
+ case RE_STABLE:
+ case RE_FUSIBLE: // cannot merge anything at the root
+ if (lock_type != PL_READ) {
+ // We thought we needed to split, but someone else got to
+ // it before us. Downgrade to a read lock.
+ toku_unpin_ftnode_read_only(ft, node);
+ lock_type = PL_READ;
+ goto change_lock_type;
+ }
+ break;
+ case RE_FISSIBLE:
+ if (lock_type == PL_READ) {
+ // Here, we only have a read lock on the root. In order
+ // to split it, we need a write lock, but in the course of
+ // gaining the write lock, someone else may have gotten in
+ // before us and split it. So we upgrade to a write lock
+ // and check again.
+ toku_unpin_ftnode_read_only(ft, node);
+ lock_type = PL_WRITE_CHEAP;
+ goto change_lock_type;
+ } else {
+ // We have a write lock, now we can split.
+ ft_init_new_root(ft, node, &node);
+ // Then downgrade back to a read lock, and we can finally
+ // do the injection.
+ toku_unpin_ftnode_off_client_thread(ft, node);
+ lock_type = PL_READ;
+ STATUS_INC(FT_PRO_NUM_ROOT_SPLIT, 1);
+ goto change_lock_type;
+ }
+ break;
+ }
+ // If we get to here, we have a read lock and the root doesn't
+ // need to be split. It's safe to inject the message.
+ paranoid_invariant(lock_type == PL_READ);
+ // We cannot assert that we have the read lock because frwlock asserts
+ // that its mutex is locked when we check if there are any readers.
+ // That wouldn't give us a strong guarantee that we have the read lock
+ // anyway.
+
+ // Now, either inject here or promote. We decide based on a heuristic:
+ if (node->height == 0 || !ft_msg_applies_once(cmd)) {
+ // If the root's a leaf or we're injecting a broadcast, drop the read lock and inject here.
+ toku_unpin_ftnode_read_only(ft, node);
+ STATUS_INC(FT_PRO_NUM_ROOT_H0_INJECT, 1);
+ inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas);
+ } else if (node->height > 1) {
+ // If the root's above height 1, we are definitely eligible for promotion.
+ push_something_in_subtree(ft, node, -1, cmd, flow_deltas, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
+ } else {
+ // The root's height 1. We may be eligible for promotion here.
+ // On the extremes, we want to promote, in the middle, we don't.
+ int childnum = toku_ftnode_which_child(node, cmd->u.id.key, &ft->cmp_descriptor, ft->compare_fun);
+ if (childnum == 0 || childnum == node->n_children - 1) {
+ // On the extremes, promote. We know which childnum we're going to, so pass that down too.
+ push_something_in_subtree(ft, node, childnum, cmd, flow_deltas, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
+ } else {
+ // At height 1 in the middle, don't promote, drop the read lock and inject here.
+ toku_unpin_ftnode_read_only(ft, node);
+ STATUS_INC(FT_PRO_NUM_ROOT_H1_INJECT, 1);
+ inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas);
+ }
}
}
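
The hunk above replaces the old ydb-lock-protected root handling with an optimistic scheme: take a read lock on the root, and only if the root turns out to be fissible drop it, retake a write lock, re-check (someone else may have split it in the meantime), split, and downgrade again before injecting. The sketch below shows that upgrade-and-recheck loop in isolation; it uses toy names (toy_node, root_put, split_node, inject_message) and a std::shared_mutex standing in for the cachetable pair lock, so it is an illustration of the pattern, not TokuFT code.

#include <shared_mutex>

// Toy stand-ins; none of these are TokuFT types.
struct toy_node {
    std::shared_mutex lock;          // plays the role of the pair lock
    bool needs_split = false;        // plays the role of RE_FISSIBLE
};

static void split_node(toy_node &n) { n.needs_split = false; }
static void inject_message(toy_node &) { /* apply the message */ }

static void root_put(toy_node &root) {
    bool want_write = false;                         // start with a read lock
    for (;;) {
        if (want_write) {
            std::unique_lock<std::shared_mutex> wl(root.lock);
            if (root.needs_split) {
                split_node(root);                    // we won the race
            }
            // Either way, downgrade: drop the write lock and re-check
            // under a read lock before injecting.
            want_write = false;
            continue;
        }
        std::shared_lock<std::shared_mutex> rl(root.lock);
        if (root.needs_split) {                      // need to upgrade
            want_write = true;
            continue;                                // releases the read lock
        }
        inject_message(root);                        // safe under the read lock
        return;
    }
}
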
@@ -2554,7 +2919,7 @@ void toku_ft_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn) {
}
void toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) {
- assert(txn);
+ paranoid_invariant(txn);
toku_txn_force_fsync_on_commit(txn); //If the txn commits, the commit MUST be in the log
//before the (old) file is actually unlinked
TOKULOGGER logger = toku_txn_logger(txn);
@@ -2573,7 +2938,7 @@ void toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_in
// - write to recovery log
void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn)
{
- assert(txn);
+ paranoid_invariant(txn);
TOKULOGGER logger = toku_txn_logger(txn);
// write to the rollback log
@@ -2661,7 +3026,7 @@ toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int nu
}
void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging, enum ft_msg_type type) {
- assert(type==FT_INSERT || type==FT_INSERT_NO_OVERWRITE);
+ paranoid_invariant(type==FT_INSERT || type==FT_INSERT_NO_OVERWRITE);
XIDS message_xids = xids_get_root_xids(); //By default use committed messages
TXNID xid = toku_txn_get_txnid(txn);
if (txn) {
@@ -2866,7 +3231,7 @@ void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids) {
struct omt_compressor_state {
struct mempool *new_kvspace;
- OMT omt;
+ OMTVALUE *newvals;
};
static int move_it (OMTVALUE lev, uint32_t idx, void *v) {
@@ -2874,23 +3239,27 @@ static int move_it (OMTVALUE lev, uint32_t idx, void *v) {
struct omt_compressor_state *CAST_FROM_VOIDP(oc, v);
uint32_t size = leafentry_memsize(le);
LEAFENTRY CAST_FROM_VOIDP(newdata, toku_mempool_malloc(oc->new_kvspace, size, 1));
- lazy_assert(newdata); // we do this on a fresh mempool, so nothing bad should happen
+ paranoid_invariant_notnull(newdata); // we do this on a fresh mempool, so nothing bad should happen
memcpy(newdata, le, size);
- toku_omt_set_at(oc->omt, newdata, idx);
+ oc->newvals[idx] = newdata;
return 0;
}
// Compress things, and grow the mempool if needed.
-static void omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size, void **maybe_free) {
+static void omt_compress_kvspace (OMT *omtp, struct mempool *memp, size_t added_size, void **maybe_free) {
uint32_t total_size_needed = memp->free_offset-memp->frag_size + added_size;
- if (total_size_needed+total_size_needed/4 >= memp->size) {
- memp->size = total_size_needed+total_size_needed/4;
+ if (total_size_needed+total_size_needed >= memp->size) {
+ memp->size = total_size_needed+total_size_needed;
}
void *newmem = toku_xmalloc(memp->size);
struct mempool new_kvspace;
toku_mempool_init(&new_kvspace, newmem, memp->size);
- struct omt_compressor_state oc = { &new_kvspace, omt };
- toku_omt_iterate(omt, move_it, &oc);
+ uint32_t numvals = toku_omt_size(*omtp);
+ OMTVALUE *XMALLOC_N(numvals, newvals);
+ struct omt_compressor_state oc = { &new_kvspace, newvals };
+ toku_omt_iterate(*omtp, move_it, &oc);
+ toku_omt_destroy(omtp);
+ toku_omt_create_steal_sorted_array(omtp, &newvals, numvals, numvals);
if (maybe_free) {
*maybe_free = memp->base;
@@ -2901,12 +3270,12 @@ static void omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_si
}
void *
-mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **maybe_free) {
+mempool_malloc_from_omt(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free) {
void *v = toku_mempool_malloc(mp, size, 1);
if (v == NULL) {
- omt_compress_kvspace(omt, mp, size, maybe_free);
+ omt_compress_kvspace(omtp, mp, size, maybe_free);
v = toku_mempool_malloc(mp, size, 1);
- lazy_assert(v);
+ paranoid_invariant_notnull(v);
}
return v;
}
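
mempool_malloc_from_omt above tries the allocation first and only on failure compacts and regrows the pool (now at least doubling it, per the changed growth arithmetic) before retrying once, expecting the second attempt to succeed. Below is a minimal sketch of that allocate-then-compact-and-retry shape with a toy bump allocator; it is not the toku_mempool/OMT API, and the real omt_compress_kvspace also copies the live leafentries and rebuilds the OMT to point into the new pool, which the toy skips.

#include <cstddef>
#include <vector>

// A toy bump allocator; the real struct mempool also tracks frag_size, etc.
struct toy_pool {
    std::vector<char> buf;
    size_t used = 0;
};

static void *toy_pool_try_alloc(toy_pool &p, size_t n) {
    if (p.used + n > p.buf.size()) return nullptr;   // out of space
    void *v = p.buf.data() + p.used;
    p.used += n;
    return v;
}

static void toy_pool_compact_and_grow(toy_pool &p, size_t needed) {
    // Grow to at least twice what is needed, mirroring the new 2x policy.
    p.buf.resize(2 * (p.used + needed));
}

static void *toy_pool_alloc(toy_pool &p, size_t n) {
    void *v = toy_pool_try_alloc(p, n);
    if (v == nullptr) {
        toy_pool_compact_and_grow(p, n);
        v = toy_pool_try_alloc(p, n);                // must succeed now
    }
    return v;
}
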
@@ -3709,7 +4078,7 @@ int fifo_offset_msn_cmp(FIFO &fifo, const int32_t &ao, const int32_t &bo)
* basement node.
*/
static void
-do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, struct fifo_entry *entry, STAT64INFO stats_to_update)
+do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, uint64_t *workdone, STAT64INFO stats_to_update)
{
// The messages are being iterated over in (key,msn) order or just in
// msn order, so all the messages for one key, from one buffer, are in
@@ -3734,7 +4103,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, str
&t->ft->cmp_descriptor,
bn,
&ftcmd,
- &BP_WORKDONE(ancestor, childnum),
+ workdone,
stats_to_update
);
} else {
@@ -3750,17 +4119,16 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, str
struct iterate_do_bn_apply_cmd_extra {
FT_HANDLE t;
BASEMENTNODE bn;
- FTNODE ancestor;
- int childnum;
+ NONLEAF_CHILDINFO bnc;
+ uint64_t *workdone;
STAT64INFO stats_to_update;
};
int iterate_do_bn_apply_cmd(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e) __attribute__((nonnull(3)));
int iterate_do_bn_apply_cmd(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e)
{
- NONLEAF_CHILDINFO bnc = BNC(e->ancestor, e->childnum);
- struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offset);
- do_bn_apply_cmd(e->t, e->bn, e->ancestor, e->childnum, entry, e->stats_to_update);
+ struct fifo_entry *entry = toku_fifo_get_entry(e->bnc->buffer, offset);
+ do_bn_apply_cmd(e->t, e->bn, entry, e->workdone, e->stats_to_update);
return 0;
}
@@ -3891,6 +4259,8 @@ bnc_apply_messages_to_basement_node(
// Determine the offsets in the message trees between which we need to
// apply messages from this buffer
STAT64INFO_S stats_delta = {0,0};
+ uint64_t workdone_this_ancestor = 0;
+
uint32_t stale_lbi, stale_ube;
if (!bn->stale_ancestor_messages_applied) {
find_bounds_within_message_tree(&t->ft->cmp_descriptor, t->ft->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube);
@@ -3938,13 +4308,13 @@ bnc_apply_messages_to_basement_node(
for (int i = 0; i < buffer_size; ++i) {
*msgs_applied = true;
struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
- do_bn_apply_cmd(t, bn, ancestor, childnum, entry, &stats_delta);
+ do_bn_apply_cmd(t, bn, entry, &workdone_this_ancestor, &stats_delta);
}
toku_free(offsets);
} else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later.
- struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .ancestor = ancestor, .childnum = childnum, .stats_to_update = &stats_delta};
+ struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta };
if (fresh_ube - fresh_lbi > 0) *msgs_applied = true;
r = bnc->fresh_message_tree.iterate_and_mark_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(fresh_lbi, fresh_ube, &iter_extra);
assert_zero(r);
@@ -3953,7 +4323,7 @@ bnc_apply_messages_to_basement_node(
// No fresh messages to apply, we just apply stale messages.
if (stale_ube - stale_lbi > 0) *msgs_applied = true;
- struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .ancestor = ancestor, .childnum = childnum , .stats_to_update = &stats_delta};
+ struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta };
r = bnc->stale_message_tree.iterate_on_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(stale_lbi, stale_ube, &iter_extra);
assert_zero(r);
@@ -3961,11 +4331,12 @@ bnc_apply_messages_to_basement_node(
//
// update stats
//
+ if (workdone_this_ancestor > 0) {
+ (void) toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum), workdone_this_ancestor);
+ }
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
}
-#if 0
-#endif
}
void
@@ -3993,7 +4364,7 @@ toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ances
struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
- assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
+ paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
bnc_apply_messages_to_basement_node(
t,
curr_bn,
@@ -4045,7 +4416,7 @@ bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancesto
struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
- assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
+ paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
if (bnc->broadcast_list.size() > 0) {
needs_ancestors_messages = true;
@@ -4099,7 +4470,7 @@ void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied) {
// Any threads trying to update these basement nodes should be
// updating them to the same thing (since they all have a read lock on
// the same root-to-leaf path) so this is safe.
- (void) __sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
+ (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
}
}
}
@@ -4202,7 +4573,7 @@ ok: ;
idx--;
break;
default:
- assert(false);
+ abort();
}
r = toku_omt_fetch(bn->buffer, idx, &datav);
assert_zero(r); // we just validated the index
@@ -4353,7 +4724,7 @@ unlock_ftnode_fun (PAIR p, void *v) {
(enum cachetable_dirty) node->dirty,
x->msgs_applied ? make_ftnode_pair_attr(node) : make_invalid_pair_attr()
);
- assert(r==0);
+ assert_zero(r);
}
/* search in a node's child */
@@ -4414,7 +4785,7 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F
toku_unpin_ftnode(brt->ft, childnode);
}
else {
- toku_unpin_ftnode_read_only(brt, childnode);
+ toku_unpin_ftnode_read_only(brt->ft, childnode);
}
} else {
// try again.
@@ -4430,7 +4801,7 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F
toku_unpin_ftnode(brt->ft, childnode);
}
else {
- toku_unpin_ftnode_read_only(brt, childnode);
+ toku_unpin_ftnode_read_only(brt->ft, childnode);
}
}
}
@@ -4565,7 +4936,8 @@ ft_search_node(
{
int r = 0;
// assert that we got a valid child_to_search
- assert(child_to_search >= 0 && child_to_search < node->n_children);
+ invariant(child_to_search >= 0);
+ invariant(child_to_search < node->n_children);
//
// At this point, we must have the necessary partition available to continue the search
//
@@ -4666,7 +5038,6 @@ toku_ft_search (FT_HANDLE brt, ft_search_t *search, FT_GET_CALLBACK_FUNCTION get
try_again:
trycount++;
- assert(ft);
//
// Here is how searches work
@@ -4740,7 +5111,7 @@ try_again:
// some piece of a node that it needed was not in memory.
// In this case, the node was not unpinned, so we unpin it here
if (unlockers.locked) {
- toku_unpin_ftnode_read_only(brt, node);
+ toku_unpin_ftnode_read_only(brt->ft, node);
}
goto try_again;
} else {
@@ -4749,7 +5120,7 @@ try_again:
}
assert(unlockers.locked);
- toku_unpin_ftnode_read_only(brt, node);
+ toku_unpin_ftnode_read_only(brt->ft, node);
//Heaviside function (+direction) queries define only a lower or upper
@@ -5163,7 +5534,7 @@ keyrange_in_leaf_partition (FT_HANDLE brt, FTNODE node, DBT *key, int child_numb
// If the partition is in main memory then estimate the number
// If KEY==NULL then use an arbitrary key (leftmost or zero)
{
- assert(node->height == 0); // we are in a leaf
+ paranoid_invariant(node->height == 0); // we are in a leaf
if (BP_STATE(node, child_number) == PT_AVAIL) {
// If the partition is in main memory then get an exact count.
struct keyrange_compare_s s = {brt,key};
@@ -5228,9 +5599,9 @@ toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node,
&childnode,
&msgs_applied
);
- assert(!msgs_applied);
+ paranoid_invariant(!msgs_applied);
if (r != TOKUDB_TRY_AGAIN) {
- assert(r == 0);
+ assert_zero(r);
struct unlock_ftnode_extra unlock_extra = {brt,childnode,false};
struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, unlockers};
@@ -5239,13 +5610,13 @@ toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node,
r = toku_ft_keyrange_internal(brt, childnode, key, less, equal, greater, rows_per_child,
bfe, &next_unlockers, &next_ancestors, &next_bounds);
if (r != TOKUDB_TRY_AGAIN) {
- assert(r == 0);
+ assert_zero(r);
*less += rows_per_child * child_number;
*greater += rows_per_child * (node->n_children - child_number - 1);
assert(unlockers->locked);
- toku_unpin_ftnode_read_only(brt, childnode);
+ toku_unpin_ftnode_read_only(brt->ft, childnode);
}
}
}
@@ -5261,7 +5632,6 @@ void toku_ft_keyrange(FT_HANDLE brt, DBT *key, uint64_t *less_p, uint64_t *equal
// TODO 4184: What to do with a NULL key?
// If KEY is NULL then the system picks an arbitrary key and returns it.
{
- assert(brt->ft);
struct ftnode_fetch_extra bfe;
fill_bfe_for_min_read(&bfe, brt->ft); // read pivot keys but not message buffers
try_again:
@@ -5302,7 +5672,7 @@ try_again:
}
}
assert(unlockers.locked);
- toku_unpin_ftnode_read_only(brt, node);
+ toku_unpin_ftnode_read_only(brt->ft, node);
*less_p = less;
*equal_p = equal;
*greater_p = greater;
@@ -5310,7 +5680,6 @@ try_again:
}
void toku_ft_handle_stat64 (FT_HANDLE brt, TOKUTXN UU(txn), struct ftstat64_s *s) {
- assert(brt->ft);
toku_ft_stat64(brt->ft, s);
}
@@ -5320,7 +5689,7 @@ toku_dump_ftnode (FILE *file, FT_HANDLE brt, BLOCKNUM blocknum, int depth, const
int result=0;
FTNODE node;
toku_get_node_for_verify(blocknum, brt, &node);
- result=toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0);
+ result=toku_verify_ftnode(brt, brt->ft->h->max_msn_in_ft, brt->ft->h->max_msn_in_ft, false, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0);
uint32_t fullhash = toku_cachetable_hash(brt->ft->cf, blocknum);
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, brt->ft);
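
One recurring change in the ft-ops.cc hunks above is how workdone is accounted: do_bn_apply_cmd no longer updates BP_WORKDONE(ancestor, childnum) per message; callers accumulate into a local workdone counter and publish the total once with toku_sync_fetch_and_add. A small sketch of that accumulate-locally, publish-once pattern, using std::atomic and hypothetical names (apply_buffer, message_costs):

#include <atomic>
#include <cstdint>
#include <vector>

static void apply_buffer(std::atomic<uint64_t> &shared_workdone,   // BP_WORKDONE stand-in
                         const std::vector<uint64_t> &message_costs) {
    uint64_t workdone_this_ancestor = 0;
    for (uint64_t cost : message_costs) {
        // ... apply one message to the basement node ...
        workdone_this_ancestor += cost;              // local, no contention
    }
    if (workdone_this_ancestor > 0) {
        // One atomic add per buffer instead of one per message.
        shared_workdone.fetch_add(workdone_this_ancestor,
                                  std::memory_order_relaxed);
    }
}
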
diff --git a/ft/ft-serialize.cc b/ft/ft-serialize.cc
index 409e46827c6..cea51ef3ecc 100644
--- a/ft/ft-serialize.cc
+++ b/ft/ft-serialize.cc
@@ -134,8 +134,8 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
{
int r;
FT ft = NULL;
- invariant(version >= FT_LAYOUT_MIN_SUPPORTED_VERSION);
- invariant(version <= FT_LAYOUT_VERSION);
+ paranoid_invariant(version >= FT_LAYOUT_MIN_SUPPORTED_VERSION);
+ paranoid_invariant(version <= FT_LAYOUT_VERSION);
// We already know:
// we have an rbuf representing the header.
// The checksum has been validated
@@ -290,6 +290,12 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
}
}
+ MSN max_msn_in_ft;
+ max_msn_in_ft = ZERO_MSN; // We'll upgrade it from the root node later if necessary
+ if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_21) {
+ max_msn_in_ft = rbuf_msn(rb);
+ }
+
(void) rbuf_int(rb); //Read in checksum and ignore (already verified).
if (rb->ndone != rb->size) {
fprintf(stderr, "Header size did not match contents.\n");
@@ -317,6 +323,7 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
.basementnodesize = basementnodesize,
.compression_method = compression_method,
.highest_unused_msn_for_upgrade = highest_unused_msn_for_upgrade,
+ .max_msn_in_ft = max_msn_in_ft,
.time_of_last_optimize_begin = time_of_last_optimize_begin,
.time_of_last_optimize_end = time_of_last_optimize_end,
.count_of_optimize_in_progress = count_of_optimize_in_progress,
@@ -335,6 +342,12 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version)
goto exit;
}
}
+ if (ft->layout_version_read_from_disk < FT_LAYOUT_VERSION_21) {
+ r = toku_upgrade_msn_from_root_to_header(fd, ft);
+ if (r != 0) {
+ goto exit;
+ }
+ }
invariant((uint32_t) ft->layout_version_read_from_disk == version);
r = deserialize_descriptor_from(fd, ft->blocktable, &ft->descriptor, version);
@@ -366,10 +379,12 @@ serialize_ft_min_size (uint32_t version) {
size_t size = 0;
switch(version) {
+ case FT_LAYOUT_VERSION_21:
+ size += sizeof(MSN); // max_msn_in_ft
case FT_LAYOUT_VERSION_20:
case FT_LAYOUT_VERSION_19:
size += 1; // compression method
- size += sizeof(uint64_t); // highest_unused_msn_for_upgrade
+ size += sizeof(MSN); // highest_unused_msn_for_upgrade
case FT_LAYOUT_VERSION_18:
size += sizeof(uint64_t); // time_of_last_optimize_begin
size += sizeof(uint64_t); // time_of_last_optimize_end
@@ -412,9 +427,9 @@ serialize_ft_min_size (uint32_t version) {
);
break;
default:
- lazy_assert(false);
+ abort();
}
-
+
lazy_assert(size <= BLOCK_ALLOCATOR_HEADER_RESERVE);
return size;
}
@@ -637,7 +652,7 @@ toku_deserialize_ft_from(int fd,
version = version_1;
}
- invariant(rb);
+ paranoid_invariant(rb);
r = deserialize_ft_versioned(fd, rb, ft, version);
exit:
@@ -694,6 +709,7 @@ void toku_serialize_ft_to_wbuf (
wbuf_MSN(wbuf, h->msn_at_start_of_last_completed_optimize);
wbuf_char(wbuf, (unsigned char) h->compression_method);
wbuf_MSN(wbuf, h->highest_unused_msn_for_upgrade);
+ wbuf_MSN(wbuf, h->max_msn_in_ft);
uint32_t checksum = x1764_finish(&wbuf->checksum);
wbuf_int(wbuf, checksum);
lazy_assert(wbuf->ndone == wbuf->size);
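
The ft-serialize.cc changes above gate the new max_msn_in_ft field on FT_LAYOUT_VERSION_21 and fall back to upgrading it from the root node for older files. Below is a toy sketch of that version-gated read; toy_rbuf, toy_msn, and TOY_LAYOUT_VERSION_21 are illustrative types, not the real rbuf API.

#include <cstdint>

struct toy_msn  { uint64_t msn; };
struct toy_rbuf { const uint64_t *next; };

static toy_msn toy_rbuf_msn(toy_rbuf &rb) { return toy_msn{*rb.next++}; }

static const int TOY_LAYOUT_VERSION_21 = 21;

static toy_msn read_max_msn_in_ft(toy_rbuf &rb, int layout_version_on_disk) {
    toy_msn max_msn_in_ft = {0};                     // ZERO_MSN placeholder
    if (layout_version_on_disk >= TOY_LAYOUT_VERSION_21) {
        max_msn_in_ft = toy_rbuf_msn(rb);            // field exists on disk
    }
    // Older layouts: the caller later fills this in from the root node,
    // as toku_upgrade_msn_from_root_to_header does in the patch.
    return max_msn_in_ft;
}
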
diff --git a/ft/ft-test-helpers.cc b/ft/ft-test-helpers.cc
index a121fc2d658..d2647ab9ccf 100644
--- a/ft/ft-test-helpers.cc
+++ b/ft/ft-test-helpers.cc
@@ -136,15 +136,18 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char
.u = { .id = { toku_fill_dbt(&keydbt, key, keylen),
toku_fill_dbt(&valdbt, val, vallen) } } };
+ static size_t zero_flow_deltas[] = { 0, 0 };
toku_ft_node_put_cmd (
brt->ft->compare_fun,
brt->ft->update_fun,
&brt->ft->cmp_descriptor,
node,
+ -1,
&cmd,
true,
+ zero_flow_deltas,
NULL
- );
+ );
toku_verify_or_set_counts(node);
@@ -215,6 +218,8 @@ int toku_testsetup_insert_to_nonleaf (FT_HANDLE brt, BLOCKNUM blocknum, enum ft_
// using brt APIs.
node->max_msn_applied_to_node_on_disk = msn;
node->dirty = 1;
+ // Also hack max_msn_in_ft
+ brt->ft->h->max_msn_in_ft = msn;
toku_unpin_ftnode(brt->ft, node);
return 0;
diff --git a/ft/ft-verify.cc b/ft/ft-verify.cc
index c763cfa1ef9..834ed4b1d4c 100644
--- a/ft/ft-verify.cc
+++ b/ft/ft-verify.cc
@@ -245,7 +245,7 @@ toku_get_node_for_verify(
static int
toku_verify_ftnode_internal(FT_HANDLE brt,
- MSN rootmsn, MSN parentmsn,
+ MSN rootmsn, MSN parentmsn, bool messages_exist_above,
FTNODE node, int height,
const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
@@ -258,16 +258,11 @@ toku_verify_ftnode_internal(FT_HANDLE brt,
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
toku_assert_entire_node_in_memory(node);
this_msn = node->max_msn_applied_to_node_on_disk;
- if (rootmsn.msn == ZERO_MSN.msn) {
- assert(parentmsn.msn == ZERO_MSN.msn);
- rootmsn = this_msn;
- parentmsn = this_msn;
- }
if (height >= 0) {
invariant(height == node->height); // this is a bad failure if wrong
}
- if (node->height > 0) {
+ if (node->height > 0 && messages_exist_above) {
VERIFY_ASSERTION((parentmsn.msn >= this_msn.msn), 0, "node msn must be descending down tree, newest messages at top");
}
// Verify that all the pivot keys are in order.
@@ -390,7 +385,7 @@ done:
// input is a pinned node, on exit, node is unpinned
int
toku_verify_ftnode (FT_HANDLE brt,
- MSN rootmsn, MSN parentmsn,
+ MSN rootmsn, MSN parentmsn, bool messages_exist_above,
FTNODE node, int height,
const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
@@ -402,11 +397,6 @@ toku_verify_ftnode (FT_HANDLE brt,
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
toku_assert_entire_node_in_memory(node);
this_msn = node->max_msn_applied_to_node_on_disk;
- if (rootmsn.msn == ZERO_MSN.msn) {
- assert(parentmsn.msn == ZERO_MSN.msn);
- rootmsn = this_msn;
- parentmsn = this_msn;
- }
int result = 0;
int result2 = 0;
@@ -414,7 +404,7 @@ toku_verify_ftnode (FT_HANDLE brt,
// Otherwise we'll just do the next call
result = toku_verify_ftnode_internal(
- brt, rootmsn, parentmsn, node, height, lesser_pivot, greatereq_pivot,
+ brt, rootmsn, parentmsn, messages_exist_above, node, height, lesser_pivot, greatereq_pivot,
verbose, keep_going_on_failure, false);
if (result != 0 && (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR)) goto done;
}
@@ -422,7 +412,7 @@ toku_verify_ftnode (FT_HANDLE brt,
toku_move_ftnode_messages_to_stale(brt->ft, node);
}
result2 = toku_verify_ftnode_internal(
- brt, rootmsn, parentmsn, node, height, lesser_pivot, greatereq_pivot,
+ brt, rootmsn, parentmsn, messages_exist_above, node, height, lesser_pivot, greatereq_pivot,
verbose, keep_going_on_failure, true);
if (result == 0) {
result = result2;
@@ -434,7 +424,7 @@ toku_verify_ftnode (FT_HANDLE brt,
for (int i = 0; i < node->n_children; i++) {
FTNODE child_node;
toku_get_node_for_verify(BP_BLOCKNUM(node, i), brt, &child_node);
- int r = toku_verify_ftnode(brt, rootmsn, this_msn,
+ int r = toku_verify_ftnode(brt, rootmsn, this_msn, messages_exist_above || toku_bnc_n_entries(BNC(node, i)) > 0,
child_node, node->height-1,
(i==0) ? lesser_pivot : &node->childkeys[i-1],
(i==node->n_children-1) ? greatereq_pivot : &node->childkeys[i],
@@ -465,7 +455,7 @@ toku_verify_ft_with_progress (FT_HANDLE brt, int (*progress_callback)(void *extr
toku_calculate_root_offset_pointer(brt->ft, &root_key, &root_hash);
toku_get_node_for_verify(root_key, brt, &root_node);
}
- int r = toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
+ int r = toku_verify_ftnode(brt, brt->ft->h->max_msn_in_ft, brt->ft->h->max_msn_in_ft, false, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
if (r == 0) {
toku_ft_lock(brt->ft);
brt->ft->h->time_of_last_verification = time(NULL);
@@ -479,4 +469,3 @@ int
toku_verify_ft (FT_HANDLE brt) {
return toku_verify_ft_with_progress(brt, NULL, NULL, 0, 0);
}
-
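
The verifier changes above thread a messages_exist_above flag down the tree so the "msn must be descending down tree" check only fires once some ancestor buffer actually contains messages. A compact sketch of that recursion with a toy node type follows; the real code also checks pivots, leaves, and repair modes, which are omitted here.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct verify_toy_node {
    uint64_t max_msn;                         // max_msn_applied_to_node_on_disk
    std::vector<size_t> buffered;             // per-child buffer entry counts
    std::vector<verify_toy_node *> children;  // empty for a leaf
};

static void verify(const verify_toy_node *node, uint64_t parent_msn,
                   bool messages_exist_above) {
    if (!node->children.empty() && messages_exist_above) {
        // Only meaningful once some ancestor buffer holds messages.
        assert(parent_msn >= node->max_msn);
    }
    for (size_t i = 0; i < node->children.size(); i++) {
        verify(node->children[i], node->max_msn,
               messages_exist_above || node->buffered[i] > 0);
    }
}
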
diff --git a/ft/ft.cc b/ft/ft.cc
index 5f48e947c61..b635d9b06c4 100644
--- a/ft/ft.cc
+++ b/ft/ft.cc
@@ -13,6 +13,7 @@
#include <memory.h>
#include <toku_assert.h>
+#include <portability/toku_atomic.h>
void
toku_ft_suppress_rollbacks(FT h, TOKUTXN txn) {
@@ -365,6 +366,7 @@ ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that
.basementnodesize = options->basementnodesize,
.compression_method = options->compression_method,
.highest_unused_msn_for_upgrade = { .msn = (MIN_MSN.msn - 1) },
+ .max_msn_in_ft = ZERO_MSN,
.time_of_last_optimize_begin = 0,
.time_of_last_optimize_end = 0,
.count_of_optimize_in_progress = 0,
@@ -850,14 +852,14 @@ toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) {
void
toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
- (void) __sync_fetch_and_add(&(headerstats->numrows), delta.numrows);
- (void) __sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes);
+ (void) toku_sync_fetch_and_add(&(headerstats->numrows), delta.numrows);
+ (void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes);
}
void
toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) {
- (void) __sync_fetch_and_sub(&(headerstats->numrows), delta.numrows);
- (void) __sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
+ (void) toku_sync_fetch_and_sub(&(headerstats->numrows), delta.numrows);
+ (void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes);
}
void
diff --git a/ft/ft_layout_version.h b/ft/ft_layout_version.h
index e95abb20c01..f1ae174db87 100644
--- a/ft/ft_layout_version.h
+++ b/ft/ft_layout_version.h
@@ -28,6 +28,7 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_20 = 20, // Deadshot: Add compression method to log_fcreate,
// mgr_last_xid after begin checkpoint,
// last_xid to shutdown
+ FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header
FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
diff --git a/ft/ft_node-serialize.cc b/ft/ft_node-serialize.cc
index 298148889d7..a4e62b07649 100644
--- a/ft/ft_node-serialize.cc
+++ b/ft/ft_node-serialize.cc
@@ -7,6 +7,7 @@
#include "ft-internal.h"
#include "log-internal.h"
#include <compress.h>
+#include <portability/toku_atomic.h>
#include <util/sort.h>
#include <util/threadpool.h>
@@ -198,7 +199,7 @@ serialize_node_header(FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) {
wbuf_nocrc_literal_bytes(wbuf, "tokuleaf", 8);
else
wbuf_nocrc_literal_bytes(wbuf, "tokunode", 8);
- invariant(node->layout_version == FT_LAYOUT_VERSION);
+ paranoid_invariant(node->layout_version == FT_LAYOUT_VERSION);
wbuf_nocrc_int(wbuf, node->layout_version);
wbuf_nocrc_int(wbuf, node->layout_version_original);
wbuf_nocrc_uint(wbuf, BUILD_ID);
@@ -226,7 +227,7 @@ static uint32_t
serialize_ftnode_partition_size (FTNODE node, int i)
{
uint32_t result = 0;
- assert(node->bp[i].state == PT_AVAIL);
+ paranoid_invariant(node->bp[i].state == PT_AVAIL);
result++; // Byte that states what the partition is
if (node->height > 0) {
result += 4; // size of bytes in buffer table
@@ -253,7 +254,7 @@ serialize_nonleaf_childinfo(NONLEAF_CHILDINFO bnc, struct wbuf *wb)
FIFO_ITERATE(
bnc->buffer, key, keylen, data, datalen, type, msn, xids, is_fresh,
{
- invariant((int)type>=0 && type<256);
+ paranoid_invariant((int)type>=0 && type<256);
wbuf_nocrc_char(wb, (unsigned char)type);
wbuf_nocrc_char(wb, (unsigned char)is_fresh);
wbuf_MSN(wb, msn);
@@ -382,7 +383,6 @@ static void serialize_ftnode_info(FTNODE node,
assert(sb->uncompressed_ptr == NULL);
sb->uncompressed_size = serialize_ftnode_info_size(node);
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
- assert(sb->uncompressed_ptr);
struct wbuf wb;
wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
@@ -956,7 +956,7 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
dest = &broadcast_offsets[nbroadcast_offsets];
nbroadcast_offsets++;
} else {
- assert(false);
+ abort();
}
} else {
dest = NULL;
@@ -1079,6 +1079,7 @@ NONLEAF_CHILDINFO toku_create_empty_nl(void) {
cn->fresh_message_tree.create();
cn->stale_message_tree.create();
cn->broadcast_list.create();
+ memset(cn->flow, 0, sizeof cn->flow);
return cn;
}
@@ -1089,6 +1090,7 @@ NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) {
cn->fresh_message_tree.create_no_array();
cn->stale_message_tree.create_no_array();
cn->broadcast_list.create_no_array();
+ memset(cn->flow, 0, sizeof cn->flow);
return cn;
}
@@ -1181,8 +1183,7 @@ read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
}
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
- assert(sb->uncompressed_ptr);
-
+
toku_decompress(
(Bytef *) sb->uncompressed_ptr,
sb->uncompressed_size,
@@ -1198,10 +1199,9 @@ exit:
void
just_decompress_sub_block(struct sub_block *sb)
{
- // <CER> TODO: Add assert thta the subblock was read in.
+ // <CER> TODO: Add assert that the subblock was read in.
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
- assert(sb->uncompressed_ptr);
-
+
toku_decompress(
(Bytef *) sb->uncompressed_ptr,
sb->uncompressed_size,
@@ -1259,17 +1259,15 @@ deserialize_ftnode_info(
}
// now create the basement nodes or childinfos, depending on whether this is a
- // leaf node or internal node
+ // leaf node or internal node
// now the subtree_estimates
     // n_children is now in the header, and the allocation of the node->bp is in deserialize_ftnode_from_rbuf.
- assert(node->bp!=NULL); //
// now the pivots
node->totalchildkeylens = 0;
if (node->n_children > 1) {
XMALLOC_N(node->n_children - 1, node->childkeys);
- assert(node->childkeys);
for (int i=0; i < node->n_children-1; i++) {
bytevec childkeyptr;
unsigned int cklen;
@@ -1291,13 +1289,13 @@ deserialize_ftnode_info(
for (int i = 0; i < node->n_children; i++) {
BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb);
BP_WORKDONE(node, i) = 0;
- }
+ }
}
-
+
// make sure that all the data was read
if (data_size != rb.ndone) {
dump_bad_block(rb.buf, rb.size);
- assert(false);
+ abort();
}
exit:
return r;
@@ -1326,7 +1324,6 @@ update_bfe_using_ftnode(FTNODE node, struct ftnode_fetch_extra *bfe)
// we can possibly require is a single basement node
// we find out what basement node the query cares about
// and check if it is available
- assert(bfe->search);
bfe->child_to_read = toku_ft_search_which_child(
&bfe->h->cmp_descriptor,
bfe->h->compare_fun,
@@ -1372,17 +1369,16 @@ setup_partitions_using_bfe(FTNODE node,
case PT_AVAIL:
setup_available_ftnode_partition(node, i);
BP_TOUCH_CLOCK(node,i);
- continue;
+ break;
case PT_COMPRESSED:
set_BSB(node, i, sub_block_creat());
- continue;
+ break;
case PT_ON_DISK:
set_BNULL(node, i);
- continue;
- case PT_INVALID:
break;
+ case PT_INVALID:
+ abort();
}
- assert(false);
}
}
@@ -1616,7 +1612,6 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
// Now decompress the subblock
sb_node_info.uncompressed_ptr = toku_xmalloc(sb_node_info.uncompressed_size);
- assert(sb_node_info.uncompressed_ptr);
toku_decompress(
(Bytef *) sb_node_info.uncompressed_ptr,
@@ -1638,7 +1633,7 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
// rbuf, so we might be able to store the compressed data for some
// objects.
// We can proceed to deserialize the individual subblocks.
- assert(bfe->type == ftnode_fetch_none || bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_all || bfe->type == ftnode_fetch_prefetch);
+ paranoid_invariant(bfe->type == ftnode_fetch_none || bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_all || bfe->type == ftnode_fetch_prefetch);
// setup the memory of the partitions
// for partitions being decompressed, create either FIFO or basement node
@@ -1647,8 +1642,8 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
if (bfe->type != ftnode_fetch_none) {
PAIR_ATTR attr;
-
-r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr);
+
+ r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr);
if (r != 0) {
goto cleanup;
}
@@ -1656,12 +1651,12 @@ r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr);
// handle clock
for (int i = 0; i < node->n_children; i++) {
if (toku_bfe_wants_child_available(bfe, i)) {
- assert(BP_STATE(node,i) == PT_AVAIL);
+ paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
BP_TOUCH_CLOCK(node,i);
}
}
*ftnode = node;
- r = 0; // TODO: Why do we do this???
+ r = 0;
cleanup:
if (r != 0) {
@@ -1795,7 +1790,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
// of messages in the buffer.
MSN lowest;
uint64_t amount = n_in_this_buffer;
- lowest.msn = __sync_sub_and_fetch(&bfe->h->h->highest_unused_msn_for_upgrade.msn, amount);
+ lowest.msn = toku_sync_sub_and_fetch(&bfe->h->h->highest_unused_msn_for_upgrade.msn, amount);
if (highest_msn.msn == 0) {
highest_msn.msn = lowest.msn + n_in_this_buffer;
}
@@ -1821,7 +1816,7 @@ deserialize_and_upgrade_internal_node(FTNODE node,
dest = &broadcast_offsets[nbroadcast_offsets];
nbroadcast_offsets++;
} else {
- assert(false);
+ abort();
}
} else {
dest = NULL;
@@ -1962,8 +1957,6 @@ deserialize_and_upgrade_leaf_node(FTNODE node,
if (version <= FT_LAYOUT_VERSION_13) {
// Create our mempool.
toku_mempool_construct(&bn->buffer_mempool, 0);
- OMT omt = BLB_BUFFER(node, 0);
- struct mempool *mp = &BLB_BUFFER_MEMPOOL(node, 0);
// Loop through
for (int i = 0; i < n_in_buf; ++i) {
LEAFENTRY_13 le = reinterpret_cast<LEAFENTRY_13>(&rb->buf[rb->ndone]);
@@ -1975,11 +1968,11 @@ deserialize_and_upgrade_leaf_node(FTNODE node,
r = toku_le_upgrade_13_14(le,
&new_le_size,
&new_le,
- omt,
- mp);
+ &bn->buffer,
+ &bn->buffer_mempool);
assert_zero(r);
// Copy the pointer value straight into the OMT
- r = toku_omt_insert_at(omt, new_le, i);
+ r = toku_omt_insert_at(bn->buffer, new_le, i);
assert_zero(r);
bn->n_bytes_in_buffer += new_le_size;
}
@@ -2259,7 +2252,7 @@ deserialize_ftnode_from_rbuf(
// now that the node info has been deserialized, we can proceed to deserialize
// the individual sub blocks
- assert(bfe->type == ftnode_fetch_none || bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_all || bfe->type == ftnode_fetch_prefetch);
+ paranoid_invariant(bfe->type == ftnode_fetch_none || bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_all || bfe->type == ftnode_fetch_prefetch);
// setup the memory of the partitions
// for partitions being decompressed, create either FIFO or basement node
@@ -2306,20 +2299,18 @@ deserialize_ftnode_from_rbuf(
if (r != 0) {
goto cleanup;
}
- continue;
+ break;
case PT_COMPRESSED:
// case where we leave the partition in the compressed state
r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i);
if (r != 0) {
goto cleanup;
}
- continue;
+ break;
case PT_INVALID: // this is really bad
case PT_ON_DISK: // it's supposed to be in memory.
- assert(0);
- continue;
+ abort();
}
- assert(0);
}
*ftnode = node;
r = 0;
@@ -2745,7 +2736,8 @@ decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, s
n_sub_blocks = toku_dtoh32(*(uint32_t*)(&raw_block[node_header_overhead]));
// verify the number of sub blocks
- invariant(0 <= n_sub_blocks && n_sub_blocks <= max_sub_blocks);
+ invariant(0 <= n_sub_blocks);
+ invariant(n_sub_blocks <= max_sub_blocks);
{ // verify the header checksum
uint32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks);
@@ -2799,7 +2791,6 @@ decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, s
size = node_header_overhead + uncompressed_size;
unsigned char *buf;
XMALLOC_N(size, buf);
- lazy_assert(buf);
rbuf_init(rb, buf, size);
// copy the uncompressed node header to the uncompressed buffer
@@ -2820,7 +2811,6 @@ decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, s
dump_bad_block(raw_block, raw_block_size);
goto exit;
}
- lazy_assert_zero(r);
toku_trace("decompress done");
@@ -2840,7 +2830,7 @@ decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_blo
r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum);
break;
default:
- lazy_assert(false);
+ abort();
}
return r;
}
@@ -2886,7 +2876,7 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
fprintf(stderr,
"Checksum failure while reading raw block in file %s.\n",
toku_cachefile_fname_in_env(h->cf));
- assert(false);
+ abort();
} else {
r = toku_db_badformat();
goto cleanup;
@@ -2949,7 +2939,6 @@ cleanup:
return r;
}
-
int
toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h)
{
@@ -2962,7 +2951,7 @@ toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h)
struct ftnode_fetch_extra bfe;
fill_bfe_for_min_read(&bfe, h);
r = deserialize_ftnode_from_fd(fd, h->h->root_blocknum, 0, &unused_node, &unused_ndd,
- &bfe, &h->h->on_disk_stats);
+ &bfe, &h->h->on_disk_stats);
h->in_memory_stats = h->h->on_disk_stats;
if (unused_node) {
@@ -2974,5 +2963,27 @@ toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h)
return r;
}
-#undef UPGRADE_STATUS_VALUE
+int
+toku_upgrade_msn_from_root_to_header(int fd, FT h)
+{
+ int r;
+ // 21 was the first version with max_msn_in_ft in the header
+ invariant(h->layout_version_read_from_disk <= FT_LAYOUT_VERSION_20);
+ FTNODE node;
+ FTNODE_DISK_DATA ndd;
+ struct ftnode_fetch_extra bfe;
+ fill_bfe_for_min_read(&bfe, h);
+ r = deserialize_ftnode_from_fd(fd, h->h->root_blocknum, 0, &node, &ndd, &bfe, nullptr);
+ if (r != 0) {
+ goto exit;
+ }
+
+ h->h->max_msn_in_ft = node->max_msn_applied_to_node_on_disk;
+ toku_ftnode_free(&node);
+ toku_free(ndd);
+ exit:
+ return r;
+}
+
+#undef UPGRADE_STATUS_VALUE
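
Within deserialize_and_upgrade_internal_node above, a contiguous block of MSNs is reserved for a buffer by atomically subtracting the buffer's message count from highest_unused_msn_for_upgrade. The sketch below shows that range-reservation trick using std::atomic in place of toku_sync_sub_and_fetch; msn_range and reserve_msns are illustrative names.

#include <atomic>
#include <cstdint>

struct msn_range { uint64_t lowest; uint64_t count; };

static msn_range reserve_msns(std::atomic<uint64_t> &highest_unused_msn,
                              uint64_t n_in_this_buffer) {
    // fetch_sub returns the old value; old - n is the new value, i.e. the
    // lowest MSN of the private, contiguous block handed to this buffer
    // (matching the sub_and_fetch semantics used in the patch).
    uint64_t lowest =
        highest_unused_msn.fetch_sub(n_in_this_buffer) - n_in_this_buffer;
    return msn_range{lowest, n_in_this_buffer};
}
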
diff --git a/ft/fttypes.h b/ft/fttypes.h
index 555d4764b3a..f3182990924 100644
--- a/ft/fttypes.h
+++ b/ft/fttypes.h
@@ -281,6 +281,10 @@ enum reactivity {
RE_FISSIBLE
};
+enum split_mode {
+ SPLIT_EVENLY,
+ SPLIT_LEFT_HEAVY,
+ SPLIT_RIGHT_HEAVY
+};
#endif
-
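
The new split_mode enum pairs with the height-1 logic near the top of this section, where a child on the rightmost edge is split left-heavy so the fresh sibling stays nearly empty for the ascending keys that follow. The sketch below shows how such a choice could look; the mirrored left-edge branch is an assumption (it is not visible in this hunk), and all names are toy stand-ins.

enum toy_split_mode { TOY_SPLIT_EVENLY, TOY_SPLIT_LEFT_HEAVY, TOY_SPLIT_RIGHT_HEAVY };
enum toy_loc_bits   { TOY_LEFT_EXTREME = 1, TOY_RIGHT_EXTREME = 2 };

static toy_split_mode choose_split_mode(int loc, int childnum, int n_children) {
    if ((loc & TOY_LEFT_EXTREME) && childnum == 0) {
        return TOY_SPLIT_RIGHT_HEAVY;   // descending inserts at the left edge (assumed mirror case)
    }
    if ((loc & TOY_RIGHT_EXTREME) && childnum == n_children - 1) {
        return TOY_SPLIT_LEFT_HEAVY;    // ascending inserts at the right edge, as in the hunk above
    }
    return TOY_SPLIT_EVENLY;            // otherwise keep the halves balanced
}
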
diff --git a/ft/leafentry.h b/ft/leafentry.h
index 6a645e0a074..33788255c5b 100644
--- a/ft/leafentry.h
+++ b/ft/leafentry.h
@@ -175,12 +175,12 @@ int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *
size_t
leafentry_disksize_13(LEAFENTRY_13 le);
-int
+int
toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored data.
- size_t *new_leafentry_memorysize,
- LEAFENTRY *new_leafentry_p,
- OMT omt,
- struct mempool *mp);
+ size_t *new_leafentry_memorysize,
+ LEAFENTRY *new_leafentry_p,
+ OMT *omtp,
+ struct mempool *mp);
diff --git a/ft/locking-benchmarks/mfence-benchmark.cc b/ft/locking-benchmarks/mfence-benchmark.cc
index 21b46c519bf..8e8e0cfccd1 100644
--- a/ft/locking-benchmarks/mfence-benchmark.cc
+++ b/ft/locking-benchmarks/mfence-benchmark.cc
@@ -34,6 +34,7 @@ lfence: 12.9ns/loop (marginal cost= -0.1ns)
#include <sys/time.h>
#include <stdio.h>
+#include <portability/toku_atomic.h>
enum { COUNT = 100000000 };
@@ -67,8 +68,8 @@ static inline void sfence (void) {
int lock_for_lock_and_unlock;
static inline void lock_and_unlock (void) {
- (void)__sync_lock_test_and_set(&lock_for_lock_and_unlock, 1);
- __sync_lock_release(&lock_for_lock_and_unlock);
+ (void)toku_sync_lock_test_and_set(&lock_for_lock_and_unlock, 1);
+ toku_sync_lock_release(&lock_for_lock_and_unlock);
}
diff --git a/ft/locking-benchmarks/pthread-locks.cc b/ft/locking-benchmarks/pthread-locks.cc
index 626af6054fc..8dc94fd89a1 100644
--- a/ft/locking-benchmarks/pthread-locks.cc
+++ b/ft/locking-benchmarks/pthread-locks.cc
@@ -13,6 +13,7 @@
#include <stdio.h>
#include <sys/time.h>
#include <pthread.h>
+#include <portability/toku_atomic.h>
float tdiff (struct timeval *start, struct timeval *end) {
return 1e6*(end->tv_sec-start->tv_sec) +(end->tv_usec - start->tv_usec);
@@ -71,13 +72,13 @@ fetch_and_add_i (volatile int *p, int incr)
static inline int
gcc_fetch_and_add_i (volatile int *p, int incr)
{
- return __sync_fetch_and_add(p, incr);
+ return toku_sync_fetch_and_add(p, incr);
}
static inline long
gcc_fetch_and_add_l (volatile long *p, long incr)
{
- return __sync_fetch_and_add(p, incr);
+ return toku_sync_fetch_and_add(p, incr);
}
// Something wrong with the compiler for longs
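
These benchmark hunks, like the rest of the patch, swap the raw __sync builtins for toku_sync_* wrappers pulled in from portability/toku_atomic.h. That header is not part of this diff; the sketch below only illustrates the kind of thin inline wrapper presumably involved, and toy_sync_fetch_and_add/toy_sync_fetch_and_sub are illustrative names under that assumption.

#include <cstdint>

// Thin inline wrappers over the GCC __sync builtins (illustrative only).
template <typename T, typename U>
static inline T toy_sync_fetch_and_add(T *addr, U diff) {
    return __sync_fetch_and_add(addr, diff);
}

template <typename T, typename U>
static inline T toy_sync_fetch_and_sub(T *addr, U diff) {
    return __sync_fetch_and_sub(addr, diff);
}

// Usage mirrors the call sites in the hunks above:
//   old_value = toy_sync_fetch_and_add(&counter, 1);
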
diff --git a/ft/locking-benchmarks/trylock-rdtsc.cc b/ft/locking-benchmarks/trylock-rdtsc.cc
index 1495570939d..707d07f7a82 100644
--- a/ft/locking-benchmarks/trylock-rdtsc.cc
+++ b/ft/locking-benchmarks/trylock-rdtsc.cc
@@ -13,6 +13,7 @@
#include <sys/time.h>
#include <unistd.h>
#include <rdtsc.h>
+#include <portability/toku_atomic.h>
float tdiff (struct timeval *start, struct timeval *end) {
return 1e6*(end->tv_sec-start->tv_sec) +(end->tv_usec - start->tv_usec);
@@ -135,12 +136,12 @@ int main(int argc __attribute__((unused)), char **argv)
{
static int lock_for_lock_and_unlock;
t_start = rdtsc();
- (void)__sync_lock_test_and_set(&lock_for_lock_and_unlock, 1);
+ (void)toku_sync_lock_test_and_set(&lock_for_lock_and_unlock, 1);
t_end = rdtsc();
printf("sync_lock_test_and_set took %llu clocks\n", t_end-t_start);
t_start = rdtsc();
- __sync_lock_release(&lock_for_lock_and_unlock);
+ toku_sync_lock_release(&lock_for_lock_and_unlock);
t_end = rdtsc();
printf("sync_lock_release took %llu clocks\n", t_end-t_start);
}
@@ -148,7 +149,7 @@ int main(int argc __attribute__((unused)), char **argv)
{
t_start = rdtsc();
- (void)__sync_synchronize();
+ (void)toku_sync_synchronize();
t_end = rdtsc();
printf("sync_synchornize took %llu clocks\n", t_end-t_start);
}
diff --git a/ft/rollback-ct-callbacks.h b/ft/rollback-ct-callbacks.h
index cead2900aab..09c4c3757f4 100644
--- a/ft/rollback-ct-callbacks.h
+++ b/ft/rollback-ct-callbacks.h
@@ -44,6 +44,7 @@ static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT
wc.pe_callback = toku_rollback_pe_callback;
wc.cleaner_callback = toku_rollback_cleaner_callback;
wc.clone_callback = toku_rollback_clone_callback;
+ wc.checkpoint_complete_callback = nullptr;
wc.write_extraargs = h;
return wc;
}
diff --git a/ft/rollback.cc b/ft/rollback.cc
index 7f173dc000a..b0ac13b51ad 100644
--- a/ft/rollback.cc
+++ b/ft/rollback.cc
@@ -84,7 +84,7 @@ void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
log->layout_version_read_from_disk = FT_LAYOUT_VERSION;
log->dirty = true;
log->sequence = 0;
- log->previous = {0};
+ log->previous = make_blocknum(0);
log->previous_hash = 0;
log->oldest_logentry = NULL;
log->newest_logentry = NULL;
diff --git a/ft/tests/cachetable-checkpoint-pending.cc b/ft/tests/cachetable-checkpoint-pending.cc
index 7a1a08c7af9..35a0f0ff06d 100644
--- a/ft/tests/cachetable-checkpoint-pending.cc
+++ b/ft/tests/cachetable-checkpoint-pending.cc
@@ -9,6 +9,7 @@
#include <unistd.h>
#include "cachetable-test.h"
#include "checkpoint.h"
+#include <portability/toku_atomic.h>
static int N; // how many items in the table
static CACHEFILE cf;
@@ -54,9 +55,9 @@ flush (
int *CAST_FROM_VOIDP(v, value);
if (*v!=expect_value) printf("got %d expect %d\n", *v, expect_value);
assert(*v==expect_value);
- (void)__sync_fetch_and_add(&n_flush, 1);
- if (write_me) (void)__sync_fetch_and_add(&n_write_me, 1);
- if (keep_me) (void)__sync_fetch_and_add(&n_keep_me, 1);
+ (void)toku_sync_fetch_and_add(&n_flush, 1);
+ if (write_me) (void)toku_sync_fetch_and_add(&n_write_me, 1);
+ if (keep_me) (void)toku_sync_fetch_and_add(&n_keep_me, 1);
sleep_random();
}
diff --git a/ft/tests/cachetable-checkpoint-test.cc b/ft/tests/cachetable-checkpoint-test.cc
index b565cd8660a..0deeb9e7621 100644
--- a/ft/tests/cachetable-checkpoint-test.cc
+++ b/ft/tests/cachetable-checkpoint-test.cc
@@ -106,7 +106,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
CACHEKEY key = make_blocknum(i);
uint32_t hi = toku_cachetable_hash(f1, key);
void *v;
- r = toku_cachetable_maybe_get_and_pin(f1, key, hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, key, hi, PL_WRITE_EXPENSIVE, &v);
if (r != 0)
continue;
r = toku_test_cachetable_unpin(f1, key, hi, CACHETABLE_CLEAN, make_pair_attr(item_size));
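From here on, every test call to toku_cachetable_maybe_get_and_pin (and its _clean variant) passes a new pair-lock-type argument; PL_WRITE_EXPENSIVE asks for an exclusive pin and flags the caller as potentially expensive. A hedged sketch of the call shape the tests now assume; the parameter names are guesses and the authoritative prototype is in cachetable.h:

    void *v = nullptr;
    uint32_t hash = toku_cachetable_hash(f1, make_blocknum(1));
    int r = toku_cachetable_maybe_get_and_pin(
        f1,                  // cachefile
        make_blocknum(1),    // key
        hash,                // full hash of the key
        PL_WRITE_EXPENSIVE,  // requested pair lock type (the new argument)
        &v);
    if (r == 0) {
        // pinned without doing I/O; use v, then release the pin, e.g. with
        // toku_test_cachetable_unpin(f1, make_blocknum(1), hash, CACHETABLE_CLEAN, make_pair_attr(8));
    }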
diff --git a/ft/tests/cachetable-checkpointer-class.cc b/ft/tests/cachetable-checkpointer-class.cc
index 931858aa278..0a8d0e5d592 100644
--- a/ft/tests/cachetable-checkpointer-class.cc
+++ b/ft/tests/cachetable-checkpointer-class.cc
@@ -193,6 +193,7 @@ void checkpointer_test::add_pairs(struct cachefile *cf,
attr.cache_pressure_size = 0;
attr.is_valid = true;
CACHETABLE_WRITE_CALLBACK cb;
+ ZERO_STRUCT(cb); // All nullptr
for (uint32_t i = k; i < count + k; ++i) {
CACHEKEY key;
@@ -201,12 +202,12 @@ void checkpointer_test::add_pairs(struct cachefile *cf,
pair_init(&(pairs[i]),
cf,
key,
- NULL,
+ nullptr,
attr,
CACHETABLE_CLEAN,
full_hash,
cb,
- NULL,
+ nullptr,
m_cp.m_list);
m_cp.m_list->put(&pairs[i]);
diff --git a/ft/tests/cachetable-count-pinned-test.cc b/ft/tests/cachetable-count-pinned-test.cc
index 99729c93a58..0c112151e08 100644
--- a/ft/tests/cachetable-count-pinned-test.cc
+++ b/ft/tests/cachetable-count-pinned-test.cc
@@ -27,7 +27,8 @@ cachetable_count_pinned_test (int n) {
assert(toku_cachefile_count_pinned(f1, 0) == i);
void *v;
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
+
assert(r == -1);
assert(toku_cachefile_count_pinned(f1, 0) == i);
diff --git a/ft/tests/cachetable-flush-test.cc b/ft/tests/cachetable-flush-test.cc
index a3dc1a6f5b8..be66b3cc491 100644
--- a/ft/tests/cachetable-flush-test.cc
+++ b/ft/tests/cachetable-flush-test.cc
@@ -43,12 +43,12 @@ test_cachetable_def_flush (int n) {
uint32_t hi;
void *v;
hi = toku_cachetable_hash(f1, make_blocknum(i));
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
assert(r == 0 && v == (void *)(long)i);
r = toku_test_cachetable_unpin(f1, make_blocknum(i), hi, CACHETABLE_CLEAN, make_pair_attr(1));
assert(r == 0);
hi = toku_cachetable_hash(f2, make_blocknum(i));
- r = toku_cachetable_maybe_get_and_pin(f2, make_blocknum(i), hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f2, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
assert(r == 0 && v == (void *)(long)i);
r = toku_test_cachetable_unpin(f2, make_blocknum(i), hi, CACHETABLE_CLEAN, make_pair_attr(1));
assert(r == 0);
@@ -63,10 +63,10 @@ test_cachetable_def_flush (int n) {
uint32_t hi;
void *v;
hi = toku_cachetable_hash(f1, make_blocknum(i));
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
assert(r != 0);
hi = toku_cachetable_hash(f2, make_blocknum(i));
- r = toku_cachetable_maybe_get_and_pin(f2, make_blocknum(i), hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f2, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
assert(r == 0);
r = toku_test_cachetable_unpin(f2, make_blocknum(i), hi, CACHETABLE_CLEAN, make_pair_attr(1));
assert(r == 0);
diff --git a/ft/tests/cachetable-prefetch-checkpoint-test.cc b/ft/tests/cachetable-prefetch-checkpoint-test.cc
index 01b42a56843..cde523b5757 100644
--- a/ft/tests/cachetable-prefetch-checkpoint-test.cc
+++ b/ft/tests/cachetable-prefetch-checkpoint-test.cc
@@ -122,7 +122,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
CACHEKEY key = make_blocknum(i);
uint32_t hi = toku_cachetable_hash(f1, key);
void *v;
- r = toku_cachetable_maybe_get_and_pin(f1, key, hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, key, hi, PL_WRITE_EXPENSIVE, &v);
if (r != 0)
continue;
r = toku_test_cachetable_unpin(f1, key, hi, CACHETABLE_CLEAN, make_pair_attr(item_size));
diff --git a/ft/tests/cachetable-prefetch-maybegetandpin-test.cc b/ft/tests/cachetable-prefetch-maybegetandpin-test.cc
index df52ab50047..128c771a832 100644
--- a/ft/tests/cachetable-prefetch-maybegetandpin-test.cc
+++ b/ft/tests/cachetable-prefetch-maybegetandpin-test.cc
@@ -51,7 +51,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
int i;
for (i=1; i>=0; i++) {
void *v;
- r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, PL_WRITE_EXPENSIVE, &v);
if (r == 0) break;
toku_pthread_yield();
}
diff --git a/ft/tests/cachetable-prefetch2-test.cc b/ft/tests/cachetable-prefetch2-test.cc
index 09d91e20cf8..6e3670a1a89 100644
--- a/ft/tests/cachetable-prefetch2-test.cc
+++ b/ft/tests/cachetable-prefetch2-test.cc
@@ -58,7 +58,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
int i;
for (i=1; i>=0; i++) {
void *v;
- r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, PL_WRITE_EXPENSIVE, &v);
if (r == 0) break;
toku_pthread_yield();
}
diff --git a/ft/tests/cachetable-put-test.cc b/ft/tests/cachetable-put-test.cc
index 64fa16d5520..77f8cc83b04 100644
--- a/ft/tests/cachetable-put-test.cc
+++ b/ft/tests/cachetable-put-test.cc
@@ -26,7 +26,7 @@ cachetable_put_test (int n) {
assert(toku_cachefile_count_pinned(f1, 0) == i);
void *v;
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
assert(r == -1);
assert(toku_cachefile_count_pinned(f1, 0) == i);
diff --git a/ft/tests/cachetable-simple-maybe-get-pin.cc b/ft/tests/cachetable-simple-maybe-get-pin.cc
index 7b7216fd2bc..11feeffe838 100644
--- a/ft/tests/cachetable-simple-maybe-get-pin.cc
+++ b/ft/tests/cachetable-simple-maybe-get-pin.cc
@@ -26,37 +26,37 @@ cachetable_test (void) {
void* v1;
long s1;
// nothing in cachetable, so this should fail
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r==-1);
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
// maybe_get_and_pin_clean should succeed, maybe_get_and_pin should fail
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r==-1);
- r = toku_cachetable_maybe_get_and_pin_clean(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin_clean(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r == 0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8));
// maybe_get_and_pin_clean should succeed, maybe_get_and_pin should fail
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r==0);
// now these calls should fail because the node is already pinned, and therefore in use
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r==-1);
- r = toku_cachetable_maybe_get_and_pin_clean(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin_clean(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r==-1);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8));
// sanity check, this should still succeed, because the PAIR is dirty
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r==0);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8));
CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
toku_cachetable_begin_checkpoint(cp, NULL);
// now these should fail, because the node should be pending a checkpoint
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r==-1);
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
assert(r==-1);
toku_cachetable_end_checkpoint(
cp,
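Taken together, the assertions above pin down the contract of the two maybe-get-and-pin variants after this change. A hedged restatement (not the real implementation) of the fast-path pin the test is exercising:

    // Try to pin a node for writing without triggering any I/O.
    static int try_fast_write_pin(CACHEFILE cf, CACHEKEY key, uint32_t hash, void **v) {
        int r = toku_cachetable_maybe_get_and_pin(cf, key, hash, PL_WRITE_EXPENSIVE, v);
        // r == -1 covers: not in cache, already pinned, clean (the _clean variant
        // handles that case), or pending a checkpoint write.
        return r;
    }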
diff --git a/ft/tests/cachetable-test.cc b/ft/tests/cachetable-test.cc
index c4c4014860c..e0ed9a3518c 100644
--- a/ft/tests/cachetable-test.cc
+++ b/ft/tests/cachetable-test.cc
@@ -105,7 +105,7 @@ static void test_nested_pin (void) {
assert(i0==0);
r = toku_test_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, make_pair_attr(test_object_size));
assert(r==0);
- r = toku_cachetable_maybe_get_and_pin(f, make_blocknum(1), f1hash, &vv2);
+ r = toku_cachetable_maybe_get_and_pin(f, make_blocknum(1), f1hash, PL_WRITE_EXPENSIVE, &vv2);
assert(r==0);
assert(vv2==vv);
r = toku_test_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, make_pair_attr(test_object_size));
diff --git a/ft/tests/cachetable-unpin-and-remove-test.cc b/ft/tests/cachetable-unpin-and-remove-test.cc
index 8a76a0764c5..a490e62b730 100644
--- a/ft/tests/cachetable-unpin-and-remove-test.cc
+++ b/ft/tests/cachetable-unpin-and-remove-test.cc
@@ -63,7 +63,7 @@ cachetable_unpin_and_remove_test (int n) {
// verify that k is removed
void *v;
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(testkeys[i].b), hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(testkeys[i].b), hi, PL_WRITE_EXPENSIVE, &v);
assert(r != 0);
testkeys[i] = testkeys[nkeys-1]; nkeys -= 1;
diff --git a/ft/tests/cachetable-unpin-test.cc b/ft/tests/cachetable-unpin-test.cc
index e3f48887cc2..8a561dd5e03 100644
--- a/ft/tests/cachetable-unpin-test.cc
+++ b/ft/tests/cachetable-unpin-test.cc
@@ -27,7 +27,7 @@ cachetable_unpin_test (int n) {
assert(toku_cachefile_count_pinned(f1, 0) == i);
void *v;
- r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v);
+ r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
assert(r == -1);
assert(toku_cachefile_count_pinned(f1, 0) == i);
diff --git a/ft/tests/cachetable-writer-thread-limit.cc b/ft/tests/cachetable-writer-thread-limit.cc
index 03976181a22..557243b7e64 100644
--- a/ft/tests/cachetable-writer-thread-limit.cc
+++ b/ft/tests/cachetable-writer-thread-limit.cc
@@ -4,6 +4,7 @@
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h"
#include "test.h"
+#include <portability/toku_atomic.h>
static int total_size;
@@ -25,7 +26,7 @@ flush (CACHEFILE f __attribute__((__unused__)),
bool UU(is_clone)
) {
if (w) {
- int curr_size = __sync_fetch_and_sub(&total_size, 1);
+ int curr_size = toku_sync_fetch_and_sub(&total_size, 1);
assert(curr_size <= 200);
usleep(500*1000);
}
@@ -49,7 +50,7 @@ cachetable_test (void) {
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
wc.flush_callback = flush;
toku_cachetable_put(f1, make_blocknum(i), i, NULL, make_pair_attr(1), wc, put_callback_nop);
- int curr_size = __sync_fetch_and_add(&total_size, 1);
+ int curr_size = toku_sync_fetch_and_add(&total_size, 1);
assert(curr_size <= test_limit + test_limit/2+1);
r = toku_test_cachetable_unpin(f1, make_blocknum(i), i, CACHETABLE_DIRTY, make_pair_attr(4));
}
diff --git a/ft/tests/ftloader-error-injector.h b/ft/tests/ftloader-error-injector.h
index 9f32320fee9..e49407a1af5 100644
--- a/ft/tests/ftloader-error-injector.h
+++ b/ft/tests/ftloader-error-injector.h
@@ -7,6 +7,7 @@
#ifndef FTLOADER_ERROR_INJECTOR_H
#define FTLOADER_ERROR_INJECTOR_H
+#include <portability/toku_atomic.h>
static toku_mutex_t event_mutex = TOKU_MUTEX_INITIALIZER;
static void lock_events(void) {
@@ -107,9 +108,9 @@ static void reset_my_malloc_counts(void) {
__attribute__((__unused__))
static void *my_malloc(size_t n) {
- (void) __sync_fetch_and_add(&my_malloc_count, 1); // my_malloc_count++;
+ (void) toku_sync_fetch_and_add(&my_malloc_count, 1); // my_malloc_count++;
if (n >= my_big_malloc_limit) {
- (void) __sync_fetch_and_add(&my_big_malloc_count, 1); // my_big_malloc_count++;
+ (void) toku_sync_fetch_and_add(&my_big_malloc_count, 1); // my_big_malloc_count++;
if (do_malloc_errors) {
if (event_add_and_fetch() == event_count_trigger) {
event_hit();
@@ -125,9 +126,9 @@ static int do_realloc_errors = 0;
__attribute__((__unused__))
static void *my_realloc(void *p, size_t n) {
- (void) __sync_fetch_and_add(&my_realloc_count, 1); // my_realloc_count++;
+ (void) toku_sync_fetch_and_add(&my_realloc_count, 1); // my_realloc_count++;
if (n >= my_big_malloc_limit) {
- (void) __sync_fetch_and_add(&my_big_realloc_count, 1); // my_big_realloc_count++;
+ (void) toku_sync_fetch_and_add(&my_big_realloc_count, 1); // my_big_realloc_count++;
if (do_realloc_errors) {
if (event_add_and_fetch() == event_count_trigger) {
event_hit();
diff --git a/ft/tests/keyrange.cc b/ft/tests/keyrange.cc
index 7e277946f9d..603f5a9a320 100644
--- a/ft/tests/keyrange.cc
+++ b/ft/tests/keyrange.cc
@@ -82,8 +82,8 @@ static void test_keyrange (enum memory_state ms, uint64_t limit) {
struct ftstat64_s s;
toku_ft_handle_stat64(t, null_txn, &s);
- assert(0 < s.nkeys && s.nkeys < limit);
- assert(0 < s.dsize && s.dsize < limit * (9 + 9)); // keylen = 9, vallen = 9
+ assert(0 < s.nkeys && s.nkeys <= limit);
+ assert(0 < s.dsize && s.dsize <= limit * (9 + 9)); // keylen = 9, vallen = 9
}
maybe_reopen(ms, limit);
diff --git a/ft/tests/make-tree.cc b/ft/tests/make-tree.cc
index ea8fc8051d8..acafd2f8e1e 100644
--- a/ft/tests/make-tree.cc
+++ b/ft/tests/make-tree.cc
@@ -138,7 +138,7 @@ test_make_tree(int height, int fanout, int nperleaf, int do_verify) {
// set the new root to point to the new tree
toku_ft_set_new_root_blocknum(brt->ft, newroot->thisnodename);
- newroot->max_msn_applied_to_node_on_disk = last_dummymsn(); // capture msn of last message injected into tree
+ brt->ft->h->max_msn_in_ft = last_dummymsn(); // capture msn of last message injected into tree
// unpin the new root
toku_unpin_ftnode(brt->ft, newroot);
diff --git a/ft/tests/msnfilter.cc b/ft/tests/msnfilter.cc
index 43c9bf7ce6c..2904714a366 100644
--- a/ft/tests/msnfilter.cc
+++ b/ft/tests/msnfilter.cc
@@ -45,10 +45,10 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
// apply an insert to the leaf node
MSN msn = next_dummymsn();
+ brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
- uint64_t workdone=0;
- toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, &cmd, &workdone, NULL);
+ toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, nullptr, nullptr);
{
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0);
@@ -56,9 +56,8 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
}
FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} };
- toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, &badcmd, &workdone, NULL);
+ toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, nullptr, nullptr);
-
// message should be rejected for duplicate msn, row should still have original val
{
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
@@ -68,9 +67,10 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
// now verify that message with proper msn gets through
msn = next_dummymsn();
+ brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} };
- toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, &cmd2, &workdone, NULL);
-
+ toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, nullptr, nullptr);
+
// message should be accepted, val should have new value
{
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair2);
@@ -81,8 +81,8 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
// now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10;
FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }};
- toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, &cmd3, &workdone, NULL);
-
+ toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, nullptr, nullptr);
+
// message should be rejected, val should still have value in pair2
{
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair2);
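The toku_ft_leaf_apply_cmd calls in these tests change shape in two ways: a target child-number argument is inserted after the node (passed as -1 here, which in these tests reads as "no particular basement node chosen in advance"), and the old workdone out-parameter is dropped in favor of trailing nullptr arguments. A sketch of the new call as the tests use it; the parameter meanings are inferred from this diff, and the authoritative prototype is in the ft headers:

    MSN msn = next_dummymsn();
    brt->ft->h->max_msn_in_ft = msn;      // keep the header's max MSN in step, as above
    FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(),
                     .u = { .id = { &thekey, &theval } } };
    toku_ft_leaf_apply_cmd(brt->ft->compare_fun,
                           brt->ft->update_fun,
                           &brt->ft->cmp_descriptor,
                           leafnode,
                           -1,       // target childnum; -1 as used throughout these tests
                           &cmd,
                           nullptr,  // was &workdone
                           nullptr);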
diff --git a/ft/tests/orthopush-flush.cc b/ft/tests/orthopush-flush.cc
index 19a9a63aa4d..7e45b173c1b 100644
--- a/ft/tests/orthopush-flush.cc
+++ b/ft/tests/orthopush-flush.cc
@@ -580,7 +580,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
- toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, parent_messages[i], NULL, NULL);
+ toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
@@ -803,7 +803,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 &&
!parent_messages_is_fresh[i]) {
- toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, parent_messages[i], NULL, NULL);
+ toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
@@ -995,8 +995,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
- toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, parent_messages[i], NULL, NULL);
- toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, parent_messages[i], NULL, NULL);
+ toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], NULL, NULL);
+ toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
diff --git a/ft/tests/test.h b/ft/tests/test.h
index b3b9aa21de5..71b655cfcd0 100644
--- a/ft/tests/test.h
+++ b/ft/tests/test.h
@@ -228,7 +228,8 @@ static UU() CACHETABLE_WRITE_CALLBACK def_write_callback(void* write_extraargs)
wc.pe_callback = def_pe_callback;
wc.cleaner_callback = def_cleaner_callback;
wc.write_extraargs = write_extraargs;
- wc.clone_callback = NULL;
+ wc.clone_callback = nullptr;
+ wc.checkpoint_complete_callback = nullptr;
return wc;
}
diff --git a/ft/tests/test3884.cc b/ft/tests/test3884.cc
index 920d3993916..966a8f90cac 100644
--- a/ft/tests/test3884.cc
+++ b/ft/tests/test3884.cc
@@ -171,7 +171,7 @@ test_split_on_boundary(void)
FTNODE nodea, nodeb;
DBT splitk;
// if we haven't done it right, we should hit the assert in the top of move_leafentries
- ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+ ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
verify_basement_node_msns(nodea, dummy_msn_3884);
verify_basement_node_msns(nodeb, dummy_msn_3884);
@@ -244,7 +244,7 @@ test_split_with_everything_on_the_left(void)
FTNODE nodea, nodeb;
DBT splitk;
// if we haven't done it right, we should hit the assert in the top of move_leafentries
- ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+ ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
toku_unpin_ftnode(brt->ft, nodeb);
r = toku_close_ft_handle_nolsn(brt, NULL); assert(r == 0);
@@ -319,7 +319,7 @@ test_split_on_boundary_of_last_node(void)
FTNODE nodea, nodeb;
DBT splitk;
// if we haven't done it right, we should hit the assert in the top of move_leafentries
- ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+ ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
toku_unpin_ftnode(brt->ft, nodeb);
r = toku_close_ft_handle_nolsn(brt, NULL); assert(r == 0);
@@ -387,7 +387,7 @@ test_split_at_begin(void)
FTNODE nodea, nodeb;
DBT splitk;
// if we haven't done it right, we should hit the assert in the top of move_leafentries
- ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+ ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
toku_unpin_ftnode(brt->ft, nodeb);
r = toku_close_ft_handle_nolsn(brt, NULL); assert(r == 0);
@@ -451,7 +451,7 @@ test_split_at_end(void)
FTNODE nodea, nodeb;
DBT splitk;
// if we haven't done it right, we should hit the assert in the top of move_leafentries
- ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+ ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
toku_unpin_ftnode(brt->ft, nodeb);
r = toku_close_ft_handle_nolsn(brt, NULL); assert(r == 0);
@@ -505,7 +505,7 @@ test_split_odd_nodes(void)
FTNODE nodea, nodeb;
DBT splitk;
// if we haven't done it right, we should hit the assert in the top of move_leafentries
- ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+ ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
verify_basement_node_msns(nodea, dummy_msn_3884);
verify_basement_node_msns(nodeb, dummy_msn_3884);
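ftleaf_split now takes an explicit split-mode argument, and these tests ask for the historical behavior, SPLIT_EVENLY. Presumably the promotion work adds left- and right-biased modes for sequential insert patterns; the enum below is an assumed shape, not the actual definition in the ft headers:

    // Assumed shape; only SPLIT_EVENLY appears in this diff, the other names are guesses.
    enum split_mode {
        SPLIT_EVENLY,        // balance the leaf entries between the two new nodes
        SPLIT_LEFT_HEAVY,    // keep most entries in the left node (assumed)
        SPLIT_RIGHT_HEAVY,   // keep most entries in the right node (assumed)
    };

    // and the test calls become:
    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);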
diff --git a/ft/ule-internal.h b/ft/ule-internal.h
index 5a023875602..db0cebd4bd3 100644
--- a/ft/ule-internal.h
+++ b/ft/ule-internal.h
@@ -60,10 +60,10 @@ void test_msg_modify_ule(ULE ule, FT_MSG msg);
//Functions exported for test purposes only (used internally for non-test purposes).
void le_unpack(ULE ule, LEAFENTRY le);
int le_pack(ULE ule, // data to be packed into new leafentry
- size_t *new_leafentry_memorysize,
+ size_t *new_leafentry_memorysize,
LEAFENTRY * const new_leafentry_p, // this is what this function creates
- OMT omt,
- struct mempool *mp,
+ OMT *omtp,
+ struct mempool *mp,
void **maybe_free);
diff --git a/ft/ule.cc b/ft/ule.cc
index 565a9c1670e..3aeb6ed26e1 100644
--- a/ft/ule.cc
+++ b/ft/ule.cc
@@ -154,11 +154,11 @@ static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p);
static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p);
static void *
-le_malloc(OMT omt, struct mempool *mp, size_t size, void **maybe_free)
+le_malloc(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free)
{
void * rval;
- if (omt)
- rval = mempool_malloc_from_omt(omt, mp, size, maybe_free);
+ if (omtp)
+ rval = mempool_malloc_from_omt(omtp, mp, size, maybe_free);
else
rval = toku_xmalloc(size);
resource_assert(rval);
@@ -317,14 +317,14 @@ done:;
// If the leafentry is destroyed it sets *new_leafentry_p to NULL.
// Otherwise the new_leafentry_p points at the new leaf entry.
// As of October 2011, this function always returns 0.
-int
+int
apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
- LEAFENTRY old_leafentry, // NULL if there was no stored data.
- size_t *new_leafentry_memorysize,
- LEAFENTRY *new_leafentry_p,
- OMT omt,
- struct mempool *mp,
- void **maybe_free,
+ LEAFENTRY old_leafentry, // NULL if there was no stored data.
+ size_t *new_leafentry_memorysize,
+ LEAFENTRY *new_leafentry_p,
+ OMT *omtp,
+ struct mempool *mp,
+ void **maybe_free,
int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead
ULE_S ule;
int rval;
@@ -334,17 +334,17 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
if (old_leafentry == NULL) // if leafentry does not exist ...
msg_init_empty_ule(&ule, msg); // ... create empty unpacked leaf entry
else {
- le_unpack(&ule, old_leafentry); // otherwise unpack leafentry
+ le_unpack(&ule, old_leafentry); // otherwise unpack leafentry
oldnumbytes = ule_get_innermost_numbytes(&ule);
}
msg_modify_ule(&ule, msg); // modify unpacked leafentry
rval = le_pack(&ule, // create packed leafentry
- new_leafentry_memorysize,
- new_leafentry_p,
- omt,
- mp,
- maybe_free);
- if (new_leafentry_p)
+ new_leafentry_memorysize,
+ new_leafentry_p,
+ omtp,
+ mp,
+ maybe_free);
+ if (new_leafentry_p)
newnumbytes = ule_get_innermost_numbytes(&ule);
*numbytes_delta_p = newnumbytes - oldnumbytes;
ule_cleanup(&ule);
@@ -374,7 +374,7 @@ int
garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
LEAFENTRY *new_leaf_entry,
size_t *new_leaf_entry_memory_size,
- OMT omt,
+ OMT *omtp,
struct mempool *mp,
void **maybe_free,
const xid_omt_t &snapshot_xids,
@@ -387,7 +387,7 @@ garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
r = le_pack(&ule,
new_leaf_entry_memory_size,
new_leaf_entry,
- omt,
+ omtp,
mp,
maybe_free);
assert(r == 0);
@@ -713,8 +713,8 @@ int
le_pack(ULE ule, // data to be packed into new leafentry
size_t *new_leafentry_memorysize,
LEAFENTRY * const new_leafentry_p, // this is what this function creates
- OMT omt,
- struct mempool *mp,
+ OMT *omtp,
+ struct mempool *mp,
void **maybe_free)
{
invariant(ule->num_cuxrs > 0);
@@ -740,7 +740,7 @@ le_pack(ULE ule, // data to be packed into new leafen
found_insert:;
memsize = le_memsize_from_ule(ule);
LEAFENTRY new_leafentry;
- CAST_FROM_VOIDP(new_leafentry, le_malloc(omt, mp, memsize, maybe_free));
+ CAST_FROM_VOIDP(new_leafentry, le_malloc(omtp, mp, memsize, maybe_free));
//Universal data
new_leafentry->keylen = toku_htod32(ule->keylen);
@@ -2293,7 +2293,7 @@ int
toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry,
size_t *new_leafentry_memorysize,
LEAFENTRY *new_leafentry_p,
- OMT omt,
+ OMT *omtp,
struct mempool *mp) {
ULE_S ule;
int rval;
@@ -2305,7 +2305,7 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry,
rval = le_pack(&ule, // create packed leafentry
new_leafentry_memorysize,
new_leafentry_p,
- omt, mp, NULL);
+ omtp, mp, NULL);
ule_cleanup(&ule);
return rval;
}
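The OMT-to-OMT* change runs through le_malloc, le_pack, apply_msg_to_leafentry, garbage_collect_leafentry and the 13-to-14 upgrade path. A plausible reading, inferred from these signatures rather than stated in the patch, is that mempool_malloc_from_omt may now compact the mempool and rebuild the OMT, so the callee needs the address of the caller's handle in order to replace it. A minimal sketch of the calling pattern under that assumption (the basement-node field names are hypothetical):

    size_t new_size;
    LEAFENTRY new_le;
    void *maybe_free = nullptr;
    int r = le_pack(&ule,
                    &new_size,
                    &new_le,
                    &bn->buffer,          // OMT *omtp: may point at a rebuilt OMT on return (hypothetical field)
                    &bn->buffer_mempool,  // mempool backing the leaf entries (hypothetical field)
                    &maybe_free);
    assert(r == 0);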
diff --git a/ft/ule.h b/ft/ule.h
index 4ac84f3f82a..d3b3a719500 100644
--- a/ft/ule.h
+++ b/ft/ule.h
@@ -53,18 +53,18 @@ void fast_msg_to_leafentry(
LEAFENTRY *new_leafentry_p) ;
int apply_msg_to_leafentry(FT_MSG msg,
- LEAFENTRY old_leafentry, // NULL if there was no stored data.
- size_t *new_leafentry_memorysize,
- LEAFENTRY *new_leafentry_p,
- OMT omt,
- struct mempool *mp,
- void **maybe_free,
+ LEAFENTRY old_leafentry, // NULL if there was no stored data.
+ size_t *new_leafentry_memorysize,
+ LEAFENTRY *new_leafentry_p,
+ OMT *omtp,
+ struct mempool *mp,
+ void **maybe_free,
int64_t * numbytes_delta_p);
int garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
LEAFENTRY *new_leaf_entry,
size_t *new_leaf_entry_memory_size,
- OMT omt,
+ OMT *omtp,
struct mempool *mp,
void **maybe_free,
const xid_omt_t &snapshot_xids,
diff --git a/ft/worker-thread-benchmarks/threadpool.cc b/ft/worker-thread-benchmarks/threadpool.cc
index 0ac6ffb5b2c..c9e4d9bcb35 100644
--- a/ft/worker-thread-benchmarks/threadpool.cc
+++ b/ft/worker-thread-benchmarks/threadpool.cc
@@ -11,6 +11,7 @@
#include <errno.h>
#include "threadpool.h"
+#include <portability/toku_atomic.h>
// use gcc builtin fetch_and_add 0->no 1->yes
#define DO_ATOMIC_FETCH_AND_ADD 0
@@ -61,7 +62,7 @@ void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg)
void threadpool_set_thread_busy(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
- (void) __sync_fetch_and_add(&threadpool->busy_threads, 1);
+ (void) toku_sync_fetch_and_add(&threadpool->busy_threads, 1);
#else
threadpool->busy_threads++;
#endif
@@ -69,7 +70,7 @@ void threadpool_set_thread_busy(THREADPOOL threadpool) {
void threadpool_set_thread_idle(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
- (void) __sync_fetch_and_add(&threadpool->busy_threads, -1);
+ (void) toku_sync_fetch_and_add(&threadpool->busy_threads, -1);
#else
threadpool->busy_threads--;
#endif