author    | Leif Walsh <leif@tokutek.com> | 2012-11-07 21:35:11 +0000
committer | Yoni Fogel <yoni@tokutek.com> | 2013-04-17 00:01:14 -0400
commit    | 10f6b5c79fa7d3f7c49617fcf904efba1513e213 (patch)
tree      | 1194fa915c5aea8a7a299f24cf91e7afb229389e /ft
parent    | 3571ca4bb94a7fbdff9688f24b5a8be1e7369500 (diff)
download  | mariadb-git-10f6b5c79fa7d3f7c49617fcf904efba1513e213.tar.gz
refs #5418 merge promotion to main
git-svn-id: file:///svn/toku/tokudb@49697 c7de825b-a66e-492c-adef-691d508d4ae1
Diffstat (limited to 'ft')
52 files changed, 1412 insertions(+), 777 deletions(-)
diff --git a/ft/block_table.cc b/ft/block_table.cc index f9583902357..377283ab958 100644 --- a/ft/block_table.cc +++ b/ft/block_table.cc @@ -89,9 +89,9 @@ static inline void unlock_for_blocktable (BLOCK_TABLE bt); static void ft_set_dirty(FT ft, bool for_checkpoint){ toku_mutex_assert_locked(&ft->blocktable->mutex); - assert(ft->h->type == FT_CURRENT); + paranoid_invariant(ft->h->type == FT_CURRENT); if (for_checkpoint) { - assert(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS); + paranoid_invariant(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS); ft->checkpoint_header->dirty = 1; } else { @@ -134,10 +134,10 @@ toku_maybe_truncate_file_on_open(BLOCK_TABLE bt, int fd) { static void copy_translation(struct translation * dst, struct translation * src, enum translation_type newtype) { - assert(src->length_of_array >= src->smallest_never_used_blocknum.b); //verify invariant - assert(newtype==TRANSLATION_DEBUG || - (src->type == TRANSLATION_CURRENT && newtype == TRANSLATION_INPROGRESS) || - (src->type == TRANSLATION_CHECKPOINTED && newtype == TRANSLATION_CURRENT)); + paranoid_invariant(src->length_of_array >= src->smallest_never_used_blocknum.b); //verify invariant + paranoid_invariant(newtype==TRANSLATION_DEBUG || + (src->type == TRANSLATION_CURRENT && newtype == TRANSLATION_INPROGRESS) || + (src->type == TRANSLATION_CHECKPOINTED && newtype == TRANSLATION_CURRENT)); dst->type = newtype; dst->smallest_never_used_blocknum = src->smallest_never_used_blocknum; dst->blocknum_freelist_head = src->blocknum_freelist_head; @@ -175,7 +175,7 @@ maybe_optimize_translation(struct translation *t) { //This is O(n) work, so do it only if you're already doing that. BLOCKNUM b; - assert(t->smallest_never_used_blocknum.b >= RESERVED_BLOCKNUMS); + paranoid_invariant(t->smallest_never_used_blocknum.b >= RESERVED_BLOCKNUMS); //Calculate how large the free suffix is. int64_t freed; { @@ -212,7 +212,7 @@ void toku_block_translation_note_start_checkpoint_unlocked (BLOCK_TABLE bt) { toku_mutex_assert_locked(&bt->mutex); // Copy current translation to inprogress translation. - assert(bt->inprogress.block_translation == NULL); + paranoid_invariant(bt->inprogress.block_translation == NULL); //We're going to do O(n) work to copy the translation, so we //can afford to do O(n) work by optimizing the translation maybe_optimize_translation(&bt->current); @@ -229,7 +229,7 @@ toku_block_translation_note_start_checkpoint_unlocked (BLOCK_TABLE bt) { void toku_block_translation_note_skipped_checkpoint (BLOCK_TABLE bt) { //Purpose, alert block translation that the checkpoint was skipped, e.x. 
for a non-dirty header lock_for_blocktable(bt); - assert(bt->inprogress.block_translation); + paranoid_invariant_notnull(bt->inprogress.block_translation); bt->checkpoint_skipped = true; unlock_for_blocktable(bt); } @@ -267,7 +267,7 @@ toku_block_translation_note_end_checkpoint (BLOCK_TABLE bt, int fd) { // Free unused blocks lock_for_blocktable(bt); uint64_t allocated_limit_at_start = block_allocator_allocated_limit(bt->block_allocator); - assert(bt->inprogress.block_translation); + paranoid_invariant_notnull(bt->inprogress.block_translation); if (bt->checkpoint_skipped || bt->checkpoint_failed) { cleanup_failed_checkpoint(bt); goto end; @@ -299,25 +299,31 @@ end: unlock_for_blocktable(bt); } - +__attribute__((nonnull,const)) +static inline bool +is_valid_blocknum(struct translation *t, BLOCKNUM b) { + //Sanity check: Verify invariant + paranoid_invariant(t->length_of_array >= t->smallest_never_used_blocknum.b); + return b.b >= 0 && b.b < t->smallest_never_used_blocknum.b; +} static inline void -verify_valid_blocknum (struct translation *t, BLOCKNUM b) { - assert(b.b >= 0); - assert(b.b < t->smallest_never_used_blocknum.b); +verify_valid_blocknum (struct translation *UU(t), BLOCKNUM UU(b)) { + paranoid_invariant(is_valid_blocknum(t, b)); +} +__attribute__((nonnull,const)) +static inline bool +is_valid_freeable_blocknum(struct translation *t, BLOCKNUM b) { //Sanity check: Verify invariant - assert(t->length_of_array >= t->smallest_never_used_blocknum.b); + paranoid_invariant(t->length_of_array >= t->smallest_never_used_blocknum.b); + return b.b >= RESERVED_BLOCKNUMS && b.b < t->smallest_never_used_blocknum.b; } //Can be freed static inline void -verify_valid_freeable_blocknum (struct translation *t, BLOCKNUM b) { - assert(b.b >= RESERVED_BLOCKNUMS); - assert(b.b < t->smallest_never_used_blocknum.b); - - //Sanity check: Verify invariant - assert(t->length_of_array >= t->smallest_never_used_blocknum.b); +verify_valid_freeable_blocknum (struct translation *UU(t), BLOCKNUM UU(b)) { + paranoid_invariant(is_valid_freeable_blocknum(t, b)); } static void @@ -376,11 +382,9 @@ calculate_size_on_disk (struct translation *t) { // We cannot free the disk space allocated to this blocknum if it is still in use by the given translation table. static inline bool translation_prevents_freeing(struct translation *t, BLOCKNUM b, struct block_translation_pair *old_pair) { - bool r = (bool) - (t->block_translation && + return (t->block_translation && b.b < t->smallest_never_used_blocknum.b && old_pair->u.diskoff == t->block_translation[b.b].u.diskoff); - return r; } static void @@ -413,7 +417,7 @@ PRNTF("Freed", b.b, old_pair.size, old_pair.u.diskoff, bt); PRNTF("New", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.diskoff, bt); //Update inprogress btt if appropriate (if called because Pending bit is set). if (for_checkpoint) { - assert(b.b < bt->inprogress.length_of_array); + paranoid_invariant(b.b < bt->inprogress.length_of_array); bt->inprogress.block_translation[b.b] = t->block_translation[b.b]; } } @@ -449,17 +453,22 @@ toku_blocknum_realloc_on_disk (BLOCK_TABLE bt, BLOCKNUM b, DISKOFF size, DISKOFF unlock_for_blocktable(bt); } +__attribute__((nonnull,const)) +static inline bool +pair_is_unallocated(struct block_translation_pair *pair) { + return pair->size == 0 && pair->u.diskoff == diskoff_unused; +} + // Purpose of this function is to figure out where to put the inprogress btt on disk, allocate space for it there. 
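Most of this block_table.cc hunk swaps hard `assert()` calls for `paranoid_invariant()` checks. The macro definitions are not part of this diff; a minimal sketch of the pattern being applied, assuming the `TOKU_DEBUG_PARANOID` switch that later hunks reference, looks roughly like this:

```cpp
// Illustrative sketch only -- the real macros live elsewhere in the tree.
// In a paranoid/debug build the checks behave like assert(); in a release
// build they compile away, so hot paths pay nothing for them.
#include <assert.h>
#include <stddef.h>

#ifdef TOKU_DEBUG_PARANOID
# define paranoid_invariant(p)          assert(p)
# define paranoid_invariant_notnull(p)  assert((p) != NULL)
#else
# define paranoid_invariant(p)          ((void) 0)
# define paranoid_invariant_notnull(p)  ((void) 0)
#endif

// UU() marks parameters referenced only inside paranoid checks, so release
// builds do not warn about unused arguments.
#ifndef UU
# define UU(x) x __attribute__((__unused__))
#endif
```

The new `is_valid_blocknum`/`is_valid_freeable_blocknum`/`pair_is_unallocated` helpers follow the same idea: the checks stay available to debug builds and tests as bool-returning predicates, while release builds drop them entirely.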
static void blocknum_alloc_translation_on_disk_unlocked (BLOCK_TABLE bt) { toku_mutex_assert_locked(&bt->mutex); struct translation *t = &bt->inprogress; - assert(t->block_translation); + paranoid_invariant_notnull(t->block_translation); BLOCKNUM b = make_blocknum(RESERVED_BLOCKNUM_TRANSLATION); - struct block_translation_pair old_pair = t->block_translation[b.b]; //Each inprogress is allocated only once - assert(old_pair.size == 0 && old_pair.u.diskoff == diskoff_unused); + paranoid_invariant(pair_is_unallocated(&t->block_translation[b.b])); //Allocate a new block int64_t size = calculate_size_on_disk(t); @@ -560,7 +569,7 @@ toku_allocate_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *res, FT ft) { t->blocknum_freelist_head = next; } //Verify the blocknum is free - assert(t->block_translation[result.b].size == size_is_free); + paranoid_invariant(t->block_translation[result.b].size == size_is_free); //blocknum is not free anymore t->block_translation[result.b].u.diskoff = diskoff_unused; t->block_translation[result.b].size = 0; @@ -580,9 +589,8 @@ static void free_blocknum_in_translation(struct translation *t, BLOCKNUM b) { verify_valid_freeable_blocknum(t, b); - struct block_translation_pair old_pair = t->block_translation[b.b]; - assert(old_pair.size != size_is_free); - + paranoid_invariant(t->block_translation[b.b].size != size_is_free); + PRNTF("free_blocknum", b.b, t->block_translation[b.b].size, t->block_translation[b.b].u.diskoff, bt); t->block_translation[b.b].size = size_is_free; t->block_translation[b.b].u.next_free_blocknum = t->blocknum_freelist_head; @@ -601,8 +609,8 @@ free_blocknum_unlocked(BLOCK_TABLE bt, BLOCKNUM *bp, FT ft, bool for_checkpoint) free_blocknum_in_translation(&bt->current, b); if (for_checkpoint) { - assert(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS); - free_blocknum_in_translation(&bt->inprogress, b); + paranoid_invariant(ft->checkpoint_header->type == FT_CHECKPOINT_INPROGRESS); + free_blocknum_in_translation(&bt->inprogress, b); } //If the size is 0, no disk block has ever been assigned to this blocknum. @@ -616,7 +624,10 @@ PRNTF("free_blocknum_free", b.b, old_pair.size, old_pair.u.diskoff, bt); block_allocator_free_block(bt->block_allocator, old_pair.u.diskoff); } } - else assert(old_pair.size==0 && old_pair.u.diskoff == diskoff_unused); + else { + paranoid_invariant(old_pair.size==0); + paranoid_invariant(old_pair.u.diskoff == diskoff_unused); + } ft_set_dirty(ft, for_checkpoint); } @@ -626,11 +637,11 @@ toku_free_blocknum(BLOCK_TABLE bt, BLOCKNUM *bp, FT ft, bool for_checkpoint) { free_blocknum_unlocked(bt, bp, ft, for_checkpoint); unlock_for_blocktable(bt); } - + //Verify there are no free blocks. void -toku_block_verify_no_free_blocknums(BLOCK_TABLE bt) { - assert(bt->current.blocknum_freelist_head.b == freelist_null.b); +toku_block_verify_no_free_blocknums(BLOCK_TABLE UU(bt)) { + paranoid_invariant(bt->current.blocknum_freelist_head.b == freelist_null.b); } // Frees blocknums that have a size of 0 and unused diskoff @@ -652,31 +663,54 @@ toku_free_unused_blocknums(BLOCK_TABLE bt, BLOCKNUM root) { unlock_for_blocktable(bt); } - -//Verify there are no data blocks except root. 
-void -toku_block_verify_no_data_blocks_except_root(BLOCK_TABLE bt, BLOCKNUM root) { +__attribute__((nonnull,const,unused)) +static inline bool +no_data_blocks_except_root(BLOCK_TABLE bt, BLOCKNUM root) { + bool ok = true; lock_for_blocktable(bt); - assert(root.b >= RESERVED_BLOCKNUMS); int64_t smallest = bt->current.smallest_never_used_blocknum.b; - for (int64_t i=RESERVED_BLOCKNUMS; i < smallest; i++) { + if (root.b < RESERVED_BLOCKNUMS) { + ok = false; + goto cleanup; + } + int64_t i; + for (i=RESERVED_BLOCKNUMS; i < smallest; i++) { if (i == root.b) { continue; } BLOCKNUM b = make_blocknum(i); - assert(bt->current.block_translation[b.b].size == size_is_free); + if (bt->current.block_translation[b.b].size != size_is_free) { + ok = false; + goto cleanup; + } } + cleanup: unlock_for_blocktable(bt); + return ok; } -//Verify a blocknum is currently allocated. +//Verify there are no data blocks except root. +// TODO(leif): This actually takes a lock, but I don't want to fix all the callers right now. void -toku_verify_blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b) { +toku_block_verify_no_data_blocks_except_root(BLOCK_TABLE UU(bt), BLOCKNUM UU(root)) { + paranoid_invariant(no_data_blocks_except_root(bt, root)); +} + +__attribute__((nonnull,const,unused)) +static inline bool +blocknum_allocated(BLOCK_TABLE bt, BLOCKNUM b) { lock_for_blocktable(bt); struct translation *t = &bt->current; verify_valid_blocknum(t, b); - assert(t->block_translation[b.b].size != size_is_free); + bool ok = t->block_translation[b.b].size != size_is_free; unlock_for_blocktable(bt); + return ok; +} + +//Verify a blocknum is currently allocated. +void +toku_verify_blocknum_allocated(BLOCK_TABLE UU(bt), BLOCKNUM UU(b)) { + paranoid_invariant(blocknum_allocated(bt, b)); } //Only used by toku_dump_translation table (debug info) @@ -834,12 +868,12 @@ blocktable_note_translation (BLOCK_ALLOCATOR allocator, struct translation *t) { //See RESERVED_BLOCKNUMS // Previously this added blocks one at a time. Now we make an array and pass it in so it can be sorted and merged. See #3218. 
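One small behavioral change in the hunk above: the blockpair array is now allocated with `XMALLOC_N` instead of `MALLOC_N`. Those macros are defined outside this diff; by this codebase's apparent convention the X- variants treat allocation failure as fatal rather than returning NULL, roughly:

```cpp
// Sketch of the assumed X-allocator convention, not the real macro.
#include <assert.h>
#include <stdlib.h>

static void *xmalloc_or_abort(size_t nbytes) {
    void *p = malloc(nbytes);
    assert(p != NULL);   // fail fast on OOM instead of propagating ENOMEM
    return p;
}
```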
- struct block_allocator_blockpair *MALLOC_N(t->smallest_never_used_blocknum.b, pairs); + struct block_allocator_blockpair *XMALLOC_N(t->smallest_never_used_blocknum.b, pairs); uint64_t n_pairs = 0; for (int64_t i=0; i<t->smallest_never_used_blocknum.b; i++) { struct block_translation_pair pair = t->block_translation[i]; if (pair.size > 0) { - assert(pair.u.diskoff != diskoff_unused); + paranoid_invariant(pair.u.diskoff != diskoff_unused); int cur_pair = n_pairs++; pairs[cur_pair] = (struct block_allocator_blockpair) { .offset = (uint64_t) pair.u.diskoff, .size = (uint64_t) pair.size }; @@ -943,7 +977,7 @@ void toku_blocktable_internal_fragmentation (BLOCK_TABLE bt, int64_t *total_sizep, int64_t *used_sizep) { frag_extra info = {0,0}; int r = toku_blocktable_iterate(bt, TRANSLATION_CHECKPOINTED, frag_helper, &info, false, true); - assert(r==0); + assert_zero(r); if (total_sizep) *total_sizep = info.total_space; if (used_sizep) *used_sizep = info.used_space; diff --git a/ft/cachetable-internal.h b/ft/cachetable-internal.h index b46173b93a2..52b21bac2b7 100644 --- a/ft/cachetable-internal.h +++ b/ft/cachetable-internal.h @@ -134,6 +134,7 @@ struct ctpair { CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback; CACHETABLE_CLEANER_CALLBACK cleaner_callback; CACHETABLE_CLONE_CALLBACK clone_callback; + CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK checkpoint_complete_callback; void *write_extraargs; // access to these fields are protected by disk_nb_mutex @@ -384,7 +385,7 @@ public: uint64_t reserve_memory(double fraction); void release_reserved_memory(uint64_t reserved_memory); void run_eviction_thread(); - void do_partial_eviction(PAIR p); + void do_partial_eviction(PAIR p, bool pair_mutex_held); void evict_pair(PAIR p, bool checkpoint_pending); void wait_for_cache_pressure_to_subside(); void signal_eviction_thread(); diff --git a/ft/cachetable.cc b/ft/cachetable.cc index cc174830eec..a97a275a26f 100644 --- a/ft/cachetable.cc +++ b/ft/cachetable.cc @@ -16,6 +16,7 @@ #include "cachetable-internal.h" #include <memory.h> #include <toku_race_tools.h> +#include <portability/toku_atomic.h> #include <portability/toku_pthread.h> #include <portability/toku_time.h> #include <util/rwlock.h> @@ -97,6 +98,10 @@ static inline void pair_unlock(PAIR p) { toku_mutex_unlock(p->mutex); } +bool toku_ctpair_is_write_locked(PAIR pair) { + return pair->value_rwlock.writers() == 1; +} + void toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) { if (!ct_status.initialized) { @@ -706,7 +711,7 @@ static void cachetable_evicter(void* extra) { static void cachetable_partial_eviction(void* extra) { PAIR p = (PAIR)extra; CACHEFILE cf = p->cachefile; - p->ev->do_partial_eviction(p); + p->ev->do_partial_eviction(p, false); bjm_remove_background_job(cf->bjm); } @@ -750,6 +755,7 @@ void pair_init(PAIR p, p->pe_est_callback = write_callback.pe_est_callback; p->cleaner_callback = write_callback.cleaner_callback; p->clone_callback = write_callback.clone_callback; + p->checkpoint_complete_callback = write_callback.checkpoint_complete_callback; p->write_extraargs = write_callback.write_extraargs; p->count = 0; // <CER> Is zero the correct init value? 
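The new `checkpoint_complete_callback` member added to `struct ctpair` and wired into `pair_init` above is the same field that `CACHETABLE_WRITE_CALLBACK` gains in cachetable.h further down. A hypothetical client-side registration (the helper names here are placeholders, not functions from the tree) would look like:

```cpp
#include "cachetable.h"

// Placeholder callback: invoked by the cachetable for a pair whose
// checkpoint_pending bit is set, while the pair's value lock is held
// (see write_locked_pair_for_checkpoint / write_pair_for_checkpoint_thread).
static void my_checkpoint_complete(void *value_data) {
    (void) value_data;   // e.g. clear per-node "in checkpoint" bookkeeping
}

static CACHETABLE_WRITE_CALLBACK my_write_callback(void *extra) {
    CACHETABLE_WRITE_CALLBACK wc = existing_write_callback(extra); // placeholder
    wc.checkpoint_complete_callback = my_checkpoint_complete;      // new field
    return wc;
}
```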
@@ -915,6 +921,9 @@ checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) { static void write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending) { + if (checkpoint_pending && p->checkpoint_complete_callback) { + p->checkpoint_complete_callback(p->value_data); + } if (p->dirty && checkpoint_pending) { if (p->clone_callback) { pair_lock(p); @@ -952,6 +961,9 @@ write_pair_for_checkpoint_thread (evictor* ev, PAIR p) // will be cheap. Also, much of the time we'll just be clearing // pending bits and that's definitely cheap. (see #5427) p->value_rwlock.write_lock(false); + if (p->checkpoint_pending && p->checkpoint_complete_callback) { + p->checkpoint_complete_callback(p->value_data); + } if (p->dirty && p->checkpoint_pending) { if (p->clone_callback) { nb_mutex_lock(&p->disk_nb_mutex, p->mutex); @@ -1726,73 +1738,100 @@ int toku_cachetable_get_and_pin_with_dep_pairs ( // For example, imagine that we can modify a bit in a dirty parent, or modify a bit in a clean child, then we should modify // the dirty parent (which will have to do I/O eventually anyway) rather than incur a full block write to modify one bit. // Similarly, if the checkpoint is actually pending, we don't want to block on it. -int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void**value) { +int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) { CACHETABLE ct = cachefile->cachetable; int r = -1; ct->list.pair_lock_by_fullhash(fullhash); PAIR p = ct->list.find_pair(cachefile, key, fullhash); - if (p && p->value_rwlock.try_write_lock(true)) { - // we got the write lock fast, so continue - ct->list.read_pending_cheap_lock(); - // - // if pending a checkpoint, then we don't want to return - // the value to the user, because we are responsible for - // handling the checkpointing, which we do not want to do, - // because it is expensive - // - if (!p->dirty || p->checkpoint_pending) { - p->value_rwlock.write_unlock(); - r = -1; + if (p) { + const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE); + bool got_lock = false; + switch (lock_type) { + case PL_READ: + if (p->value_rwlock.try_read_lock()) { + got_lock = p->dirty; + + if (!got_lock) { + p->value_rwlock.read_unlock(); + } + } + break; + case PL_WRITE_CHEAP: + case PL_WRITE_EXPENSIVE: + if (p->value_rwlock.try_write_lock(lock_is_expensive)) { + // we got the lock fast, so continue + ct->list.read_pending_cheap_lock(); + + // if pending a checkpoint, then we don't want to return + // the value to the user, because we are responsible for + // handling the checkpointing, which we do not want to do, + // because it is expensive + got_lock = p->dirty && !p->checkpoint_pending; + + ct->list.read_pending_cheap_unlock(); + if (!got_lock) { + p->value_rwlock.write_unlock(); + } + } + break; } - else { + if (got_lock) { + pair_touch(p); *value = p->value_data; r = 0; } - ct->list.read_pending_cheap_unlock(); - pair_unlock(p); - } - else { - ct->list.pair_unlock_by_fullhash(fullhash); } + ct->list.pair_unlock_by_fullhash(fullhash); return r; } //Used by flusher threads to possibly pin child on client thread if pinning is cheap //Same as toku_cachetable_maybe_get_and_pin except that we don't care if the node is clean or dirty (return the node regardless). //All other conditions remain the same. 
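`toku_cachetable_maybe_get_and_pin` now takes a `pair_lock_type`, so callers can ask for the cheapest lock they can live with (PL_READ, PL_WRITE_CHEAP, or PL_WRITE_EXPENSIVE) instead of always trying for an expensive write lock. A caller-side sketch, not a function from the tree:

```cpp
#include "cachetable.h"

// cf, key and fullhash come from the caller's context; error handling elided.
static void try_cheap_read_pin(CACHEFILE cf, CACHEKEY key, uint32_t fullhash) {
    void *node_v = NULL;
    int r = toku_cachetable_maybe_get_and_pin(cf, key, fullhash, PL_READ, &node_v);
    if (r == 0) {
        // Pinned without blocking. For PL_READ this function only returns
        // dirty pairs, preserving its "prefer touching already-dirty nodes"
        // contract.
        // ... use node_v, then unpin ...
    } else {
        // Could not pin cheaply (not cached, busy, clean, or needs checkpoint
        // handling): fall back to the blocking path or skip the optimization.
    }
}
```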
-int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void**value) { +int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) { CACHETABLE ct = cachefile->cachetable; int r = -1; ct->list.pair_lock_by_fullhash(fullhash); PAIR p = ct->list.find_pair(cachefile, key, fullhash); - if (p && p->value_rwlock.try_write_lock(true)) { - // got the write lock fast, so continue - ct->list.read_pending_cheap_lock(); - // - // if pending a checkpoint, then we don't want to return - // the value to the user, because we are responsible for - // handling the checkpointing, which we do not want to do, - // because it is expensive - // - if (p->checkpoint_pending) { - if (p->dirty) { - p->value_rwlock.write_unlock(); - r = -1; + if (p) { + const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE); + bool got_lock = false; + switch (lock_type) { + case PL_READ: + if (p->value_rwlock.try_read_lock()) { + got_lock = true; + } else if (!p->value_rwlock.read_lock_is_expensive()) { + p->value_rwlock.write_lock(lock_is_expensive); + got_lock = true; } - else { - p->checkpoint_pending = false; - *value = p->value_data; - r = 0; + if (got_lock) { + pair_touch(p); + } + pair_unlock(p); + break; + case PL_WRITE_CHEAP: + case PL_WRITE_EXPENSIVE: + if (p->value_rwlock.try_write_lock(lock_is_expensive)) { + got_lock = true; + } else if (!p->value_rwlock.write_lock_is_expensive()) { + p->value_rwlock.write_lock(lock_is_expensive); + got_lock = true; } + if (got_lock) { + pair_touch(p); + } + pair_unlock(p); + if (got_lock) { + bool checkpoint_pending = get_checkpoint_pending(p, &ct->list); + write_locked_pair_for_checkpoint(ct, p, checkpoint_pending); + } + break; } - else { + if (got_lock) { *value = p->value_data; r = 0; } - ct->list.read_pending_cheap_unlock(); - pair_unlock(p); - } - else { + } else { ct->list.pair_unlock_by_fullhash(fullhash); } return r; @@ -3524,7 +3563,7 @@ void evictor::change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr) { // the size of the cachetable. // void evictor::add_to_size_current(long size) { - (void) __sync_fetch_and_add(&m_size_current, size); + (void) toku_sync_fetch_and_add(&m_size_current, size); } // @@ -3532,7 +3571,7 @@ void evictor::add_to_size_current(long size) { // approximation of the cachetable size. 
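The `__sync_fetch_and_add`/`__sync_fetch_and_sub` builtins scattered through the evictor (and through checkpoint.cc and the flushers below) are being replaced with `toku_sync_*` wrappers from the newly included `portability/toku_atomic.h`. That header is not shown here; presumably the wrappers are thin shims over the GCC builtins, along these lines:

```cpp
// Assumed shape of the portability wrappers -- illustrative only.
template <typename T, typename U>
static inline T toku_sync_fetch_and_add(T *ptr, U val) {
    return __sync_fetch_and_add(ptr, val);
}

template <typename T, typename U>
static inline T toku_sync_fetch_and_sub(T *ptr, U val) {
    return __sync_fetch_and_sub(ptr, val);
}
```

Funneling every atomic through one header makes it possible to swap primitives or add instrumentation per platform without touching each call site.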
// void evictor::remove_from_size_current(long size) { - (void) __sync_fetch_and_sub(&m_size_current, size); + (void) toku_sync_fetch_and_sub(&m_size_current, size); } // @@ -3543,7 +3582,7 @@ uint64_t evictor::reserve_memory(double fraction) { toku_mutex_lock(&m_ev_thread_lock); reserved_memory = fraction * (m_low_size_watermark - m_size_reserved); m_size_reserved += reserved_memory; - (void) __sync_fetch_and_add(&m_size_current, reserved_memory); + (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory); this->signal_eviction_thread(); toku_mutex_unlock(&m_ev_thread_lock); @@ -3557,7 +3596,7 @@ uint64_t evictor::reserve_memory(double fraction) { // TODO: (Zardosht) comment this function // void evictor::release_reserved_memory(uint64_t reserved_memory){ - (void) __sync_fetch_and_sub(&m_size_current, reserved_memory); + (void) toku_sync_fetch_and_sub(&m_size_current, reserved_memory); toku_mutex_lock(&m_ev_thread_lock); m_size_reserved -= reserved_memory; // signal the eviction thread in order to possibly wake up sleeping clients @@ -3710,7 +3749,6 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { curr_in_clock->count--; // call the partial eviction callback curr_in_clock->value_rwlock.write_lock(true); - pair_unlock(curr_in_clock); void *value = curr_in_clock->value_data; void* disk_data = curr_in_clock->disk_data; @@ -3726,13 +3764,15 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { ); if (cost == PE_CHEAP) { curr_in_clock->size_evicting_estimate = 0; - this->do_partial_eviction(curr_in_clock); + this->do_partial_eviction(curr_in_clock, true); bjm_remove_background_job(cf->bjm); + pair_unlock(curr_in_clock); } else if (cost == PE_EXPENSIVE) { // only bother running an expensive partial eviction // if it is expected to free space if (bytes_freed_estimate > 0) { + pair_unlock(curr_in_clock); curr_in_clock->size_evicting_estimate = bytes_freed_estimate; toku_mutex_lock(&m_ev_thread_lock); m_size_evicting += bytes_freed_estimate; @@ -3744,7 +3784,6 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { ); } else { - pair_lock(curr_in_clock); curr_in_clock->value_rwlock.write_unlock(); pair_unlock(curr_in_clock); bjm_remove_background_job(cf->bjm); @@ -3767,10 +3806,10 @@ exit: } // -// on entry, pair's mutex is not held, but pair is pinned +// on entry and exit, pair's mutex is held if pair_mutex_held is true // on exit, PAIR is unpinned // -void evictor::do_partial_eviction(PAIR p) { +void evictor::do_partial_eviction(PAIR p, bool pair_mutex_held) { PAIR_ATTR new_attr; PAIR_ATTR old_attr = p->attr; @@ -3779,9 +3818,13 @@ void evictor::do_partial_eviction(PAIR p) { this->change_pair_attr(old_attr, new_attr); p->attr = new_attr; this->decrease_size_evicting(p->size_evicting_estimate); - pair_lock(p); + if (!pair_mutex_held) { + pair_lock(p); + } p->value_rwlock.write_unlock(); - pair_unlock(p); + if (!pair_mutex_held) { + pair_unlock(p); + } } // diff --git a/ft/cachetable.h b/ft/cachetable.h index 186916f7ab8..95e7f1e091a 100644 --- a/ft/cachetable.h +++ b/ft/cachetable.h @@ -173,12 +173,15 @@ typedef int (*CACHETABLE_CLEANER_CALLBACK)(void *ftnode_pv, BLOCKNUM blocknum, u typedef void (*CACHETABLE_CLONE_CALLBACK)(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs); +typedef void (*CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK)(void *value_data); + typedef struct { CACHETABLE_FLUSH_CALLBACK flush_callback; CACHETABLE_PARTIAL_EVICTION_EST_CALLBACK pe_est_callback; CACHETABLE_PARTIAL_EVICTION_CALLBACK 
pe_callback; CACHETABLE_CLEANER_CALLBACK cleaner_callback; CACHETABLE_CLONE_CALLBACK clone_callback; + CACHETABLE_CHECKPOINT_COMPLETE_CALLBACK checkpoint_complete_callback; void* write_extraargs; // parameter for flush_callback, pe_est_callback, pe_callback, and cleaner_callback } CACHETABLE_WRITE_CALLBACK; @@ -366,14 +369,14 @@ int toku_cachetable_get_and_pin_nonblocking ( UNLOCKERS unlockers ); -int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, void**); +int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, pair_lock_type, void**); // Effect: Maybe get and pin a memory object. // This function is similar to the get_and_pin function except that it // will not attempt to fetch a memory object that is not in the cachetable or requires any kind of blocking to get it. // Returns: If the the item is already in memory, then return 0 and store it in the // void**. If the item is not in memory, then return a nonzero error number. -int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, void**); +int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE, CACHEKEY, uint32_t /*fullhash*/, pair_lock_type, void**); // Effect: Like maybe get and pin, but may pin a clean pair. int toku_cachetable_unpin(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size); @@ -556,5 +559,10 @@ int toku_cleaner_thread(void *cleaner_v); // The default of 1M is too high for drd tests, so this is a mechanism to set a smaller number. void toku_pair_list_set_lock_size(uint32_t num_locks); +// Used by ft-ops.cc to figure out if it has the write lock on a pair. +// Pretty hacky and not accurate enough, should be improved at the frwlock +// layer. +__attribute__((const,nonnull)) +bool toku_ctpair_is_write_locked(PAIR pair); #endif /* CACHETABLE_H */ diff --git a/ft/checkpoint.cc b/ft/checkpoint.cc index 9aaf1692572..52b4d741329 100644 --- a/ft/checkpoint.cc +++ b/ft/checkpoint.cc @@ -49,6 +49,7 @@ #include "log-internal.h" #include "logger.h" #include "checkpoint.h" +#include <portability/toku_atomic.h> /////////////////////////////////////////////////////////////////////////////////// // Engine status @@ -173,7 +174,7 @@ checkpoint_safe_checkpoint_unlock(void) { void toku_multi_operation_client_lock(void) { if (locked_mo) - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_MO), 1); toku_pthread_rwlock_rdlock(&multi_operation_lock); } @@ -185,7 +186,7 @@ toku_multi_operation_client_unlock(void) { void toku_checkpoint_safe_client_lock(void) { if (locked_cs) - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(CP_CLIENT_WAIT_ON_CS), 1); toku_pthread_rwlock_rdlock(&checkpoint_safe_lock); toku_multi_operation_client_lock(); } @@ -227,9 +228,9 @@ toku_checkpoint(CHECKPOINTER cp, TOKULOGGER logger, assert(initialized); - (void) __sync_fetch_and_add(&STATUS_VALUE(CP_WAITERS_NOW), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(CP_WAITERS_NOW), 1); checkpoint_safe_checkpoint_lock(); - (void) __sync_fetch_and_sub(&STATUS_VALUE(CP_WAITERS_NOW), 1); + (void) toku_sync_fetch_and_sub(&STATUS_VALUE(CP_WAITERS_NOW), 1); if (STATUS_VALUE(CP_WAITERS_NOW) > STATUS_VALUE(CP_WAITERS_MAX)) STATUS_VALUE(CP_WAITERS_MAX) = STATUS_VALUE(CP_WAITERS_NOW); // threadsafe, within checkpoint_safe lock diff --git a/ft/fifo.cc b/ft/fifo.cc index 30b961c6908..f7b6b871967 100644 --- a/ft/fifo.cc +++ b/ft/fifo.cc @@ 
-25,12 +25,23 @@ static void fifo_init(struct fifo *fifo) { fifo->memory_used = 0; } +__attribute__((const,nonnull)) static int fifo_entry_size(struct fifo_entry *entry) { return sizeof (struct fifo_entry) + entry->keylen + entry->vallen + xids_get_size(&entry->xids_s) - sizeof(XIDS_S); //Prevent double counting from fifo_entry+xids_get_size } +__attribute__((const,nonnull)) +size_t toku_ft_msg_memsize_in_fifo(FT_MSG cmd) { + // This must stay in sync with fifo_entry_size because that's what we + // really trust. But sometimes we only have an in-memory FT_MSG, not + // a serialized fifo_entry so we have to fake it. + return sizeof (struct fifo_entry) + cmd->u.id.key->size + cmd->u.id.val->size + + xids_get_size(cmd->xids) + - sizeof(XIDS_S); +} + int toku_fifo_create(FIFO *ptr) { struct fifo *XMALLOC(fifo); if (fifo == 0) return ENOMEM; @@ -112,6 +123,9 @@ int toku_fifo_iterate_internal_next(FIFO fifo, int off) { struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off) { return (struct fifo_entry *)(fifo->memory + off); } +size_t toku_fifo_internal_entry_memsize(struct fifo_entry *e) { + return fifo_entry_size(e); +} void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, void*), void *arg) { FIFO_ITERATE(fifo, diff --git a/ft/fifo.h b/ft/fifo.h index 9af8191bc3f..75eb14a737c 100644 --- a/ft/fifo.h +++ b/ft/fifo.h @@ -68,26 +68,30 @@ unsigned long toku_fifo_memory_footprint(FIFO fifo); // return how much memory void toku_fifo_iterate(FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, void*), void*); #define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,msnvar,xidsvar,is_freshvar,body) ({ \ - for (int fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \ - toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \ - fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \ - struct fifo_entry *e = toku_fifo_iterate_internal_get_entry(fifo, fifo_iterate_off); \ - ITEMLEN keylenvar = e->keylen; \ - ITEMLEN datalenvar = e->vallen; \ - enum ft_msg_type typevar = fifo_entry_get_msg_type(e); \ - MSN msnvar = e->msn; \ - XIDS xidsvar = &e->xids_s; \ - bytevec keyvar = xids_get_end_of_array(xidsvar); \ - bytevec datavar = (const uint8_t*)keyvar + e->keylen; \ - bool is_freshvar = e->is_fresh; \ - body; \ + for (int fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \ + toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \ + fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \ + struct fifo_entry *e = toku_fifo_iterate_internal_get_entry(fifo, fifo_iterate_off); \ + ITEMLEN keylenvar = e->keylen; \ + ITEMLEN datalenvar = e->vallen; \ + enum ft_msg_type typevar = fifo_entry_get_msg_type(e); \ + MSN msnvar = e->msn; \ + XIDS xidsvar = &e->xids_s; \ + bytevec keyvar = xids_get_end_of_array(xidsvar); \ + bytevec datavar = (const uint8_t*)keyvar + e->keylen; \ + bool is_freshvar = e->is_fresh; \ + body; \ } }) +#define FIFO_CURRENT_ENTRY_MEMSIZE toku_fifo_internal_entry_memsize(e) + // Internal functions for the iterator. 
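Both `fifo_entry_size` and the new `toku_ft_msg_memsize_in_fifo` above subtract `sizeof(XIDS_S)` because `struct fifo_entry` already embeds one `XIDS_S`, whose size `xids_get_size` counts again. A rough picture of the accounting both formulas implement (illustrative, based on the layout the FIFO_ITERATE macro walks):

```cpp
// Memory layout of one serialized fifo entry, as walked by FIFO_ITERATE:
//
//   [ struct fifo_entry ... xids_s ][ rest of xids payload ][ key ][ val ]
//    \__ sizeof(struct fifo_entry) _/
//        (already covers one XIDS_S header)
//
// memsize = sizeof(struct fifo_entry)
//         + keylen + vallen
//         + xids_get_size(xids)   // counts the whole xids, header included
//         - sizeof(XIDS_S);       // so subtract the embedded header once
```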
int toku_fifo_iterate_internal_start(FIFO fifo); int toku_fifo_iterate_internal_has_more(FIFO fifo, int off); int toku_fifo_iterate_internal_next(FIFO fifo, int off); struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off); +size_t toku_fifo_internal_entry_memsize(struct fifo_entry *e) __attribute__((const,nonnull)); +size_t toku_ft_msg_memsize_in_fifo(FT_MSG cmd) __attribute__((const,nonnull)); DBT *fill_dbt_for_fifo_entry(DBT *dbt, const struct fifo_entry *entry); struct fifo_entry *toku_fifo_get_entry(FIFO fifo, int off); diff --git a/ft/ft-cachetable-wrappers.cc b/ft/ft-cachetable-wrappers.cc index 9ab4fb3ec91..a0789ea50b5 100644 --- a/ft/ft-cachetable-wrappers.cc +++ b/ft/ft-cachetable-wrappers.cc @@ -147,7 +147,7 @@ try_again_for_write_lock: if (apply_ancestor_messages && node->height == 0) { needs_ancestors_messages = toku_ft_leaf_needs_ancestors_messages(brt->ft, node, ancestors, bounds, &max_msn_in_path); if (needs_ancestors_messages && needed_lock_type == PL_READ) { - toku_unpin_ftnode_read_only(brt, node); + toku_unpin_ftnode_read_only(brt->ft, node); needed_lock_type = PL_WRITE_CHEAP; goto try_again_for_write_lock; } @@ -296,14 +296,14 @@ toku_pin_ftnode_off_client_thread_batched( h, blocknum, fullhash, bfe, lock_type, num_dependent_nodes, dependent_nodes, node_p, true); } -int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep) { +int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, pair_lock_type lock_type, FTNODE *nodep) { void *node_v; - int r = toku_cachetable_maybe_get_and_pin_clean(ft->cf, blocknum, fullhash, &node_v); + int r = toku_cachetable_maybe_get_and_pin_clean(ft->cf, blocknum, fullhash, lock_type, &node_v); if (r != 0) { goto cleanup; } CAST_FROM_VOIDP(*nodep, node_v); - if ((*nodep)->height > 0) { + if ((*nodep)->height > 0 && lock_type != PL_READ) { toku_move_ftnode_messages_to_stale(ft, *nodep); } cleanup: @@ -331,14 +331,13 @@ toku_unpin_ftnode(FT ft, FTNODE node) } void -toku_unpin_ftnode_read_only(FT_HANDLE brt, FTNODE node) +toku_unpin_ftnode_read_only(FT ft, FTNODE node) { int r = toku_cachetable_unpin( - brt->ft->cf, + ft->cf, node->ct_pair, (enum cachetable_dirty) node->dirty, make_invalid_pair_attr() ); assert(r==0); } - diff --git a/ft/ft-cachetable-wrappers.h b/ft/ft-cachetable-wrappers.h index bcfbe86a6f8..b9a5d3f6451 100644 --- a/ft/ft-cachetable-wrappers.h +++ b/ft/ft-cachetable-wrappers.h @@ -108,7 +108,7 @@ toku_pin_ftnode_off_client_thread_and_maybe_move_messages( * This function may return a pinned ftnode to the caller, if pinning is cheap. * If the node is already locked, or is pending a checkpoint, the node is not pinned and -1 is returned. 
*/ -int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, FTNODE *nodep); +int toku_maybe_pin_ftnode_clean(FT ft, BLOCKNUM blocknum, uint32_t fullhash, pair_lock_type lock_type, FTNODE *nodep); /** * Batched version of toku_pin_ftnode_off_client_thread, see cachetable @@ -158,6 +158,6 @@ void toku_unpin_ftnode(FT h, FTNODE node); void -toku_unpin_ftnode_read_only(FT_HANDLE brt, FTNODE node); +toku_unpin_ftnode_read_only(FT ft, FTNODE node); #endif diff --git a/ft/ft-flusher.cc b/ft/ft-flusher.cc index 0cadf62699f..81801cdaec3 100644 --- a/ft/ft-flusher.cc +++ b/ft/ft-flusher.cc @@ -9,6 +9,8 @@ #include <ft-flusher-internal.h> #include <ft-cachetable-wrappers.h> #include <ft.h> +#include <toku_assert.h> +#include <portability/toku_atomic.h> /* Status is intended for display to humans to help understand system behavior. * It does not need to be perfectly thread-safe. @@ -98,11 +100,13 @@ find_heaviest_child(FTNODE node) int i; if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight); - assert(node->n_children>0); + paranoid_invariant(node->n_children>0); for (i=1; i<node->n_children; i++) { +#ifdef TOKU_DEBUG_PARANOID if (BP_WORKDONE(node,i)) { assert(toku_bnc_nbytesinbuf(BNC(node,i)) > 0); } +#endif int this_weight = toku_bnc_nbytesinbuf(BNC(node,i)) + BP_WORKDONE(node,i);; if (0) printf(" %d", this_weight); if (max_weight < this_weight) { @@ -180,7 +184,7 @@ pick_heaviest_child(FT UU(h), void* UU(extra)) { int childnum = find_heaviest_child(parent); - assert(toku_bnc_n_entries(BNC(parent, childnum))>0); + paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0); return childnum; } @@ -348,7 +352,7 @@ ctm_maybe_merge_child(struct flusher_advice *fa, void *extra) { if (child->height == 0) { - (void) __sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1); } default_merge_child(fa, h, parent, childnum, child, extra); } @@ -366,7 +370,7 @@ ct_maybe_merge_child(struct flusher_advice *fa, } else { struct ctm_extra ctme; - assert(parent->n_children > 1); + paranoid_invariant(parent->n_children > 1); int pivot_to_save; // // we have two cases, one where the childnum @@ -413,12 +417,12 @@ ct_maybe_merge_child(struct flusher_advice *fa, toku_assert_entire_node_in_memory(root_node); } - (void) __sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1); - (void) __sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1); flush_some_child(h, root_node, &new_fa); - (void) __sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1); + (void) toku_sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1); toku_free(ctme.target_key.data); } @@ -483,13 +487,14 @@ handle_split_of_child( DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. 
*/ ) { - assert(node->height>0); - assert(0 <= childnum && childnum < node->n_children); + paranoid_invariant(node->height>0); + paranoid_invariant(0 <= childnum); + paranoid_invariant(childnum < node->n_children); toku_assert_entire_node_in_memory(node); toku_assert_entire_node_in_memory(childa); toku_assert_entire_node_in_memory(childb); - int old_count = toku_bnc_nbytesinbuf(BNC(node, childnum)); - assert(old_count==0); + NONLEAF_CHILDINFO old_bnc = BNC(node, childnum); + paranoid_invariant(toku_bnc_nbytesinbuf(old_bnc)==0); int cnum; WHEN_NOT_GCOV( if (toku_ft_debug_mode) { @@ -515,13 +520,20 @@ handle_split_of_child( memset(&node->bp[childnum+1],0,sizeof(node->bp[0])); node->n_children++; - assert(BP_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child + paranoid_invariant(BP_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child BP_BLOCKNUM(node, childnum+1) = childb->thisnodename; BP_WORKDONE(node, childnum+1) = 0; BP_STATE(node,childnum+1) = PT_AVAIL; - set_BNC(node, childnum+1, toku_create_empty_nl()); + NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl(); + for (unsigned int i = 0; i < (sizeof new_bnc->flow) / (sizeof new_bnc->flow[0]); ++i) { + // just split the flows in half for now, can't guess much better + // at the moment + new_bnc->flow[i] = old_bnc->flow[i] / 2; + old_bnc->flow[i] = (old_bnc->flow[i] + 1) / 2; + } + set_BNC(node, childnum+1, new_bnc); // Slide the keys over { @@ -553,7 +565,7 @@ handle_split_of_child( } static int -verify_in_mempool(OMTVALUE lev, uint32_t UU(idx), void *mpv) +UU() verify_in_mempool(OMTVALUE lev, uint32_t UU(idx), void *mpv) { LEAFENTRY CAST_FROM_VOIDP(le, lev); struct mempool *CAST_FROM_VOIDP(mp, mpv); @@ -563,8 +575,9 @@ verify_in_mempool(OMTVALUE lev, uint32_t UU(idx), void *mpv) } static void -verify_all_in_mempool(FTNODE node) +verify_all_in_mempool(FTNODE UU() node) { +#ifdef TOKU_DEBUG_PARANOID if (node->height==0) { for (int i = 0; i < node->n_children; i++) { invariant(BP_STATE(node,i) == PT_AVAIL); @@ -572,13 +585,14 @@ verify_all_in_mempool(FTNODE node) toku_omt_iterate(bn->buffer, verify_in_mempool, &bn->buffer_mempool); } } +#endif } static uint64_t ftleaf_disk_size(FTNODE node) // Effect: get the disk size of a leafentry { - assert(node->height == 0); + paranoid_invariant(node->height == 0); toku_assert_entire_node_in_memory(node); uint64_t retval = 0; for (int i = 0; i < node->n_children; i++) { @@ -587,8 +601,8 @@ ftleaf_disk_size(FTNODE node) for (uint32_t j=0; j < n_leafentries; j++) { OMTVALUE v; int r = toku_omt_fetch(curr_buffer, j, &v); - LEAFENTRY CAST_FROM_VOIDP(curr_le, v); assert_zero(r); + LEAFENTRY CAST_FROM_VOIDP(curr_le, v); retval += leafentry_disksize(curr_le); } } @@ -598,47 +612,69 @@ ftleaf_disk_size(FTNODE node) static void ftleaf_get_split_loc( FTNODE node, - uint64_t sumlesizes, - int* bn_index, // which basement within leaf - int* le_index // which key within basement + enum split_mode split_mode, + int *num_left_bns, // which basement within leaf + int *num_left_les // which key within basement ) // Effect: Find the location within a leaf node where we want to perform a split -// bn_index is which basement node (which OMT) should be split. -// le_index is index into OMT of the last key that should be on the left side of the split. +// num_left_bns is how many basement nodes (which OMT) should be split to the left. +// num_left_les is how many leafentries in OMT of the last bn should be on the left side of the split. 
{ - assert(node->height == 0); - uint32_t size_so_far = 0; - for (int i = 0; i < node->n_children; i++) { - OMT curr_buffer = BLB_BUFFER(node, i); - uint32_t n_leafentries = toku_omt_size(curr_buffer); - for (uint32_t j=0; j < n_leafentries; j++) { - OMTVALUE lev; - int r = toku_omt_fetch(curr_buffer, j, &lev); - assert_zero(r); - LEAFENTRY CAST_FROM_VOIDP(curr_le, lev); - size_so_far += leafentry_disksize(curr_le); - if (size_so_far >= sumlesizes/2) { - *bn_index = i; - *le_index = j; - if ((*bn_index == node->n_children - 1) && - ((unsigned int) *le_index == n_leafentries - 1)) { - // need to correct for when we're splitting after the - // last element, that makes no sense - if (*le_index > 0) { - (*le_index)--; - } else if (*bn_index > 0) { - (*bn_index)--; - *le_index = toku_omt_size(BLB_BUFFER(node, *bn_index)) - 1; - } else { - // we are trying to split a leaf with only one - // leafentry in it - abort(); + switch (split_mode) { + case SPLIT_LEFT_HEAVY: { + *num_left_bns = node->n_children; + *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1)); + if (*num_left_les == 0) { + *num_left_bns = node->n_children - 1; + *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1)); + invariant(*num_left_les > 0); + } + goto exit; + } + case SPLIT_RIGHT_HEAVY: { + *num_left_bns = 1; + *num_left_les = 1; + goto exit; + } + case SPLIT_EVENLY: { + paranoid_invariant(node->height == 0); + // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice + uint64_t sumlesizes = ftleaf_disk_size(node); + uint32_t size_so_far = 0; + for (int i = 0; i < node->n_children; i++) { + OMT curr_buffer = BLB_BUFFER(node, i); + uint32_t n_leafentries = toku_omt_size(curr_buffer); + for (uint32_t j=0; j < n_leafentries; j++) { + OMTVALUE lev; + int r = toku_omt_fetch(curr_buffer, j, &lev); + assert_zero(r); + LEAFENTRY CAST_FROM_VOIDP(curr_le, lev); + size_so_far += leafentry_disksize(curr_le); + if (size_so_far >= sumlesizes/2) { + *num_left_bns = i + 1; + *num_left_les = j + 1; + if (*num_left_bns == node->n_children && + (unsigned int) *num_left_les == n_leafentries) { + // need to correct for when we're splitting after the + // last element, that makes no sense + if (*num_left_les > 1) { + (*num_left_les)--; + } else if (*num_left_bns > 1) { + (*num_left_bns)--; + *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1)); + } else { + // we are trying to split a leaf with only one + // leafentry in it + abort(); + } } + goto exit; } - goto exit; } } } + } + abort(); exit: return; } @@ -655,7 +691,7 @@ move_leafentries( ) //Effect: move leafentries in the range [lbi, upe) from src_omt to newly created dest_omt { - assert(lbi < ube); + paranoid_invariant(lbi < ube); OMTVALUE *XMALLOC_N(ube-lbi, newleafpointers); // create new omt size_t mpsize = toku_mempool_get_used_space(&src_bn->buffer_mempool); // overkill, but safe @@ -692,6 +728,7 @@ ftleaf_split( FTNODE *nodeb, DBT *splitk, bool create_new_node, + enum split_mode split_mode, uint32_t num_dependent_nodes, FTNODE* dependent_nodes) // Effect: Split a leaf node. @@ -702,7 +739,7 @@ ftleaf_split( // splitk is the right-most key of nodea { - invariant(node->height == 0); + paranoid_invariant(node->height == 0); STATUS_VALUE(FT_FLUSHER_SPLIT_LEAF)++; if (node->n_children) { // First move all the accumulated stat64info deltas into the first basement. 
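`ftleaf_get_split_loc` now picks the split point from a `split_mode` argument rather than always bisecting by on-disk size. The enum is declared outside this diff; inferred from the switch above, the three cases behave roughly as follows (a sketch, not the real declaration):

```cpp
enum split_mode {
    SPLIT_EVENLY,       // classic behavior: ~half the leafentry bytes per side
    SPLIT_LEFT_HEAVY,   // the left node keeps essentially all existing entries
    SPLIT_RIGHT_HEAVY   // the left node keeps only its first leafentry
};
```

Uneven splits are presumably aimed at skewed insertion patterns: with ascending keys, a left-heavy split leaves the old node full and hands new inserts an empty right sibling instead of repeatedly half-filling nodes.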
@@ -744,31 +781,20 @@ ftleaf_split( } - assert(node->height==0); + paranoid_invariant(node->height==0); toku_assert_entire_node_in_memory(node); verify_all_in_mempool(node); MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk; - // variables that say where we will do the split. We do it in the basement node indexed at - // at last_bn_on_left and at the index last_le_on_left_within_bn within that basement node. - int last_bn_on_left = 0; // last_bn_on_left may or may not be fully included - int last_le_on_left_within_bn = 0; + // variables that say where we will do the split. + // After the split, there will be num_left_bns basement nodes in the left node, + // and the last basement node in the left node will have num_left_les leafentries. + int num_left_bns; + int num_left_les; + ftleaf_get_split_loc(node, split_mode, &num_left_bns, &num_left_les); { - { - // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice - uint64_t sumlesizes=0; - sumlesizes = ftleaf_disk_size(node); - // TODO: (Zardosht) #3537, figure out serial insertion optimization again later - // split in half - ftleaf_get_split_loc( - node, - sumlesizes, - &last_bn_on_left, - &last_le_on_left_within_bn - ); - } // did we split right on the boundary between basement nodes? - const bool split_on_boundary = (last_le_on_left_within_bn == (int) toku_omt_size(BLB_BUFFER(node, last_bn_on_left)) - 1); + const bool split_on_boundary = (num_left_les == 0) || (num_left_les == (int) toku_omt_size(BLB_BUFFER(node, num_left_bns - 1))); // Now we know where we are going to break it // the two nodes will have a total of n_children+1 basement nodes // and n_children-1 pivots @@ -781,8 +807,17 @@ ftleaf_split( // (if split_on_boundary is false) will be affected. All other mempools will remain intact. ??? //set up the basement nodes in the new node - int num_children_in_node = last_bn_on_left + 1; - int num_children_in_b = node->n_children - last_bn_on_left - (split_on_boundary ? 1 : 0); + int num_children_in_node = num_left_bns; + // In the SPLIT_RIGHT_HEAVY case, we need to add 1 back because + // while it's not on the boundary, we do need node->n_children + // children in B. + int num_children_in_b = node->n_children - num_left_bns + (!split_on_boundary ? 
1 : 0); + if (num_children_in_b == 0) { + // for uneven split, make sure we have at least 1 bn + paranoid_invariant(split_mode == SPLIT_LEFT_HEAVY); + num_children_in_b = 1; + } + paranoid_invariant(num_children_in_node > 0); if (create_new_node) { toku_initialize_empty_ftnode( B, @@ -808,19 +843,19 @@ ftleaf_split( // now move all the data - int curr_src_bn_index = last_bn_on_left; + int curr_src_bn_index = num_left_bns - 1; int curr_dest_bn_index = 0; // handle the move of a subset of data in last_bn_on_left from node to B if (!split_on_boundary) { BP_STATE(B,curr_dest_bn_index) = PT_AVAIL; uint32_t diff_size = 0; - destroy_basement_node (BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array + destroy_basement_node(BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array set_BNULL(B, curr_dest_bn_index); set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer()); move_leafentries(BLB(B, curr_dest_bn_index), BLB(node, curr_src_bn_index), - last_le_on_left_within_bn+1, // first row to be moved to B + num_left_les, // first row to be moved to B toku_omt_size(BLB_BUFFER(node, curr_src_bn_index)), // number of rows in basement to be split &diff_size); BLB_MAX_MSN_APPLIED(B, curr_dest_bn_index) = BLB_MAX_MSN_APPLIED(node, curr_src_bn_index); @@ -830,15 +865,20 @@ ftleaf_split( } curr_src_bn_index++; - invariant(B->n_children >= curr_dest_bn_index); - invariant(node->n_children >= curr_src_bn_index); - invariant(B->n_children - curr_dest_bn_index == node->n_children - curr_src_bn_index); + paranoid_invariant(B->n_children >= curr_dest_bn_index); + paranoid_invariant(node->n_children >= curr_src_bn_index); + // move the rest of the basement nodes for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) { destroy_basement_node(BLB(B, curr_dest_bn_index)); set_BNULL(B, curr_dest_bn_index); B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index]; } + if (curr_dest_bn_index < B->n_children) { + // B already has an empty basement node here. + BP_STATE(B, curr_dest_bn_index) = PT_AVAIL; + } + node->n_children = num_children_in_node; // @@ -847,7 +887,7 @@ ftleaf_split( // the child index in the original node that corresponds to the // first node in the right node of the split - int base_index = (split_on_boundary ? last_bn_on_left + 1 : last_bn_on_left); + int base_index = num_left_bns - (split_on_boundary ? 0 : 1); // make pivots in B for (int i=0; i < num_children_in_b-1; i++) { toku_copyref_dbt(&B->childkeys[i], node->childkeys[i+base_index]); @@ -855,10 +895,10 @@ ftleaf_split( node->totalchildkeylens -= node->childkeys[i+base_index].size; toku_init_dbt(&node->childkeys[i+base_index]); } - if (split_on_boundary) { + if (split_on_boundary && split_mode != SPLIT_LEFT_HEAVY) { // destroy the extra childkey between the nodes, we'll // recreate it in splitk below - toku_free(node->childkeys[last_bn_on_left].data); + toku_free(node->childkeys[num_left_bns - 1].data); } REALLOC_N(num_children_in_node, node->bp); REALLOC_N(num_children_in_node-1, node->childkeys); @@ -867,7 +907,7 @@ ftleaf_split( if (splitk) { memset(splitk, 0, sizeof *splitk); OMTVALUE lev; - OMT buffer = BLB_BUFFER(node, last_bn_on_left); + OMT buffer = BLB_BUFFER(node, num_left_bns - 1); int r = toku_omt_fetch(buffer, toku_omt_size(buffer) - 1, &lev); assert_zero(r); // that fetch should have worked. 
LEAFENTRY CAST_FROM_VOIDP(le, lev); @@ -908,8 +948,8 @@ ft_nonleaf_split( int n_children_in_b = old_n_children-n_children_in_a; MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk; FTNODE B; - assert(node->height>0); - assert(node->n_children>=2); // Otherwise, how do we split? We need at least two children to split. */ + paranoid_invariant(node->height>0); + paranoid_invariant(node->n_children>=2); // Otherwise, how do we split? We need at least two children to split. */ create_new_ftnode_with_dep_nodes(h, &B, node->height, n_children_in_b, num_dependent_nodes, dependent_nodes); { /* The first n_children_in_a go into node a. @@ -932,7 +972,7 @@ ft_nonleaf_split( // Delete a child, removing the preceeding pivot key. The child number must be > 0 { - assert(i>0); + paranoid_invariant(i>0); if (i>n_children_in_a) { toku_copyref_dbt(&B->childkeys[targchild-1], node->childkeys[i-1]); B->totalchildkeylens += node->childkeys[i-1].size; @@ -978,10 +1018,11 @@ ft_split_child( FTNODE node, int childnum, FTNODE child, + enum split_mode split_mode, struct flusher_advice *fa) { - assert(node->height>0); - assert(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty + paranoid_invariant(node->height>0); + paranoid_invariant(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty FTNODE nodea, nodeb; DBT splitk; @@ -992,7 +1033,7 @@ ft_split_child( dep_nodes[0] = node; dep_nodes[1] = child; if (child->height==0) { - ftleaf_split(h, child, &nodea, &nodeb, &splitk, true, 2, dep_nodes); + ftleaf_split(h, child, &nodea, &nodeb, &splitk, true, split_mode, 2, dep_nodes); } else { ft_nonleaf_split(h, child, &nodea, &nodeb, &splitk, 2, dep_nodes); } @@ -1040,8 +1081,8 @@ flush_this_child( } bring_node_fully_into_memory(child, h); toku_assert_entire_node_in_memory(child); - assert(node->height>0); - assert(child->thisnodename.b!=0); + paranoid_invariant(node->height>0); + paranoid_invariant(child->thisnodename.b!=0); // VERIFY_NODE does not work off client thread as of now //VERIFY_NODE(t, child); node->dirty = 1; @@ -1062,10 +1103,10 @@ merge_leaf_nodes(FTNODE a, FTNODE b) STATUS_VALUE(FT_FLUSHER_MERGE_LEAF)++; toku_assert_entire_node_in_memory(a); toku_assert_entire_node_in_memory(b); - assert(a->height == 0); - assert(b->height == 0); - assert(a->n_children > 0); - assert(b->n_children > 0); + paranoid_invariant(a->height == 0); + paranoid_invariant(b->height == 0); + paranoid_invariant(a->n_children > 0); + paranoid_invariant(b->n_children > 0); // Mark nodes as dirty before moving basements from b to a. // This way, whatever deltas are accumulated in the basements are @@ -1148,7 +1189,7 @@ static void balance_leaf_nodes( merge_leaf_nodes(a,b); // now split them // because we are not creating a new node, we can pass in no dependent nodes - ftleaf_split(NULL, a, &a, &b, splitk, false, 0, NULL); + ftleaf_split(NULL, a, &a, &b, splitk, false, SPLIT_EVENLY, 0, NULL); } static void @@ -1202,7 +1243,7 @@ maybe_merge_pinned_nonleaf_nodes( { toku_assert_entire_node_in_memory(a); toku_assert_entire_node_in_memory(b); - assert(parent_splitk->data); + paranoid_invariant(parent_splitk->data); int old_n_children = a->n_children; int new_n_children = old_n_children + b->n_children; XREALLOC_N(new_n_children, a->bp); @@ -1262,7 +1303,7 @@ maybe_merge_pinned_nodes( // splitk (OUT): If the two nodes did not get merged, the new pivot key between the two nodes. 
{ MSN msn_max; - assert(a->height == b->height); + paranoid_invariant(a->height == b->height); toku_assert_entire_node_in_memory(parent); toku_assert_entire_node_in_memory(a); toku_assert_entire_node_in_memory(b); @@ -1271,9 +1312,6 @@ maybe_merge_pinned_nodes( MSN msna = a->max_msn_applied_to_node_on_disk; MSN msnb = b->max_msn_applied_to_node_on_disk; msn_max = (msna.msn > msnb.msn) ? msna : msnb; - if (a->height > 0) { - invariant(msn_max.msn <= parent->max_msn_applied_to_node_on_disk.msn); // parent msn must be >= children's msn - } } if (a->height == 0) { maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, did_rebalance, splitk, nodesize); @@ -1312,7 +1350,7 @@ ft_merge_child( { // this function should not be called // if the child is not mergable - assert(node->n_children > 1); + paranoid_invariant(node->n_children > 1); toku_assert_entire_node_in_memory(node); int childnuma,childnumb; @@ -1323,11 +1361,11 @@ ft_merge_child( childnuma = childnum_to_merge; childnumb = childnum_to_merge+1; } - assert(0 <= childnuma); - assert(childnuma+1 == childnumb); - assert(childnumb < node->n_children); + paranoid_invariant(0 <= childnuma); + paranoid_invariant(childnuma+1 == childnumb); + paranoid_invariant(childnumb < node->n_children); - assert(node->height>0); + paranoid_invariant(node->height>0); // We suspect that at least one of the children is fusible, but they might not be. // for test @@ -1371,22 +1409,27 @@ ft_merge_child( maybe_merge_pinned_nodes(node, &node->childkeys[childnuma], childa, childb, &did_merge, &did_rebalance, &splitk, h->h->nodesize); if (childa->height>0) { for (int i=0; i+1<childa->n_children; i++) { - assert(childa->childkeys[i].data); + paranoid_invariant(childa->childkeys[i].data); } } //toku_verify_estimates(t,childa); // the tree did react if a merge (did_merge) or rebalance (new spkit key) occurred *did_react = (bool)(did_merge || did_rebalance); if (did_merge) { - assert(!splitk.data); + paranoid_invariant(!splitk.data); } else { - assert(splitk.data); + paranoid_invariant(splitk.data); } node->totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes. if (did_merge) { - destroy_nonleaf_childinfo(BNC(node, childnumb)); + NONLEAF_CHILDINFO remaining_bnc = BNC(node, childnuma); + NONLEAF_CHILDINFO merged_bnc = BNC(node, childnumb); + for (unsigned int i = 0; i < (sizeof remaining_bnc->flow) / (sizeof remaining_bnc->flow[0]); ++i) { + remaining_bnc->flow[i] += merged_bnc->flow[i]; + } + destroy_nonleaf_childinfo(merged_bnc); set_BNULL(node, childnumb); node->n_children--; memmove(&node->bp[childnumb], @@ -1397,10 +1440,14 @@ ft_merge_child( &node->childkeys[childnuma+1], (node->n_children-childnumb)*sizeof(node->childkeys[0])); REALLOC_N(node->n_children-1, node->childkeys); - assert(BP_BLOCKNUM(node, childnuma).b == childa->thisnodename.b); + paranoid_invariant(BP_BLOCKNUM(node, childnuma).b == childa->thisnodename.b); childa->dirty = 1; // just to make sure childb->dirty = 1; // just to make sure } else { + // flow will be inaccurate for a while, oh well. the children + // are leaves in this case so it's not a huge deal (we're + // pretty far down the tree) + // If we didn't merge the nodes, then we need the correct pivot. 
toku_copyref_dbt(&node->childkeys[childnuma], splitk); node->totalchildkeylens += node->childkeys[childnuma].size; @@ -1421,13 +1468,13 @@ ft_merge_child( merge_remove_key_callback, h ); - assert(rrb==0); + assert_zero(rrb); // for test call_flusher_thread_callback(ft_flush_aflter_merge); // unlock the parent - assert(node->dirty); + paranoid_invariant(node->dirty); toku_unpin_ftnode_off_client_thread(h, node); } else { @@ -1435,7 +1482,7 @@ ft_merge_child( call_flusher_thread_callback(ft_flush_aflter_rebalance); // unlock the parent - assert(node->dirty); + paranoid_invariant(node->dirty); toku_unpin_ftnode_off_client_thread(h, node); toku_unpin_ftnode_off_client_thread(h, childb); } @@ -1463,7 +1510,7 @@ flush_some_child( { int dirtied = 0; NONLEAF_CHILDINFO bnc = NULL; - assert(parent->height>0); + paranoid_invariant(parent->height>0); toku_assert_entire_node_in_memory(parent); // pick the child we want to flush to @@ -1496,7 +1543,7 @@ flush_some_child( // the parent before finishing reading in the entire child node. bool may_child_be_reactive = may_node_be_reactive(child); - assert(child->thisnodename.b!=0); + paranoid_invariant(child->thisnodename.b!=0); //VERIFY_NODE(brt, child); // only do the following work if there is a flush to perform @@ -1508,7 +1555,9 @@ flush_some_child( // detach buffer BP_WORKDONE(parent, childnum) = 0; // this buffer is drained, no work has been done by its contents bnc = BNC(parent, childnum); - set_BNC(parent, childnum, toku_create_empty_nl()); + NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl(); + memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow); + set_BNC(parent, childnum, new_bnc); } // @@ -1592,19 +1641,19 @@ flush_some_child( // it is responsibility of `ft_split_child` to unlock nodes of // parent and child as it sees fit // - assert(parent); // just make sure we have not accidentally unpinned parent - ft_split_child(h, parent, childnum, child, fa); + paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent + ft_split_child(h, parent, childnum, child, SPLIT_EVENLY, fa); } else if (child_re == RE_FUSIBLE) { // // it is responsibility of `maybe_merge_child to unlock nodes of // parent and child as it sees fit // - assert(parent); // just make sure we have not accidentally unpinned parent + paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent fa->maybe_merge_child(fa, h, parent, childnum, child, fa->extra); } else { - assert(false); + abort(); } } @@ -1657,7 +1706,7 @@ dummy_pick_heaviest_child(FT UU(h), FTNODE UU(parent), void* UU(extra)) { - assert(false); + abort(); return -1; } @@ -1665,7 +1714,8 @@ void toku_ft_split_child( FT ft, FTNODE node, int childnum, - FTNODE child + FTNODE child, + enum split_mode split_mode ) { struct flusher_advice fa; @@ -1684,6 +1734,34 @@ void toku_ft_split_child( node, childnum, // childnum to split child, + split_mode, + &fa + ); +} + +void toku_ft_merge_child( + FT ft, + FTNODE node, + int childnum + ) +{ + struct flusher_advice fa; + flusher_advice_init( + &fa, + dummy_pick_heaviest_child, + dont_destroy_basement_nodes, + never_recursively_flush, + default_merge_child, + dummy_update_status, + default_pick_child_after_split, + NULL + ); + bool did_react; + ft_merge_child( + ft, + node, + childnum, // childnum to merge + &did_react, &fa ); } @@ -1816,13 +1894,13 @@ flush_node_on_background_thread(FT h, FTNODE parent) // and pick the child we want to flush to // int childnum = find_heaviest_child(parent); - assert(toku_bnc_n_entries(BNC(parent, 
childnum))>0); + paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0); // // see if we can pin the child // FTNODE child; uint32_t childfullhash = compute_child_fullhash(h->cf, parent, childnum); - int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, &child); + int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child); if (r != 0) { // In this case, we could not lock the child, so just place the parent on the background thread // In the callback, we will use flush_some_child, which checks to @@ -1846,7 +1924,9 @@ flush_node_on_background_thread(FT h, FTNODE parent) parent->dirty = 1; BP_WORKDONE(parent, childnum) = 0; // this buffer is drained, no work has been done by its contents NONLEAF_CHILDINFO bnc = BNC(parent, childnum); - set_BNC(parent, childnum, toku_create_empty_nl()); + NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl(); + memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow); + set_BNC(parent, childnum, new_bnc); // // at this point, the buffer has been detached from the parent diff --git a/ft/ft-flusher.h b/ft/ft-flusher.h index 36370d65a64..7f0a71ed24a 100644 --- a/ft/ft-flusher.h +++ b/ft/ft-flusher.h @@ -89,6 +89,7 @@ ftleaf_split( FTNODE *nodeb, DBT *splitk, bool create_new_node, + enum split_mode split_mode, uint32_t num_dependent_nodes, FTNODE* dependent_nodes ); diff --git a/ft/ft-hot-flusher.cc b/ft/ft-hot-flusher.cc index a3fa072d0e0..fbc45b9eac2 100644 --- a/ft/ft-hot-flusher.cc +++ b/ft/ft-hot-flusher.cc @@ -9,6 +9,7 @@ #include <ft-cachetable-wrappers.h> #include <ft-internal.h> #include <ft.h> +#include <portability/toku_atomic.h> // Member Descirption: // 1. highest_pivot_key - this is the key that corresponds to the @@ -251,7 +252,7 @@ toku_ft_hot_optimize(FT_HANDLE brt, uint64_t loop_count = 0; MSN msn_at_start_of_hot = ZERO_MSN; // capture msn from root at // start of HOT operation - (void) __sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_STARTED), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_STARTED), 1); { toku_ft_note_hot_begin(brt); @@ -353,9 +354,9 @@ toku_ft_hot_optimize(FT_HANDLE brt, } if (success) { - (void) __sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_COMPLETED), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_COMPLETED), 1); } else { - (void) __sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_ABORTED), 1); + (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_HOT_NUM_ABORTED), 1); } } return r; diff --git a/ft/ft-internal.h b/ft/ft-internal.h index e72f2bcab51..269c1c8ba6e 100644 --- a/ft/ft-internal.h +++ b/ft/ft-internal.h @@ -124,6 +124,7 @@ struct ftnode_nonleaf_childinfo { off_omt_t broadcast_list; marked_off_omt_t fresh_message_tree; off_omt_t stale_message_tree; + uint64_t flow[2]; // current and last checkpoint }; unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc); @@ -133,6 +134,7 @@ long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc); void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp); void toku_bnc_empty(NONLEAF_CHILDINFO bnc); void toku_bnc_flush_to_child(FT h, NONLEAF_CHILDINFO bnc, FTNODE child); +bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) __attribute__((const, nonnull)); bool toku_ft_nonleaf_is_gorged(FTNODE node, uint32_t nodesize); @@ -152,13 +154,13 @@ struct ftnode_leaf_basement_node { STAT64INFO_S stat64_delta; // change in stat64 counters 
since basement was last written to disk }; -enum __attribute__((__packed__)) pt_state { // declare this to be packed so that when used below it will only take 1 byte. +enum pt_state { // declare this to be packed so that when used below it will only take 1 byte. PT_INVALID = 0, PT_ON_DISK = 1, PT_COMPRESSED = 2, PT_AVAIL = 3}; -enum __attribute__((__packed__)) ftnode_child_tag { +enum ftnode_child_tag { BCT_INVALID = 0, BCT_NULL, BCT_SUBBLOCK, @@ -166,7 +168,7 @@ enum __attribute__((__packed__)) ftnode_child_tag { BCT_NONLEAF }; -typedef struct __attribute__((__packed__)) ftnode_child_pointer { +typedef struct ftnode_child_pointer { union { struct sub_block *subblock; struct ftnode_nonleaf_childinfo *nonleaf; @@ -264,7 +266,13 @@ struct ftnode { // that have a read lock on an internal node may try to touch the clock // simultaneously // -#define BP_TOUCH_CLOCK(node, i) ((void) __sync_val_compare_and_swap(&(node)->bp[i].clock_count, 0, 1)) +#define BP_TOUCH_CLOCK(node, i) do { \ + TOKU_VALGRIND_HG_DISABLE_CHECKING(&(node)->bp[i].clock_count, sizeof (node)->bp[i].clock_count); \ + TOKU_DRD_IGNORE_VAR((node)->bp[i].clock_count); \ + (node)->bp[i].clock_count = 1; \ + TOKU_DRD_STOP_IGNORING_VAR((node)->bp[i].clock_count); \ + TOKU_VALGRIND_HG_ENABLE_CHECKING(&(node)->bp[i].clock_count, sizeof (node)->bp[i].clock_count); \ + } while (0) #define BP_SWEEP_CLOCK(node, i) ((node)->bp[i].clock_count = 0) #define BP_SHOULD_EVICT(node, i) ((node)->bp[i].clock_count == 0) // not crazy about having these two here, one is for the case where we create new @@ -275,47 +283,54 @@ struct ftnode { // internal node macros static inline void set_BNULL(FTNODE node, int i) { - assert(0<=i && i<node->n_children); + paranoid_invariant(0<=i); + paranoid_invariant(i<node->n_children); node->bp[i].ptr.tag = BCT_NULL; } static inline bool is_BNULL (FTNODE node, int i) { - assert(0<=i && i<node->n_children); + paranoid_invariant(0<=i); + paranoid_invariant(i<node->n_children); return node->bp[i].ptr.tag == BCT_NULL; } static inline NONLEAF_CHILDINFO BNC(FTNODE node, int i) { - assert(0<=i && i<node->n_children); + paranoid_invariant(0<=i); + paranoid_invariant(i<node->n_children); FTNODE_CHILD_POINTER p = node->bp[i].ptr; - assert(p.tag==BCT_NONLEAF); + paranoid_invariant(p.tag==BCT_NONLEAF); return p.u.nonleaf; } static inline void set_BNC(FTNODE node, int i, NONLEAF_CHILDINFO nl) { - assert(0<=i && i<node->n_children); + paranoid_invariant(0<=i); + paranoid_invariant(i<node->n_children); FTNODE_CHILD_POINTER *p = &node->bp[i].ptr; p->tag = BCT_NONLEAF; p->u.nonleaf = nl; } static inline BASEMENTNODE BLB(FTNODE node, int i) { - assert(i<node->n_children); - assert(0<=i); + paranoid_invariant(0<=i); + paranoid_invariant(i<node->n_children); FTNODE_CHILD_POINTER p = node->bp[i].ptr; - assert(p.tag==BCT_LEAF); + paranoid_invariant(p.tag==BCT_LEAF); return p.u.leaf; } static inline void set_BLB(FTNODE node, int i, BASEMENTNODE bn) { - assert(0<=i && i<node->n_children); + paranoid_invariant(0<=i); + paranoid_invariant(i<node->n_children); FTNODE_CHILD_POINTER *p = &node->bp[i].ptr; p->tag = BCT_LEAF; p->u.leaf = bn; } static inline SUB_BLOCK BSB(FTNODE node, int i) { - assert(0<=i && i<node->n_children); + paranoid_invariant(0<=i); + paranoid_invariant(i<node->n_children); FTNODE_CHILD_POINTER p = node->bp[i].ptr; - assert(p.tag==BCT_SUBBLOCK); + paranoid_invariant(p.tag==BCT_SUBBLOCK); return p.u.subblock; } static inline void set_BSB(FTNODE node, int i, SUB_BLOCK sb) { - assert(0<=i && i<node->n_children); + 
paranoid_invariant(0<=i); + paranoid_invariant(i<node->n_children); FTNODE_CHILD_POINTER *p = &node->bp[i].ptr; p->tag = BCT_SUBBLOCK; p->u.subblock = sb; @@ -390,6 +405,9 @@ struct ft_header { // This is decremented from our currnt MIN_MSN so as not to clash // with any existing 'normal' MSN's. MSN highest_unused_msn_for_upgrade; + // Largest MSN ever injected into the tree. Used to set the MSN for + // messages as they get injected. + MSN max_msn_in_ft; // last time that a hot optimize operation was begun uint64_t time_of_last_optimize_begin; @@ -605,13 +623,6 @@ void toku_ft_append_to_child_buffer(ft_compare_func compare_fun, DESCRIPTOR desc STAT64INFO_S toku_get_and_clear_basement_stats(FTNODE leafnode); - -#if 1 -#define DEADBEEF ((void*)0xDEADBEEF) -#else -#define DEADBEEF ((void*)0xDEADBEEFDEADBEEF) -#endif - //#define SLOW #ifdef SLOW #define VERIFY_NODE(t,n) (toku_verify_or_set_counts(n), toku_verify_estimates(t,n)) @@ -629,6 +640,7 @@ STAT64INFO_S toku_get_and_clear_basement_stats(FTNODE leafnode); void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h); void toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe); extern void toku_ftnode_clone_callback(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs); +extern void toku_ftnode_checkpoint_complete_callback(void *value_data); extern void toku_ftnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *ftnode_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone); extern int toku_ftnode_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int*dirty, void*extraargs); extern void toku_ftnode_pe_est_callback(void* ftnode_pv, void* disk_data, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs); @@ -643,7 +655,15 @@ void toku_ft_split_child( FT h, FTNODE node, int childnum, - FTNODE child + FTNODE child, + enum split_mode split_mode + ); +// Given pinned node, merge childnum with a neighbor and update node with +// information about the change +void toku_ft_merge_child( + FT ft, + FTNODE node, + int childnum ); static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT h) { CACHETABLE_WRITE_CALLBACK wc; @@ -652,6 +672,7 @@ static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT h) { wc.pe_callback = toku_ftnode_pe_callback; wc.cleaner_callback = toku_ftnode_cleaner_callback; wc.clone_callback = toku_ftnode_clone_callback; + wc.checkpoint_complete_callback = toku_ftnode_checkpoint_complete_callback; wc.write_extraargs = h; return wc; } @@ -720,7 +741,7 @@ static inline void fill_bfe_for_subset_read( bool disable_prefetching ) { - invariant(h->h->type == FT_CURRENT); + paranoid_invariant(h->h->type == FT_CURRENT); bfe->type = ftnode_fetch_subset; bfe->h = h; bfe->search = search; @@ -739,7 +760,7 @@ static inline void fill_bfe_for_subset_read( // Currently used for stat64. 
// static inline void fill_bfe_for_min_read(struct ftnode_fetch_extra *bfe, FT h) { - invariant(h->h->type == FT_CURRENT); + paranoid_invariant(h->h->type == FT_CURRENT); bfe->type = ftnode_fetch_none; bfe->h = h; bfe->search = NULL; @@ -752,7 +773,7 @@ static inline void fill_bfe_for_min_read(struct ftnode_fetch_extra *bfe, FT h) { } static inline void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe) { - assert(bfe->type == ftnode_fetch_prefetch); + paranoid_invariant(bfe->type == ftnode_fetch_prefetch); if (bfe->range_lock_left_key != NULL) { toku_free(bfe->range_lock_left_key->data); toku_destroy_dbt(bfe->range_lock_left_key); @@ -771,7 +792,7 @@ static inline void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe) { static inline void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe, FT h, FT_CURSOR c) { - invariant(h->h->type == FT_CURRENT); + paranoid_invariant(h->h->type == FT_CURRENT); bfe->type = ftnode_fetch_prefetch; bfe->h = h; bfe->search = NULL; @@ -779,13 +800,13 @@ static inline void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe, const DBT *left = &c->range_lock_left_key; const DBT *right = &c->range_lock_right_key; if (left->data) { - MALLOC(bfe->range_lock_left_key); resource_assert(bfe->range_lock_left_key); + XMALLOC(bfe->range_lock_left_key); toku_fill_dbt(bfe->range_lock_left_key, toku_xmemdup(left->data, left->size), left->size); } else { bfe->range_lock_left_key = NULL; } if (right->data) { - MALLOC(bfe->range_lock_right_key); resource_assert(bfe->range_lock_right_key); + XMALLOC(bfe->range_lock_right_key); toku_fill_dbt(bfe->range_lock_right_key, toku_xmemdup(right->data, right->size), right->size); } else { bfe->range_lock_right_key = NULL; @@ -815,6 +836,9 @@ bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancesto __attribute__((nonnull)) void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied); +__attribute__((const,nonnull)) +size_t toku_ft_msg_memsize_in_fifo(FT_MSG cmd); + int toku_ft_search_which_child( DESCRIPTOR desc, @@ -840,8 +864,8 @@ void toku_create_new_ftnode (FT_HANDLE t, FTNODE *result, int height, int n_chil void toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_children, int layout_version, unsigned int flags); -unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k, - DESCRIPTOR desc, ft_compare_func cmp) +int toku_ftnode_which_child(FTNODE node, const DBT *k, + DESCRIPTOR desc, ft_compare_func cmp) __attribute__((__warn_unused_result__)); /** @@ -854,10 +878,10 @@ unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k, * If k is equal to some pivot, then we return the next (to the right) * childnum. */ -unsigned int toku_ftnode_hot_next_child(FTNODE node, - const DBT *k, - DESCRIPTOR desc, - ft_compare_func cmp); +int toku_ftnode_hot_next_child(FTNODE node, + const DBT *k, + DESCRIPTOR desc, + ft_compare_func cmp); /* Stuff for testing */ // toku_testsetup_initialize() must be called before any other test_setup_xxx() functions are called. @@ -882,7 +906,7 @@ int toku_cmd_leafval_heaviside (OMTVALUE leafentry, void *extra) // toku_ft_root_put_cmd() accepts non-constant cmd because this is where we set the msn void toku_ft_root_put_cmd(FT h, FT_MSG_S * cmd); -void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **maybe_free); +void *mempool_malloc_from_omt(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free); // Effect: Allocate a new object of size SIZE in MP. 
If MP runs out of space, allocate new a new mempool space, and copy all the items // from the OMT (which items refer to items in the old mempool) into the new mempool. // If MAYBE_FREE is NULL then free the old mempool's space. @@ -896,7 +920,7 @@ toku_get_node_for_verify( int toku_verify_ftnode (FT_HANDLE brt, - MSN rootmsn, MSN parentmsn, + MSN rootmsn, MSN parentmsn, bool messages_exist_above, FTNODE node, int height, const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.) const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.) @@ -978,6 +1002,19 @@ typedef enum { FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, // ... because they were between lc and rc FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, FT_NUM_MSG_BUFFER_FETCHED_WRITE, + FT_PRO_NUM_ROOT_SPLIT, + FT_PRO_NUM_ROOT_H0_INJECT, + FT_PRO_NUM_ROOT_H1_INJECT, + FT_PRO_NUM_INJECT_DEPTH_0, + FT_PRO_NUM_INJECT_DEPTH_1, + FT_PRO_NUM_INJECT_DEPTH_2, + FT_PRO_NUM_INJECT_DEPTH_3, + FT_PRO_NUM_INJECT_DEPTH_GT3, + FT_PRO_NUM_STOP_NONEMPTY_BUF, + FT_PRO_NUM_STOP_H1, + FT_PRO_NUM_STOP_LOCK_CHILD, + FT_PRO_NUM_STOP_CHILD_INMEM, + FT_PRO_NUM_DIDNT_WANT_PROMOTE, FT_STATUS_NUM_ROWS } ft_status_entry; @@ -1015,6 +1052,7 @@ toku_ft_leaf_apply_cmd ( ft_update_func update_fun, DESCRIPTOR desc, FTNODE node, + int target_childnum, FT_MSG cmd, uint64_t *workdone, STAT64INFO stats_to_update @@ -1025,14 +1063,17 @@ toku_ft_node_put_cmd ( ft_compare_func compare_fun, ft_update_func update_fun, DESCRIPTOR desc, - FTNODE node, - FT_MSG cmd, + FTNODE node, + int target_childnum, + FT_MSG cmd, bool is_fresh, + size_t flow_deltas[], STAT64INFO stats_to_update ); void toku_flusher_thread_set_callback(void (*callback_f)(int, void*), void* extra); -int toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h); +int toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h) __attribute__((nonnull)); +int toku_upgrade_msn_from_root_to_header(int fd, FT h) __attribute__((nonnull)); #endif diff --git a/ft/ft-ops.cc b/ft/ft-ops.cc index a92318e6b13..db11b8ab443 100644 --- a/ft/ft-ops.cc +++ b/ft/ft-ops.cc @@ -129,6 +129,7 @@ basement nodes, bulk fetch, and partial fetch: #include "xids.h" #include <toku_race_tools.h> +#include <portability/toku_atomic.h> #include <util/mempool.h> #include <util/partitioned_counter.h> #include <util/rwlock.h> @@ -214,6 +215,20 @@ status_init(void) STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, PARCOUNT, "buffers fetched for prefetch"); STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write"); + STATUS_INIT(FT_PRO_NUM_ROOT_SPLIT, PARCOUNT, "promotion: roots split"); + STATUS_INIT(FT_PRO_NUM_ROOT_H0_INJECT, PARCOUNT, "promotion: leaf roots injected into"); + STATUS_INIT(FT_PRO_NUM_ROOT_H1_INJECT, PARCOUNT, "promotion: h1 roots injected into"); + STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_0, PARCOUNT, "promotion: injections at depth 0"); + STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_1, PARCOUNT, "promotion: injections at depth 1"); + STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_2, PARCOUNT, "promotion: injections at depth 2"); + STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_3, PARCOUNT, "promotion: injections at depth 3"); + STATUS_INIT(FT_PRO_NUM_INJECT_DEPTH_GT3, PARCOUNT, "promotion: injections lower than depth 3"); + STATUS_INIT(FT_PRO_NUM_STOP_NONEMPTY_BUF, PARCOUNT, "promotion: stopped because of a nonempty buffer"); + STATUS_INIT(FT_PRO_NUM_STOP_H1, PARCOUNT, "promotion: stopped at height 1"); + 
STATUS_INIT(FT_PRO_NUM_STOP_LOCK_CHILD, PARCOUNT, "promotion: stopped because the child was locked or not at all in memory"); + STATUS_INIT(FT_PRO_NUM_STOP_CHILD_INMEM, PARCOUNT, "promotion: stopped because the child was not fully in memory"); + STATUS_INIT(FT_PRO_NUM_DIDNT_WANT_PROMOTE, PARCOUNT, "promotion: stopped anyway, after locking the child"); + ft_status.initialized = true; } static void status_destroy(void) { @@ -242,8 +257,8 @@ bool is_entire_node_in_memory(FTNODE node) { } void -toku_assert_entire_node_in_memory(FTNODE node) { - assert(is_entire_node_in_memory(node)); +toku_assert_entire_node_in_memory(FTNODE UU() node) { + paranoid_invariant(is_entire_node_in_memory(node)); } static uint32_t @@ -261,7 +276,7 @@ static enum reactivity get_leaf_reactivity (FTNODE node, uint32_t nodesize) { enum reactivity re = RE_STABLE; toku_assert_entire_node_in_memory(node); - assert(node->height==0); + paranoid_invariant(node->height==0); unsigned int size = toku_serialize_ftnode_size(node); if (size > nodesize && get_leaf_num_entries(node) > 1) { re = RE_FISSIBLE; @@ -274,7 +289,7 @@ get_leaf_reactivity (FTNODE node, uint32_t nodesize) { enum reactivity get_nonleaf_reactivity (FTNODE node) { - assert(node->height>0); + paranoid_invariant(node->height>0); int n_children = node->n_children; if (n_children > TREE_FANOUT) return RE_FISSIBLE; if (n_children*4 < TREE_FANOUT) return RE_FUSIBLE; @@ -310,7 +325,7 @@ toku_ft_nonleaf_is_gorged (FTNODE node, uint32_t nodesize) { // is greater than nodesize (which as of Maxwell should be // 4MB) // - assert(node->height > 0); + paranoid_invariant(node->height > 0); for (int child = 0; child < node->n_children; ++child) { size += BP_WORKDONE(node, child); } @@ -325,14 +340,15 @@ toku_ft_nonleaf_is_gorged (FTNODE node, uint32_t nodesize) { (!buffers_are_empty)); } -static void ft_verify_flags(FT ft, FTNODE node) { - assert(ft->h->flags == node->flags); +static void ft_verify_flags(FT UU(ft), FTNODE UU(node)) { + paranoid_invariant(ft->h->flags == node->flags); } int toku_ft_debug_mode = 0; uint32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum) { - assert(node->height>0 && childnum<node->n_children); + paranoid_invariant(node->height>0); + paranoid_invariant(childnum<node->n_children); return toku_cachetable_hash(cf, BP_BLOCKNUM(node, childnum)); } @@ -389,7 +405,7 @@ toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) static long get_avail_internal_node_partition_size(FTNODE node, int i) { - assert(node->height > 0); + paranoid_invariant(node->height > 0); return toku_bnc_memory_size(BNC(node, i)); } @@ -418,7 +434,7 @@ ftnode_cachepressure_size(FTNODE node) retval += BP_WORKDONE(node, i); } else { - assert(false); + abort(); } } } @@ -468,7 +484,7 @@ ftnode_memory_size (FTNODE node) } } else { - assert(false); + abort(); } } return retval; @@ -505,7 +521,7 @@ PAIR_ATTR make_invalid_pair_attr(void) { static uint64_t dict_id_serial = 1; static DICTIONARY_ID next_dict_id(void) { - uint64_t i = __sync_fetch_and_add(&dict_id_serial, 1); + uint64_t i = toku_sync_fetch_and_add(&dict_id_serial, 1); assert(i); // guarantee unique dictionary id by asserting 64-bit counter never wraps DICTIONARY_ID d = {.dictid = i}; return d; @@ -532,7 +548,7 @@ toku_bfe_wants_child_available (struct ftnode_fetch_extra* bfe, int childnum) int toku_bfe_leftmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node) { - lazy_assert(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch); + paranoid_invariant(bfe->type == ftnode_fetch_subset || bfe->type 
== ftnode_fetch_prefetch); if (bfe->left_is_neg_infty) { return 0; } else if (bfe->range_lock_left_key == NULL) { @@ -545,7 +561,7 @@ toku_bfe_leftmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node) int toku_bfe_rightmost_child_wanted(struct ftnode_fetch_extra *bfe, FTNODE node) { - lazy_assert(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch); + paranoid_invariant(bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch); if (bfe->right_is_pos_infty) { return node->n_children - 1; } else if (bfe->range_lock_right_key == NULL) { @@ -627,7 +643,7 @@ static void ftnode_update_disk_stats( static void ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) { for (int i = 0; i < node->n_children; i++) { BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i); - assert(BP_STATE(node,i) == PT_AVAIL); + paranoid_invariant(BP_STATE(node,i) == PT_AVAIL); BP_STATE(cloned_node,i) = PT_AVAIL; BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i); if (node->height == 0) { @@ -639,6 +655,19 @@ static void ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) { } } +void toku_ftnode_checkpoint_complete_callback(void *value_data) { + FTNODE node = static_cast<FTNODE>(value_data); + if (node->height > 0) { + for (int i = 0; i < node->n_children; ++i) { + if (BP_STATE(node, i) == PT_AVAIL) { + NONLEAF_CHILDINFO bnc = BNC(node, i); + bnc->flow[1] = bnc->flow[0]; + bnc->flow[0] = 0; + } + } + } +} + void toku_ftnode_clone_callback( void* value_data, void** cloned_value_data, @@ -647,12 +676,10 @@ void toku_ftnode_clone_callback( void* write_extraargs ) { - FTNODE node = (FTNODE) value_data; + FTNODE node = static_cast<FTNODE>(value_data); toku_assert_entire_node_in_memory(node); - FT ft = (FT) write_extraargs; - FTNODE XMALLOC(cloned_node); - //FTNODE cloned_node = (FTNODE)toku_xmalloc(sizeof(*FTNODE)); - memset(cloned_node, 0, sizeof(*cloned_node)); + FT ft = static_cast<FT>(write_extraargs); + FTNODE XCALLOC(cloned_node); if (node->height == 0) { // set header stats, must be done before rebalancing ftnode_update_disk_stats(node, ft, for_checkpoint); @@ -780,7 +807,7 @@ int toku_ftnode_fetch_callback (CACHEFILE UU(cachefile), PAIR p, int fd, BLOCKNU fprintf(stderr, "Error deserializing node, errno = %d", r); } // make absolutely sure we crash before doing anything else. 
- assert(false); + abort(); } if (r == 0) { @@ -799,9 +826,9 @@ void toku_ftnode_pe_est_callback( void* UU(write_extraargs) ) { - assert(ftnode_pv != NULL); + paranoid_invariant(ftnode_pv != NULL); long bytes_to_free = 0; - FTNODE node = (FTNODE)ftnode_pv; + FTNODE node = static_cast<FTNODE>(ftnode_pv); if (node->dirty || node->height == 0 || node->layout_version_read_from_disk < FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) { *bytes_freed_estimate = 0; @@ -931,7 +958,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* continue; } else { - assert(false); + abort(); } } } @@ -985,8 +1012,8 @@ bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) { // we can possibly require is a single basement node // we find out what basement node the query cares about // and check if it is available - assert(bfe->h->compare_fun); - assert(bfe->search); + paranoid_invariant(bfe->h->compare_fun); + paranoid_invariant(bfe->search); bfe->child_to_read = toku_ft_search_which_child( &bfe->h->cmp_descriptor, bfe->h->compare_fun, @@ -1000,7 +1027,7 @@ bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) { else if (bfe->type == ftnode_fetch_prefetch) { // makes no sense to have prefetching disabled // and still call this function - assert(!bfe->disable_prefetching); + paranoid_invariant(!bfe->disable_prefetching); int lc = toku_bfe_leftmost_child_wanted(bfe, node); int rc = toku_bfe_rightmost_child_wanted(bfe, node); for (int i = lc; i <= rc; ++i) { @@ -1011,7 +1038,7 @@ bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) { } else { // we have a bug. The type should be known - assert(false); + abort(); } return retval; } @@ -1118,7 +1145,7 @@ int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraar r = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe); } else { - assert(false); + abort(); } } @@ -1132,7 +1159,7 @@ int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraar "Error while reading node partition %d\n", get_maybe_error_errno()); } - assert(false); + abort(); } } @@ -1197,7 +1224,7 @@ void toku_destroy_ftnode_internals(FTNODE node) toku_free(sb->compressed_ptr); toku_free(sb); } else { - assert(is_BNULL(node, i)); + paranoid_invariant(is_BNULL(node, i)); } set_BNULL(node, i); } @@ -1233,8 +1260,8 @@ void toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_children, int layout_version, unsigned int flags) // Effect: Fill in N as an empty ftnode. { - assert(layout_version != 0); - assert(height >= 0); + paranoid_invariant(layout_version != 0); + paranoid_invariant(height >= 0); if (height == 0) STATUS_INC(FT_CREATE_LEAF, 1); @@ -1330,7 +1357,8 @@ ft_init_new_root(FT ft, FTNODE oldroot, FTNODE *newrootp) ft, newroot, 0, // childnum to split - oldroot + oldroot, + SPLIT_EVENLY ); // ft_split_child released locks on newroot @@ -1393,7 +1421,7 @@ ft_leaf_delete_leafentry ( { int r = toku_omt_delete_at(bn->buffer, idx); - assert(r==0); + assert_zero(r); } bn->n_bytes_in_buffer -= leafentry_disksize(le); @@ -1428,11 +1456,13 @@ toku_ft_bn_apply_cmd_once ( // That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is // no longer in use. We'll have to release the old mempool later. 
{ - int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta); + int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, &bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta); invariant(r==0); } - if (new_le) assert(newsize == leafentry_disksize(new_le)); + if (new_le) { + paranoid_invariant(newsize == leafentry_disksize(new_le)); + } if (le && new_le) { bn->n_bytes_in_buffer -= oldsize; bn->n_bytes_in_buffer += newsize; @@ -1468,10 +1498,7 @@ toku_ft_bn_apply_cmd_once ( } } if (workdone) { // test programs may call with NULL - //uint64_t new_workdone = - (void) __sync_add_and_fetch(workdone, workdone_this_le); - //if (new_workdone > STATUS_VALUE(FT_MAX_WORKDONE)) - // STATUS_VALUE(FT_MAX_WORKDONE) = new_workdone; + *workdone += workdone_this_le; } // if we created a new mempool buffer, free the old one @@ -1512,8 +1539,8 @@ struct setval_extra_s { */ static void setval_fun (const DBT *new_val, void *svextra_v) { struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v); - assert(svextra->tag==setval_tag); - assert(!svextra->did_set_val); + paranoid_invariant(svextra->tag==setval_tag); + paranoid_invariant(!svextra->did_set_val); svextra->did_set_val = true; { @@ -1531,8 +1558,8 @@ static void setval_fun (const DBT *new_val, void *svextra_v) { msg.u.id.val = &val; } toku_ft_bn_apply_cmd_once(svextra->bn, &msg, - svextra->idx, svextra->le, - svextra->workdone, svextra->stats_to_update); + svextra->idx, svextra->le, + svextra->workdone, svextra->stats_to_update); svextra->setval_r = 0; } } @@ -1563,14 +1590,14 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn } else if (cmd->type == FT_UPDATE_BROADCAST_ALL) { // key is not passed in with broadcast, it comes from le // update function extra is passed in with command - assert(le); // for broadcast updates, we just hit all leafentries + paranoid_invariant(le); // for broadcast updates, we just hit all leafentries // so this cannot be null - assert(cmd->u.id.key->size == 0); + paranoid_invariant(cmd->u.id.key->size == 0); STATUS_INC(FT_UPDATES_BROADCAST, 1); keyp = toku_fill_dbt(&key, le_key(le), le_keylen(le)); update_function_extra = cmd->u.id.val; } else { - assert(false); + abort(); } if (le && !le_latest_is_del(le)) { @@ -1647,7 +1674,7 @@ toku_ft_bn_apply_cmd ( if (r==DB_NOTFOUND) { storeddata = 0; } else { - assert(r==0); + assert_zero(r); CAST_FROM_VOIDP(storeddata, storeddatav); } toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update); @@ -1676,7 +1703,7 @@ toku_ft_bn_apply_cmd ( r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, &storeddatav, &idx); if (r == DB_NOTFOUND) break; - assert(r==0); + assert_zero(r); CAST_FROM_VOIDP(storeddata, storeddatav); while (1) { @@ -1692,11 +1719,11 @@ toku_ft_bn_apply_cmd ( //the omt than we started with and the next leafentry will be at the //same index as the deleted one. Otherwise, the next leafentry will //be at the next index (+1). - assert(num_leafentries_before == num_leafentries_after || - num_leafentries_before-1 == num_leafentries_after); + paranoid_invariant(num_leafentries_before == num_leafentries_after || + num_leafentries_before-1 == num_leafentries_after); if (num_leafentries_after==num_leafentries_before) idx++; //Not deleted, advance index. 
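// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this commit): this hunk, like most of the
// patch, downgrades assert() calls on internal consistency checks to
// paranoid_invariant(), so the checks can be compiled out of release builds.
// A minimal self-contained version of that pattern follows; the macro name
// and the TOKU_DEBUG_PARANOID switch are stand-ins chosen for illustration,
// not the real definitions from toku_assert.h.
#include <cassert>
#include <cstdio>

#ifdef TOKU_DEBUG_PARANOID
#define paranoid_invariant_sketch(p) assert(p)
#else
#define paranoid_invariant_sketch(p) ((void) 0)  // no check, no evaluation in release builds
#endif

static int find_slot_sketch(const int *a, int n, int key) {
    paranoid_invariant_sketch(n >= 0);           // only enforced in paranoid builds
    for (int i = 0; i < n; i++) {
        if (a[i] == key) {
            return i;
        }
    }
    return -1;
}

int main(void) {
    const int a[] = {3, 5, 8};
    printf("%d\n", find_slot_sketch(a, 3, 5));   // prints 1 with or without the checks
    return 0;
}
// ---------------------------------------------------------------------------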
- assert(idx <= num_leafentries_after); + paranoid_invariant(idx <= num_leafentries_after); if (idx == num_leafentries_after) break; //Reached the end of the leaf r = toku_omt_fetch(bn->buffer, idx, &storeddatav); assert_zero(r); @@ -1731,7 +1758,7 @@ toku_ft_bn_apply_cmd ( toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update); uint32_t new_omt_size = toku_omt_size(bn->buffer); if (new_omt_size != omt_size) { - assert(new_omt_size+1 == omt_size); + paranoid_invariant(new_omt_size+1 == omt_size); //Item was deleted. deleted = 1; } @@ -1741,7 +1768,7 @@ toku_ft_bn_apply_cmd ( else idx++; } - assert(toku_omt_size(bn->buffer) == omt_size); + paranoid_invariant(toku_omt_size(bn->buffer) == omt_size); break; case FT_COMMIT_BROADCAST_TXN: @@ -1757,7 +1784,7 @@ toku_ft_bn_apply_cmd ( toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, workdone, stats_to_update); uint32_t new_omt_size = toku_omt_size(bn->buffer); if (new_omt_size != omt_size) { - assert(new_omt_size+1 == omt_size); + paranoid_invariant(new_omt_size+1 == omt_size); //Item was deleted. deleted = 1; } @@ -1767,7 +1794,7 @@ toku_ft_bn_apply_cmd ( else idx++; } - assert(toku_omt_size(bn->buffer) == omt_size); + paranoid_invariant(toku_omt_size(bn->buffer) == omt_size); break; case FT_UPDATE: { @@ -1788,12 +1815,10 @@ toku_ft_bn_apply_cmd ( uint32_t num_leafentries_before; while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) { r = toku_omt_fetch(bn->buffer, idx, &storeddatav); - assert(r==0); + assert_zero(r); CAST_FROM_VOIDP(storeddata, storeddatav); r = do_update(update_fun, desc, bn, cmd, idx, storeddata, workdone, stats_to_update); - // TODO(leif): This early return means get_leaf_reactivity() - // and VERIFY_NODE() never get called. Is this a problem? - assert(r==0); + assert_zero(r); if (num_leafentries_before == toku_omt_size(bn->buffer)) { // we didn't delete something, so increment the index. @@ -1880,24 +1905,30 @@ void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, // append a cmd to a nonleaf node's child buffer // should be static, but used by test programs void toku_ft_append_to_child_buffer(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) { - assert(BP_STATE(node,childnum) == PT_AVAIL); + paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL); toku_bnc_insert_msg(BNC(node, childnum), key->data, key->size, val->data, val->size, type, msn, xids, is_fresh, desc, compare_fun); node->dirty = 1; } -static void ft_nonleaf_cmd_once_to_child (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, unsigned int childnum, FT_MSG cmd, bool is_fresh) +static void ft_nonleaf_cmd_once_to_child(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int target_childnum, FT_MSG cmd, bool is_fresh, size_t flow_deltas[]) // Previously we had passive aggressive promotion, but that causes a lot of I/O a the checkpoint. So now we are just putting it in the buffer here. // Also we don't worry about the node getting overfull here. It's the caller's problem. { + unsigned int childnum = (target_childnum >= 0 + ? 
target_childnum + : toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun)); toku_ft_append_to_child_buffer(compare_fun, desc, node, childnum, cmd->type, cmd->msn, cmd->xids, is_fresh, cmd->u.id.key, cmd->u.id.val); + NONLEAF_CHILDINFO bnc = BNC(node, childnum); + bnc->flow[0] += flow_deltas[0]; + bnc->flow[1] += flow_deltas[1]; } /* Find the leftmost child that may contain the key. * If the key exists it will be in the child whose number * is the return value of this function. */ -unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k, - DESCRIPTOR desc, ft_compare_func cmp) { +int toku_ftnode_which_child(FTNODE node, const DBT *k, + DESCRIPTOR desc, ft_compare_func cmp) { #define DO_PIVOT_SEARCH_LR 0 #if DO_PIVOT_SEARCH_LR int i; @@ -1953,11 +1984,11 @@ unsigned int toku_ftnode_which_child(FTNODE node, const DBT *k, } // Used for HOT. -unsigned int +int toku_ftnode_hot_next_child(FTNODE node, - const DBT *k, - DESCRIPTOR desc, - ft_compare_func cmp) { + const DBT *k, + DESCRIPTOR desc, + ft_compare_func cmp) { int low = 0; int hi = node->n_children - 1; int mi; @@ -1990,27 +2021,14 @@ ft_msg_size(FT_MSG msg) { } -static void ft_nonleaf_cmd_once(ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh) -// Effect: Insert a message into a nonleaf. We may put it into a child, possibly causing the child to become reactive. -// We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do. -// The re_array[i] gets set to reactivity of any modified child. -{ - /* find the right subtree */ - //TODO: accesses key, val directly - unsigned int childnum = toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun); - - ft_nonleaf_cmd_once_to_child (compare_fun, desc, node, childnum, cmd, is_fresh); -} - static void -ft_nonleaf_cmd_all (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh) +ft_nonleaf_cmd_all (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh, size_t flow_deltas[]) // Effect: Put the cmd into a nonleaf node. We put it into all children, possibly causing the children to become reactive. // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do. // The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.) { - int i; - for (i = 0; i < node->n_children; i++) { - ft_nonleaf_cmd_once_to_child(compare_fun, desc, node, i, cmd, is_fresh); + for (int i = 0; i < node->n_children; i++) { + ft_nonleaf_cmd_once_to_child(compare_fun, desc, node, i, cmd, is_fresh, flow_deltas); } } @@ -2033,7 +2051,7 @@ ft_msg_does_nothing(FT_MSG cmd) } static void -ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, FT_MSG cmd, bool is_fresh) +ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, int target_childnum, FT_MSG cmd, bool is_fresh, size_t flow_deltas[]) // Effect: Put the cmd into a nonleaf node. We may put it into a child, possibly causing the child to become reactive. // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do. // The re_array[i] gets set to the reactivity of any modified child i. (And there may be several such children.) 
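// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this commit): the child-selection helpers
// whose signatures change above (toku_ftnode_which_child and
// toku_ftnode_hot_next_child) pick a child by binary search over the node's
// pivot keys; which_child returns the leftmost child whose range may contain
// the key. The real code compares DBTs through ft_compare_func and a
// descriptor; the version below uses plain ints just to show the search.
#include <cstdio>

// n_children children are separated by n_children - 1 sorted pivots.
// Child i covers keys <= pivots[i]; the last child covers everything larger.
static int which_child_sketch(const int *pivots, int n_children, int key) {
    int lo = 0;
    int hi = n_children - 1;              // the "greater than every pivot" child
    while (lo < hi) {
        int mid = lo + (hi - lo) / 2;
        if (key <= pivots[mid]) {
            hi = mid;                     // key may live in child mid or to its left
        } else {
            lo = mid + 1;                 // key is strictly greater than pivots[mid]
        }
    }
    return lo;
}

int main(void) {
    const int pivots[] = {10, 20, 30};    // four children
    printf("%d %d %d %d\n",
           which_child_sketch(pivots, 4, 5),     // 0
           which_child_sketch(pivots, 4, 10),    // 0: a key equal to a pivot goes left
           which_child_sketch(pivots, 4, 25),    // 2
           which_child_sketch(pivots, 4, 99));   // 3
    return 0;
}
// ---------------------------------------------------------------------------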
@@ -2050,52 +2068,15 @@ ft_nonleaf_put_cmd (ft_compare_func compare_fun, DESCRIPTOR desc, FTNODE node, F invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn); node->max_msn_applied_to_node_on_disk = cmd_msn; - //TODO: Accessing type directly - switch (cmd->type) { - case FT_INSERT_NO_OVERWRITE: - case FT_INSERT: - case FT_DELETE_ANY: - case FT_ABORT_ANY: - case FT_COMMIT_ANY: - case FT_UPDATE: - ft_nonleaf_cmd_once(compare_fun, desc, node, cmd, is_fresh); - return; - case FT_COMMIT_BROADCAST_ALL: - case FT_COMMIT_BROADCAST_TXN: - case FT_ABORT_BROADCAST_TXN: - case FT_OPTIMIZE: - case FT_OPTIMIZE_FOR_UPGRADE: - case FT_UPDATE_BROADCAST_ALL: - ft_nonleaf_cmd_all (compare_fun, desc, node, cmd, is_fresh); // send message to all children - return; - case FT_NONE: - return; - } - abort(); // cannot happen -} - - -// return true if root changed, false otherwise -static void -ft_process_maybe_reactive_root (FT ft, FTNODE *nodep) { - FTNODE node = *nodep; - toku_assert_entire_node_in_memory(node); - enum reactivity re = get_node_reactivity(node, ft->h->nodesize); - switch (re) { - case RE_STABLE: - return; - case RE_FISSIBLE: - { - ft_init_new_root(ft, node, nodep); - return; - } - case RE_FUSIBLE: - return; // Cannot merge anything at the root, so return happy. + if (ft_msg_applies_once(cmd)) { + ft_nonleaf_cmd_once_to_child(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas); + } else if (ft_msg_applies_all(cmd)) { + ft_nonleaf_cmd_all(compare_fun, desc, node, cmd, is_fresh, flow_deltas); + } else { + paranoid_invariant(ft_msg_applies_none(cmd)); } - abort(); // cannot happen } - // Garbage collect one leaf entry. static void ft_basement_node_gc_once(BASEMENTNODE bn, @@ -2106,7 +2087,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn, const xid_omt_t &live_root_txns, STAT64INFO_S * delta) { - assert(leaf_entry); + paranoid_invariant(leaf_entry); // There is no point in running GC if there is only one committed // leaf entry. @@ -2132,7 +2113,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn, garbage_collect_leafentry(leaf_entry, &new_leaf_entry, &newsize, - bn->buffer, + &bn->buffer, &bn->buffer_mempool, &maybe_free, snapshot_xids, @@ -2191,7 +2172,7 @@ basement_node_gc_all_les(BASEMENTNODE bn, OMTVALUE storedatav = NULL; LEAFENTRY leaf_entry; r = toku_omt_fetch(bn->buffer, index, &storedatav); - assert(r == 0); + assert_zero(r); CAST_FROM_VOIDP(leaf_entry, storedatav); ft_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, referenced_xids, live_root_txns, delta); // Check if the leaf entry was deleted or not. @@ -2210,7 +2191,7 @@ ft_leaf_gc_all_les(FTNODE node, const xid_omt_t &live_root_txns) { toku_assert_entire_node_in_memory(node); - assert(node->height == 0); + paranoid_invariant_zero(node->height); // Loop through each leaf entry, garbage collecting as we go. for (int i = 0; i < node->n_children; ++i) { // Perform the garbage collection. 
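// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this commit): the following hunks thread
// per-buffer "flow" counters through flushing and promotion. As used in this
// patch, flow[0] accumulates the bytes of messages added to a child's buffer
// since the last completed checkpoint, flow[1] holds the previous checkpoint
// period's total (toku_ftnode_checkpoint_complete_callback rotates them), and
// toku_bnc_should_promote treats the buffer as hot when either window reaches
// an eighth of the nodesize (the 0.125 factor). The types and the 4MB
// nodesize below are simplified stand-ins.
#include <cstdint>
#include <cstdio>

struct child_buffer_sketch {
    uint64_t flow[2];   // [0] = bytes since last checkpoint, [1] = previous period
};

// Called for every message injected into this child's buffer.
static void record_inject(child_buffer_sketch *bnc, uint64_t msg_bytes) {
    bnc->flow[0] += msg_bytes;
}

// Called when a checkpoint completes: start a fresh window, remember the old one.
static void checkpoint_complete(child_buffer_sketch *bnc) {
    bnc->flow[1] = bnc->flow[0];
    bnc->flow[0] = 0;
}

// "Hot" buffers are worth promoting past: either window saw >= nodesize/8 bytes.
static bool should_promote_sketch(const child_buffer_sketch *bnc, uint64_t nodesize) {
    const uint64_t threshold = nodesize / 8;
    return bnc->flow[0] >= threshold || bnc->flow[1] >= threshold;
}

int main(void) {
    child_buffer_sketch bnc = {{0, 0}};
    const uint64_t nodesize = 4u << 20;                       // assume a 4MB node
    record_inject(&bnc, 600u << 10);                          // 600KB of messages
    printf("%d\n", should_promote_sketch(&bnc, nodesize));    // 1: 600KB >= 512KB
    checkpoint_complete(&bnc);
    printf("%d\n", should_promote_sketch(&bnc, nodesize));    // still 1, via flow[1]
    return 0;
}
// ---------------------------------------------------------------------------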
@@ -2229,25 +2210,40 @@ void toku_bnc_flush_to_child( FTNODE child ) { - assert(bnc); - assert(toku_fifo_n_entries(bnc->buffer)>0); + paranoid_invariant(bnc); + paranoid_invariant(toku_fifo_n_entries(bnc->buffer)>0); STAT64INFO_S stats_delta = {0,0}; + size_t remaining_memsize = toku_fifo_buffer_size_in_use(bnc->buffer); FIFO_ITERATE( bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, ({ DBT hk,hv; FT_MSG_S ftcmd = { type, msn, xids, .u = { .id = { toku_fill_dbt(&hk, key, keylen), toku_fill_dbt(&hv, val, vallen) } } }; + size_t flow_deltas[] = { 0, 0 }; + if (remaining_memsize <= bnc->flow[0]) { + // this message is in the current checkpoint's worth of + // the end of the fifo + flow_deltas[0] = FIFO_CURRENT_ENTRY_MEMSIZE; + } else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) { + // this message is in the last checkpoint's worth of the + // end of the fifo + flow_deltas[1] = FIFO_CURRENT_ENTRY_MEMSIZE; + } toku_ft_node_put_cmd( h->compare_fun, h->update_fun, &h->cmp_descriptor, child, + -1, &ftcmd, is_fresh, + flow_deltas, &stats_delta ); + remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE; })); + invariant(remaining_memsize == 0); if (stats_delta.numbytes || stats_delta.numrows) { toku_ft_update_stats(&h->in_memory_stats, stats_delta); } @@ -2277,6 +2273,12 @@ void toku_bnc_flush_to_child( } } +bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) { + static const double factor = 0.125; + const uint64_t flow_threshold = ft->h->nodesize * factor; + return bnc->flow[0] >= flow_threshold || bnc->flow[1] >= flow_threshold; +} + void bring_node_fully_into_memory(FTNODE node, FT h) { if (!is_entire_node_in_memory(node)) { @@ -2299,8 +2301,10 @@ toku_ft_node_put_cmd ( ft_update_func update_fun, DESCRIPTOR desc, FTNODE node, + int target_childnum, FT_MSG cmd, bool is_fresh, + size_t flow_deltas[], STAT64INFO stats_to_update ) // Effect: Push CMD into the subtree rooted at NODE. @@ -2317,18 +2321,9 @@ toku_ft_node_put_cmd ( // and instead defer to these functions // if (node->height==0) { - uint64_t workdone = 0; - toku_ft_leaf_apply_cmd( - compare_fun, - update_fun, - desc, - node, - cmd, - &workdone, - stats_to_update - ); + toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, nullptr, stats_to_update); } else { - ft_nonleaf_put_cmd(compare_fun, desc, node, cmd, is_fresh); + ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas); } } @@ -2344,6 +2339,7 @@ void toku_ft_leaf_apply_cmd( ft_update_func update_fun, DESCRIPTOR desc, FTNODE node, + int target_childnum, // which child to inject to, or -1 if unknown FT_MSG cmd, uint64_t *workdone, STAT64INFO stats_to_update @@ -2378,16 +2374,19 @@ void toku_ft_leaf_apply_cmd( } if (ft_msg_applies_once(cmd)) { - unsigned int childnum = toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun); - if (cmd->msn.msn > BLB(node, childnum)->max_msn_applied.msn) { - BLB(node, childnum)->max_msn_applied = cmd->msn; + unsigned int childnum = (target_childnum >= 0 + ? 
target_childnum + : toku_ftnode_which_child(node, cmd->u.id.key, desc, compare_fun)); + BASEMENTNODE bn = BLB(node, childnum); + if (cmd->msn.msn > bn->max_msn_applied.msn) { + bn->max_msn_applied = cmd->msn; toku_ft_bn_apply_cmd(compare_fun, - update_fun, - desc, - BLB(node, childnum), - cmd, - workdone, - stats_to_update); + update_fun, + desc, + bn, + cmd, + workdone, + stats_to_update); } else { STATUS_INC(FT_MSN_DISCARDS, 1); } @@ -2409,38 +2408,44 @@ void toku_ft_leaf_apply_cmd( } } else if (!ft_msg_does_nothing(cmd)) { - assert(false); + abort(); } VERIFY_NODE(t, node); } -static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd) -// Effect: Put CMD into brt's root node, and update -// the value of root's max_msn_applied_to_node_on_disk -{ - FTNODE node = *nodep; +static void inject_message_in_locked_node(FT ft, FTNODE node, int childnum, FT_MSG_S *cmd, size_t flow_deltas[]) { + // No guarantee that we're the writer, but oh well. + // TODO(leif): Implement "do I have the lock or is it someone else?" + // check in frwlock. Should be possible with TOKU_PTHREAD_DEBUG, nop + // otherwise. + invariant(toku_ctpair_is_write_locked(node->ct_pair)); toku_assert_entire_node_in_memory(node); - MSN cmd_msn = cmd->msn; - invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn); + // Get the MSN from the header. Now that we have a write lock on the + // node we're injecting into, we know no other thread will get an MSN + // after us and get that message into our subtree before us. + cmd->msn.msn = toku_sync_add_and_fetch(&ft->h->max_msn_in_ft.msn, 1); + paranoid_invariant(cmd->msn.msn > node->max_msn_applied_to_node_on_disk.msn); STAT64INFO_S stats_delta = {0,0}; toku_ft_node_put_cmd( - h->compare_fun, - h->update_fun, - &h->cmp_descriptor, + ft->compare_fun, + ft->update_fun, + &ft->cmp_descriptor, node, + childnum, cmd, true, + flow_deltas, &stats_delta ); if (stats_delta.numbytes || stats_delta.numrows) { - toku_ft_update_stats(&h->in_memory_stats, stats_delta); + toku_ft_update_stats(&ft->in_memory_stats, stats_delta); } // // assumption is that toku_ft_node_put_cmd will // mark the node as dirty. // enforcing invariant here. // - invariant(node->dirty != 0); + paranoid_invariant(node->dirty != 0); // update some status variables if (node->height != 0) { @@ -2455,96 +2460,456 @@ static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd) STATUS_INC(FT_MSG_NUM_BROADCAST, 1); } } + + // verify that msn of latest message was captured in root node + paranoid_invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn); + + // if we call flush_some_child, then that function unpins the root + // otherwise, we unpin ourselves + if (node->height > 0 && toku_ft_nonleaf_is_gorged(node, ft->h->nodesize)) { + flush_node_on_background_thread(ft, node); + } + else { + toku_unpin_ftnode(ft, node); + } } -void toku_ft_root_put_cmd(FT ft, FT_MSG_S * cmd) +// seqinsert_loc is a bitmask. +// The root counts as being both on the "left extreme" and on the "right extreme". +// Therefore, at the root, you're at LEFT_EXTREME | RIGHT_EXTREME. +typedef char seqinsert_loc; +static const seqinsert_loc NEITHER_EXTREME = 0; +static const seqinsert_loc LEFT_EXTREME = 1; +static const seqinsert_loc RIGHT_EXTREME = 2; + +static bool process_maybe_reactive_child(FT ft, FTNODE parent, FTNODE child, int childnum, seqinsert_loc loc) // Effect: -// - assign msn to cmd -// - push the cmd into the brt -// - cmd will set new msn in tree +// If child needs to be split or merged, do that. 
+// parent and child will be unlocked if this happens +// also, the batched pin will have ended if this happens +// Requires: parent and child are read locked +// Returns: +// true if relocking is needed +// false otherwise { - // blackhole fractal trees drop all messages, so do nothing. - if (ft->blackhole) { - return; + enum reactivity re = get_node_reactivity(child, ft->h->nodesize); + enum reactivity newre; + BLOCKNUM child_blocknum; + uint32_t child_fullhash; + switch (re) { + case RE_STABLE: + return false; + case RE_FISSIBLE: + { + // We only have a read lock on the parent. We need to drop both locks, and get write locks. + BLOCKNUM parent_blocknum = parent->thisnodename; + uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum); + int parent_height = parent->height; + int parent_n_children = parent->n_children; + toku_unpin_ftnode_read_only(ft, child); + toku_unpin_ftnode_read_only(ft, parent); + struct ftnode_fetch_extra bfe; + fill_bfe_for_full_read(&bfe, ft); + FTNODE newparent, newchild; + toku_pin_ftnode_off_client_thread_batched(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, 0, nullptr, &newparent); + if (newparent->height != parent_height || newparent->n_children != parent_n_children || + childnum >= newparent->n_children || toku_bnc_n_entries(BNC(newparent, childnum))) { + // If the height changed or childnum is now off the end, something clearly got split or merged out from under us. + // If something got injected in this node, then it got split or merged and we shouldn't be splitting it. + // But we already unpinned the child so we need to have the caller re-try the pins. + toku_unpin_ftnode_read_only(ft, newparent); + return true; + } + // It's ok to reuse the same childnum because if we get something + // else we need to split, well, that's crazy, but let's go ahead + // and split it. 
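// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this commit): the code just above handles
// a split while only holding read locks by remembering what it saw (parent
// height, child count, buffer emptiness), dropping both pins, re-pinning with
// write locks, and re-checking; if anything changed in between it returns
// true so the caller restarts. The generic shape of that relock-and-
// revalidate pattern, with std::shared_mutex standing in for the cachetable
// pair locks:
#include <cstdio>
#include <mutex>
#include <shared_mutex>

struct node_sketch {
    std::shared_mutex lock;
    int height;
    int n_children;
};

// Returns true if the caller must retry because the node changed while unlocked.
static bool split_child_with_revalidation(node_sketch *parent, int childnum) {
    int seen_height, seen_children;
    {
        std::shared_lock<std::shared_mutex> rd(parent->lock);   // observe under the read lock
        seen_height = parent->height;
        seen_children = parent->n_children;
    }                                                            // no atomic upgrade: drop it...
    std::unique_lock<std::shared_mutex> wr(parent->lock);        // ...and take the write lock
    if (parent->height != seen_height ||
        parent->n_children != seen_children ||
        childnum >= parent->n_children) {
        return true;    // someone split or merged underneath us; retry from the top
    }
    parent->n_children++;   // stand-in for actually splitting child `childnum`
    return false;
}

int main(void) {
    node_sketch parent;
    parent.height = 1;
    parent.n_children = 4;
    bool retry = split_child_with_revalidation(&parent, 2);
    printf("retry=%d n_children=%d\n", retry, parent.n_children);   // retry=0 n_children=5
    return 0;
}
// ---------------------------------------------------------------------------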
+ child_blocknum = BP_BLOCKNUM(newparent, childnum); + child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum); + toku_pin_ftnode_off_client_thread_batched(ft, child_blocknum, child_fullhash, &bfe, PL_WRITE_CHEAP, 1, &newparent, &newchild); + newre = get_node_reactivity(newchild, ft->h->nodesize); + if (newre == RE_FISSIBLE) { + enum split_mode split_mode; + if (newparent->height == 1 && (loc & LEFT_EXTREME) && childnum == 0) { + split_mode = SPLIT_RIGHT_HEAVY; + } else if (newparent->height == 1 && (loc & RIGHT_EXTREME) && childnum == newparent->n_children - 1) { + split_mode = SPLIT_LEFT_HEAVY; + } else { + split_mode = SPLIT_EVENLY; + } + toku_ft_split_child(ft, newparent, childnum, newchild, split_mode); + } else { + // some other thread already got it, just unpin and tell the + // caller to retry + toku_unpin_ftnode_read_only(ft, newchild); + toku_unpin_ftnode_read_only(ft, newparent); + } + return true; + } + case RE_FUSIBLE: + { + if (parent->height == 1) { + // prevent re-merging of recently unevenly-split nodes + if (((loc & LEFT_EXTREME) && childnum <= 1) || + ((loc & RIGHT_EXTREME) && childnum >= parent->n_children - 2)) { + return false; + } + } + + int parent_height = parent->height; + BLOCKNUM parent_blocknum = parent->thisnodename; + uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum); + toku_unpin_ftnode_read_only(ft, child); + toku_unpin_ftnode_read_only(ft, parent); + struct ftnode_fetch_extra bfe; + fill_bfe_for_full_read(&bfe, ft); + FTNODE newparent, newchild; + toku_pin_ftnode_off_client_thread_batched(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, 0, nullptr, &newparent); + if (newparent->height != parent_height || childnum >= newparent->n_children) { + // looks like this is the root and it got merged, let's just start over (like in the split case above) + toku_unpin_ftnode_read_only(ft, newparent); + return true; + } + child_blocknum = BP_BLOCKNUM(newparent, childnum); + child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum); + toku_pin_ftnode_off_client_thread_batched(ft, child_blocknum, child_fullhash, &bfe, PL_READ, 1, &newparent, &newchild); + newre = get_node_reactivity(newchild, ft->h->nodesize); + if (newre == RE_FUSIBLE) { + if (newparent->n_children < 2) { + // weird case where newparent is the root node and + // can't merge, so it might only have one child. In + // this case, just return false so we can insert + // anyway. + return false; + } + toku_unpin_ftnode_read_only(ft, newchild); + toku_ft_merge_child(ft, newparent, childnum); + } else { + // some other thread already got it, just unpin and tell the + // caller to retry + toku_unpin_ftnode_read_only(ft, newchild); + toku_unpin_ftnode_read_only(ft, newparent); + } + return true; + } } + abort(); +} +static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, FT_MSG_S *cmd, size_t flow_deltas[]) +// Effect: +// Inject cmd into the node at this blocknum (cachekey). +// Gets a write lock on the node for you. +{ FTNODE node; - CACHEKEY root_key; - //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable)); - assert(ft); - // - // As of Dr. Noga, the following code is currently protected by two locks: - // - the ydb lock - // - header's tree lock - // - // We hold the header's tree lock to stop other threads from - // descending down the tree while the root node may change. - // The root node may change when ft_process_maybe_reactive_root is called. 
- // Other threads (such as the cleaner thread or hot optimize) that want to - // descend down the tree must grab the header's tree lock, so they are - // ensured that what they think is the root's blocknum is actually - // the root's blocknum. - // - // We also hold the ydb lock for a number of reasons, but an important - // one is to make sure that a begin_checkpoint may not start while - // this code is executing. A begin_checkpoint does (at least) two things - // that can interfere with the operations here: - // - copies the header to a checkpoint header. Because we may change - // the root blocknum below, we don't want the header to be copied in - // the middle of these operations. - // - Takes note of the log's LSN. Because this put operation has - // already been logged, this message injection must be included - // in any checkpoint that contains this put's logentry. - // Holding the ydb lock throughout this function ensures that fact. - // As of Dr. Noga, I (Zardosht) THINK these are the only reasons why - // the ydb lock must be held for this function, but there may be - // others - // - { - uint32_t fullhash; - toku_calculate_root_offset_pointer(ft, &root_key, &fullhash); + struct ftnode_fetch_extra bfe; + fill_bfe_for_full_read(&bfe, ft); + toku_pin_ftnode_off_client_thread_batched(ft, cachekey, fullhash, &bfe, PL_WRITE_CHEAP, 0, NULL, &node); + toku_assert_entire_node_in_memory(node); + paranoid_invariant(node->fullhash==fullhash); + ft_verify_flags(ft, node); + inject_message_in_locked_node(ft, node, -1, cmd, flow_deltas); +} - // get the root node - struct ftnode_fetch_extra bfe; - fill_bfe_for_full_read(&bfe, ft); - toku_pin_ftnode_off_client_thread( - ft, - root_key, - fullhash, - &bfe, - PL_WRITE_EXPENSIVE, // may_modify_node - 0, - NULL, - &node - ); - toku_assert_entire_node_in_memory(node); +__attribute__((const)) +static inline bool should_inject_in_node(seqinsert_loc loc, int height, int depth) +// We should inject directly in a node if: +// - it's a leaf, or +// - it's a height 1 node not at either extreme, or +// - it's a depth 2 node not at either extreme +{ + return (height == 0 || (loc == NEITHER_EXTREME && (height <= 1 || depth >= 2))); +} - //VERIFY_NODE(brt, node); - assert(node->fullhash==fullhash); - ft_verify_flags(ft, node); +static void push_something_in_subtree(FT ft, FTNODE subtree_root, int target_childnum, FT_MSG_S *cmd, size_t flow_deltas[], int depth, seqinsert_loc loc, bool just_did_split_or_merge) +// Effects: +// Assign cmd an MSN from ft->h. +// Put cmd in the subtree rooted at node. Due to promotion the message may not be injected directly in this node. +// Unlock node or schedule it to be unlocked (after a background flush). +// Either way, the caller is not responsible for unlocking node. +// Requires: +// subtree_root is read locked and fully in memory. +// Notes: +// In Ming, the basic rules of promotion are as follows: +// Don't promote broadcast messages. +// Don't promote past non-empty buffers. +// Otherwise, promote at most to height 1 or depth 2 (whichever is highest), as far as the birdie asks you to promote. +// We don't promote to leaves because injecting into leaves is expensive, mostly because of #5605 and some of #5552. +// We don't promote past depth 2 because we found that gives us enough parallelism without costing us too much pinning work. +// +// This is true with the following caveats: +// We always promote all the way to the leaves on the rightmost and leftmost edges of the tree, for sequential insertions. 
+// (That means we can promote past depth 2 near the edges of the tree.) +// +// When the birdie is still saying we should promote, we use get_and_pin so that we wait to get the node. +// If the birdie doesn't say to promote, we try maybe_get_and_pin. If we get the node cheaply, and it's dirty, we promote anyway. +{ + toku_assert_entire_node_in_memory(subtree_root); + if (should_inject_in_node(loc, subtree_root->height, depth)) { + switch (depth) { + case 0: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break; + case 1: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break; + case 2: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break; + case 3: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break; + default: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break; + } + inject_message_in_locked_node(ft, subtree_root, target_childnum, cmd, flow_deltas); + } else { + int r; + int childnum; + NONLEAF_CHILDINFO bnc; + + // toku_ft_root_put_cmd should not have called us otherwise. + paranoid_invariant(ft_msg_applies_once(cmd)); + + childnum = (target_childnum >= 0 ? target_childnum + : toku_ftnode_which_child(subtree_root, cmd->u.id.key, &ft->cmp_descriptor, ft->compare_fun)); + bnc = BNC(subtree_root, childnum); + + if (toku_bnc_n_entries(bnc) > 0) { + // The buffer is non-empty, give up on promoting. + STATUS_INC(FT_PRO_NUM_STOP_NONEMPTY_BUF, 1); + goto relock_and_push_here; + } + + seqinsert_loc next_loc; + if ((loc & LEFT_EXTREME) && childnum == 0) { + next_loc = LEFT_EXTREME; + } else if ((loc & RIGHT_EXTREME) && childnum == subtree_root->n_children - 1) { + next_loc = RIGHT_EXTREME; + } else { + next_loc = NEITHER_EXTREME; + } + + if (next_loc == NEITHER_EXTREME && subtree_root->height <= 1) { + // Never promote to leaf nodes except on the edges + STATUS_INC(FT_PRO_NUM_STOP_H1, 1); + goto relock_and_push_here; + } - // first handle a reactive root, then put in the message - // ft_process_maybe_reactive_root may release and re-pin the root - // so we cannot set the msn yet. - ft_process_maybe_reactive_root(ft, &node); + { + const BLOCKNUM child_blocknum = BP_BLOCKNUM(subtree_root, childnum); + toku_verify_blocknum_allocated(ft->blocktable, child_blocknum); + const uint32_t child_fullhash = toku_cachetable_hash(ft->cf, child_blocknum); - cmd->msn.msn = node->max_msn_applied_to_node_on_disk.msn + 1; - // Note, the lower level function that filters messages based on - // msn, (toku_ft_bn_apply_cmd() or ft_nonleaf_put_cmd()) will capture - // the msn and store it in the relevant node, including the root - // node. This is how the new msn is set in the root. + FTNODE child; + { + const int child_height = subtree_root->height - 1; + const int child_depth = depth + 1; + // If we're locking a leaf, or a height 1 node or depth 2 + // node in the middle, we know we won't promote further + // than that, so just get a write lock now. + const pair_lock_type lock_type = (should_inject_in_node(next_loc, child_height, child_depth) + ? PL_WRITE_CHEAP + : PL_READ); + if (next_loc != NEITHER_EXTREME || (toku_bnc_should_promote(ft, bnc) && depth <= 1)) { + // If we're on either extreme, or the birdie wants to + // promote and we're in the top two levels of the + // tree, don't stop just because someone else has the + // node locked. 
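// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this commit): the pinning logic below only
// blocks on the child's lock when promotion is expected to pay off (we are on
// a sequential-insert edge, or the buffer's flow says promote and we are in
// the top two levels); otherwise it try-pins and gives up on promotion if the
// child is busy or not fully in memory. A self-contained version of that
// decision, with std::mutex::try_lock standing in for
// toku_maybe_pin_ftnode_clean:
#include <cstdio>
#include <mutex>

struct promote_decision_inputs {
    bool on_edge;          // at the leftmost/rightmost extreme of the tree
    bool buffer_is_hot;    // the flow counters say this child sees heavy traffic
    int depth;             // how far below the injection root we already are
    bool fully_in_memory;  // all partitions of the child are available
};

// Returns true if we got the child's lock and should keep promoting.
static bool pin_child_for_promotion(std::mutex *child_lock,
                                    const promote_decision_inputs *in) {
    if (in->on_edge || (in->buffer_is_hot && in->depth <= 1)) {
        child_lock->lock();              // worth waiting for: block until we get it
        return true;
    }
    if (!child_lock->try_lock()) {
        return false;                    // child is busy; stop promoting, inject higher up
    }
    if (!in->fully_in_memory) {
        child_lock->unlock();            // got the lock cheaply, but the node needs I/O; give up
        return false;
    }
    return true;
}

int main(void) {
    std::mutex child_lock;
    promote_decision_inputs in = {false, true, 3, true};
    bool promoted = pin_child_for_promotion(&child_lock, &in);
    printf("%d\n", promoted);            // 1: the try-lock succeeded and the node is cached
    if (promoted) child_lock.unlock();
    return 0;
}
// ---------------------------------------------------------------------------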
+ struct ftnode_fetch_extra bfe; + fill_bfe_for_full_read(&bfe, ft); + toku_pin_ftnode_off_client_thread_batched(ft, child_blocknum, child_fullhash, &bfe, lock_type, 0, nullptr, &child); + } else { + r = toku_maybe_pin_ftnode_clean(ft, child_blocknum, child_fullhash, lock_type, &child); + if (r != 0) { + // We couldn't get the child cheaply, so give up on promoting. + STATUS_INC(FT_PRO_NUM_STOP_LOCK_CHILD, 1); + goto relock_and_push_here; + } + if (is_entire_node_in_memory(child)) { + // toku_pin_ftnode... touches the clock but toku_maybe_pin_ftnode... doesn't. + // This prevents partial eviction. + for (int i = 0; i < child->n_children; ++i) { + BP_TOUCH_CLOCK(child, i); + } + } else { + // We got the child, but it's not fully in memory. Give up on promoting. + STATUS_INC(FT_PRO_NUM_STOP_CHILD_INMEM, 1); + goto unlock_child_and_push_here; + } + } + } + paranoid_invariant_notnull(child); + + if (!just_did_split_or_merge) { + BLOCKNUM subtree_root_blocknum = subtree_root->thisnodename; + uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum); + const bool did_split_or_merge = process_maybe_reactive_child(ft, subtree_root, child, childnum, loc); + if (did_split_or_merge) { + // Need to re-pin this node and try at this level again. + FTNODE newparent; + struct ftnode_fetch_extra bfe; + fill_bfe_for_full_read(&bfe, ft); // should be fully in memory, we just split it + toku_pin_ftnode_off_client_thread_batched(ft, subtree_root_blocknum, subtree_root_fullhash, &bfe, PL_READ, 0, nullptr, &newparent); + push_something_in_subtree(ft, newparent, -1, cmd, flow_deltas, depth, loc, true); + return; + } + } + + if (next_loc != NEITHER_EXTREME || child->dirty || toku_bnc_should_promote(ft, bnc)) { + push_something_in_subtree(ft, child, -1, cmd, flow_deltas, depth + 1, next_loc, false); + toku_sync_fetch_and_add(&bnc->flow[0], flow_deltas[0]); + // The recursive call unpinned the child, but + // we're responsible for unpinning subtree_root. + toku_unpin_ftnode_read_only(ft, subtree_root); + return; + } + + STATUS_INC(FT_PRO_NUM_DIDNT_WANT_PROMOTE, 1); + unlock_child_and_push_here: + // We locked the child, but we decided not to promote. + // Unlock the child, and fall through to the next case. + toku_unpin_ftnode_read_only(ft, child); + } + relock_and_push_here: + // Give up on promoting. + // We have subtree_root read-locked and we don't have a child locked. + // Drop the read lock, grab a write lock, and inject here. + { + // Right now we have a read lock on subtree_root, but we want + // to inject into it so we get a write lock instead. 
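The child pin above uses two strategies: when promotion is clearly worthwhile (an extreme edge, or the flow heuristic firing in the top two levels) it blocks in the batched pin call, and otherwise it calls toku_maybe_pin_ftnode_clean and abandons promotion if the child is contended or not fully in memory. A minimal standalone sketch of that block-versus-try-lock choice, with a plain std::mutex standing in for the cachetable pair lock (illustration only, not the TokuFT API):

// Sketch: block for the lock when descending is clearly worthwhile,
// otherwise try-lock and fall back to injecting at the current level.
// std::mutex stands in for the cachetable pair lock; names are illustrative.
#include <cstdio>
#include <mutex>

static std::mutex child_lock;

// Returns true if we ended up holding child_lock (i.e. we will descend).
static bool pin_child_for_promotion(bool must_descend) {
    if (must_descend) {
        child_lock.lock();       // wait: promotion is worth blocking for
        return true;
    }
    if (child_lock.try_lock()) { // opportunistic: only take it if uncontended
        return true;
    }
    // Contended and promotion is optional: give up, inject here instead.
    return false;
}

int main(void) {
    if (pin_child_for_promotion(false)) {
        printf("descending into child\n");
        child_lock.unlock();
    } else {
        printf("child busy, injecting at this level\n");
    }
    return 0;
}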
+ BLOCKNUM subtree_root_blocknum = subtree_root->thisnodename; + uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum); + toku_unpin_ftnode_read_only(ft, subtree_root); + switch (depth) { + case 0: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break; + case 1: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break; + case 2: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break; + case 3: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break; + default: + STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break; + } + inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, cmd, flow_deltas); + } } - push_something_at_root(ft, &node, cmd); - // verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock) - invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn); +} - // if we call flush_some_child, then that function unpins the root - // otherwise, we unpin ourselves - if (node->height > 0 && toku_ft_nonleaf_is_gorged(node, ft->h->nodesize)) { - flush_node_on_background_thread(ft, node); +void toku_ft_root_put_cmd(FT ft, FT_MSG_S *cmd) +// Effect: +// - assign msn to cmd and update msn in the header +// - push the cmd into the ft + +// As of Clayface, the root blocknum is a constant, so preventing a +// race between message injection and the split of a root is the job +// of the cachetable's locking rules. +// +// We also hold the MO lock for a number of reasons, but an important +// one is to make sure that a begin_checkpoint may not start while +// this code is executing. A begin_checkpoint does (at least) two things +// that can interfere with the operations here: +// - Copies the header to a checkpoint header. Because we may change +// the max_msn_in_ft below, we don't want the header to be copied in +// the middle of these operations. +// - Takes note of the log's LSN. Because this put operation has +// already been logged, this message injection must be included +// in any checkpoint that contains this put's logentry. +// Holding the mo lock throughout this function ensures that fact. +{ + // blackhole fractal trees drop all messages, so do nothing. + if (ft->blackhole) { + return; } - else { - toku_unpin_ftnode(ft, node); // unpin root + + FTNODE node; + + uint32_t fullhash; + CACHEKEY root_key; + toku_calculate_root_offset_pointer(ft, &root_key, &fullhash); + struct ftnode_fetch_extra bfe; + fill_bfe_for_full_read(&bfe, ft); + + size_t flow_deltas[] = { toku_ft_msg_memsize_in_fifo(cmd), 0 }; + + pair_lock_type lock_type; + lock_type = PL_READ; // try first for a read lock + // If we need to split the root, we'll have to change from a read lock + // to a write lock and check again. We change the variable lock_type + // and jump back to here. + change_lock_type: + // get the root node + toku_pin_ftnode_off_client_thread_batched(ft, root_key, fullhash, &bfe, lock_type, 0, NULL, &node); + toku_assert_entire_node_in_memory(node); + paranoid_invariant(node->fullhash==fullhash); + ft_verify_flags(ft, node); + + // First handle a reactive root. + // This relocking for split algorithm will cause every message + // injection thread to change lock type back and forth, when only one + // of them needs to in order to handle the split. That's not great, + // but root splits are incredibly rare. 
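The relock_and_push_here path above records the node's stable identity (blocknum and fullhash) while it is still pinned, drops the read pin, and then injects by blocknum under a fresh write lock; once the pin is released, only the blocknum is a safe handle on the node. A toy standalone sketch of that capture-identity-then-relock pattern, using a std::map and std::shared_mutex in place of the cachetable (the names here are invented for the example):

// Sketch: remember the node's stable identity while still pinned, drop the
// read pin, then re-acquire the node by identity with a write lock.
#include <cstdio>
#include <map>
#include <shared_mutex>

struct toy_node { int height = 0; std::shared_mutex lock; };
static std::map<long, toy_node> cache;               // blocknum -> node

static void inject_at_blocknum(long blocknum) {
    toy_node &n = cache[blocknum];                    // "re-pin" by identity
    std::unique_lock<std::shared_mutex> write_lock(n.lock);
    printf("injected into blocknum %ld (height %d)\n", blocknum, n.height);
}

static void give_up_on_promotion(toy_node *pinned, long blocknum) {
    // Identity was captured by the caller; now drop the read pin...
    pinned->lock.unlock_shared();
    // ...and grab the node again, this time for writing.
    inject_at_blocknum(blocknum);
}

int main(void) {
    cache[7].height = 2;
    toy_node *n = &cache[7];
    n->lock.lock_shared();                            // simulate the read pin
    give_up_on_promotion(n, 7);
    return 0;
}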
+ enum reactivity re = get_node_reactivity(node, ft->h->nodesize); + switch (re) { + case RE_STABLE: + case RE_FUSIBLE: // cannot merge anything at the root + if (lock_type != PL_READ) { + // We thought we needed to split, but someone else got to + // it before us. Downgrade to a read lock. + toku_unpin_ftnode_read_only(ft, node); + lock_type = PL_READ; + goto change_lock_type; + } + break; + case RE_FISSIBLE: + if (lock_type == PL_READ) { + // Here, we only have a read lock on the root. In order + // to split it, we need a write lock, but in the course of + // gaining the write lock, someone else may have gotten in + // before us and split it. So we upgrade to a write lock + // and check again. + toku_unpin_ftnode_read_only(ft, node); + lock_type = PL_WRITE_CHEAP; + goto change_lock_type; + } else { + // We have a write lock, now we can split. + ft_init_new_root(ft, node, &node); + // Then downgrade back to a read lock, and we can finally + // do the injection. + toku_unpin_ftnode_off_client_thread(ft, node); + lock_type = PL_READ; + STATUS_INC(FT_PRO_NUM_ROOT_SPLIT, 1); + goto change_lock_type; + } + break; + } + // If we get to here, we have a read lock and the root doesn't + // need to be split. It's safe to inject the message. + paranoid_invariant(lock_type == PL_READ); + // We cannot assert that we have the read lock because frwlock asserts + // that its mutex is locked when we check if there are any readers. + // That wouldn't give us a strong guarantee that we have the read lock + // anyway. + + // Now, either inject here or promote. We decide based on a heuristic: + if (node->height == 0 || !ft_msg_applies_once(cmd)) { + // If the root's a leaf or we're injecting a broadcast, drop the read lock and inject here. + toku_unpin_ftnode_read_only(ft, node); + STATUS_INC(FT_PRO_NUM_ROOT_H0_INJECT, 1); + inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas); + } else if (node->height > 1) { + // If the root's above height 1, we are definitely eligible for promotion. + push_something_in_subtree(ft, node, -1, cmd, flow_deltas, 0, LEFT_EXTREME | RIGHT_EXTREME, false); + } else { + // The root's height 1. We may be eligible for promotion here. + // On the extremes, we want to promote, in the middle, we don't. + int childnum = toku_ftnode_which_child(node, cmd->u.id.key, &ft->cmp_descriptor, ft->compare_fun); + if (childnum == 0 || childnum == node->n_children - 1) { + // On the extremes, promote. We know which childnum we're going to, so pass that down too. + push_something_in_subtree(ft, node, childnum, cmd, flow_deltas, 0, LEFT_EXTREME | RIGHT_EXTREME, false); + } else { + // At height 1 in the middle, don't promote, drop the read lock and inject here. 
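The reactivity switch above is a classic upgrade-and-recheck loop: pin the root for read, and only if it turns out to be fissible drop the read lock, take a write lock, and re-evaluate, because another thread may have performed the split in between; after splitting, downgrade back to a read lock for the injection. A minimal standalone sketch of that loop, assuming a C++17 std::shared_mutex as a stand-in for the root's pair lock:

// Sketch of the "upgrade and re-check" loop above: lock for read, and only
// if the node turns out to need splitting, drop the read lock, re-lock for
// write, and re-evaluate (someone else may have split it in the meantime).
#include <cstdio>
#include <shared_mutex>

static std::shared_mutex root_lock;
static bool root_needs_split = true;   // toy stand-in for get_node_reactivity()

static void split_root(void) { root_needs_split = false; }

static void put_into_root(void) {
    bool have_write_lock = false;
    root_lock.lock_shared();
    for (;;) {
        if (!root_needs_split) {
            if (have_write_lock) {
                // We upgraded, but the split is already done: downgrade.
                root_lock.unlock();
                root_lock.lock_shared();
                have_write_lock = false;
            }
            break;                     // safe to inject under the read lock
        }
        if (!have_write_lock) {
            // Upgrade: release the read lock, take the write lock, re-check.
            root_lock.unlock_shared();
            root_lock.lock();
            have_write_lock = true;
            continue;
        }
        split_root();                  // we hold the write lock: do the split
        // Loop around; the next pass downgrades back to a read lock.
    }
    printf("injecting under read lock\n");
    root_lock.unlock_shared();
}

int main(void) { put_into_root(); return 0; }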
+ toku_unpin_ftnode_read_only(ft, node); + STATUS_INC(FT_PRO_NUM_ROOT_H1_INJECT, 1); + inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas); + } } } @@ -2554,7 +2919,7 @@ void toku_ft_insert (FT_HANDLE brt, DBT *key, DBT *val, TOKUTXN txn) { } void toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) { - assert(txn); + paranoid_invariant(txn); toku_txn_force_fsync_on_commit(txn); //If the txn commits, the commit MUST be in the log //before the (old) file is actually unlinked TOKULOGGER logger = toku_txn_logger(txn); @@ -2573,7 +2938,7 @@ void toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_in // - write to recovery log void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn) { - assert(txn); + paranoid_invariant(txn); TOKULOGGER logger = toku_txn_logger(txn); // write to the rollback log @@ -2661,7 +3026,7 @@ toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int nu } void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging, enum ft_msg_type type) { - assert(type==FT_INSERT || type==FT_INSERT_NO_OVERWRITE); + paranoid_invariant(type==FT_INSERT || type==FT_INSERT_NO_OVERWRITE); XIDS message_xids = xids_get_root_xids(); //By default use committed messages TXNID xid = toku_txn_get_txnid(txn); if (txn) { @@ -2866,7 +3231,7 @@ void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids) { struct omt_compressor_state { struct mempool *new_kvspace; - OMT omt; + OMTVALUE *newvals; }; static int move_it (OMTVALUE lev, uint32_t idx, void *v) { @@ -2874,23 +3239,27 @@ static int move_it (OMTVALUE lev, uint32_t idx, void *v) { struct omt_compressor_state *CAST_FROM_VOIDP(oc, v); uint32_t size = leafentry_memsize(le); LEAFENTRY CAST_FROM_VOIDP(newdata, toku_mempool_malloc(oc->new_kvspace, size, 1)); - lazy_assert(newdata); // we do this on a fresh mempool, so nothing bad should happen + paranoid_invariant_notnull(newdata); // we do this on a fresh mempool, so nothing bad should happen memcpy(newdata, le, size); - toku_omt_set_at(oc->omt, newdata, idx); + oc->newvals[idx] = newdata; return 0; } // Compress things, and grow the mempool if needed. 
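The omt_compress_kvspace hunk that follows rebuilds the leaf's OMT from a freshly packed array (toku_omt_create_steal_sorted_array) rather than patching entries in place, and grows the new mempool to twice the live size instead of 1.25x. A toy standalone sketch of that compact-into-a-doubled-pool idea, with a byte vector standing in for the mempool and a pointer array standing in for the OMT (illustration only, not the TokuFT data structures):

// Toy sketch: compact live entries out of a fragmented pool into a fresh one
// sized at twice the live bytes, and rebuild the index array in one pass
// instead of patching it entry by entry.
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>

struct arena {                        // toy stand-in for struct mempool
    std::vector<char> buf;
    size_t used = 0;
    char *alloc(size_t n) {
        char *p = buf.data() + used;  // cannot fail: pool was sized to fit
        used += n;
        return p;
    }
};

// Copy every live entry into a freshly sized pool and rebuild the index.
static void compact(const std::vector<const char *> &old_entries,
                    arena *fresh, std::vector<const char *> *new_entries) {
    size_t live = 0;
    for (const char *e : old_entries) live += strlen(e) + 1;
    fresh->buf.assign(2 * live, 0);   // grow to 2x what is actually needed
    fresh->used = 0;
    new_entries->clear();
    for (const char *e : old_entries) {
        char *dst = fresh->alloc(strlen(e) + 1);
        memcpy(dst, e, strlen(e) + 1);
        new_entries->push_back(dst);
    }
}

int main(void) {
    std::vector<std::string> backing = {"alpha", "bravo", "charlie"};
    std::vector<const char *> entries;
    for (const std::string &s : backing) entries.push_back(s.c_str());
    arena pool;
    std::vector<const char *> packed;
    compact(entries, &pool, &packed);
    printf("entries: %zu, live bytes packed: %zu, pool size: %zu\n",
           packed.size(), pool.used, pool.buf.size());
    printf("first entry: %s\n", packed[0]);
    return 0;
}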
-static void omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size, void **maybe_free) { +static void omt_compress_kvspace (OMT *omtp, struct mempool *memp, size_t added_size, void **maybe_free) { uint32_t total_size_needed = memp->free_offset-memp->frag_size + added_size; - if (total_size_needed+total_size_needed/4 >= memp->size) { - memp->size = total_size_needed+total_size_needed/4; + if (total_size_needed+total_size_needed >= memp->size) { + memp->size = total_size_needed+total_size_needed; } void *newmem = toku_xmalloc(memp->size); struct mempool new_kvspace; toku_mempool_init(&new_kvspace, newmem, memp->size); - struct omt_compressor_state oc = { &new_kvspace, omt }; - toku_omt_iterate(omt, move_it, &oc); + uint32_t numvals = toku_omt_size(*omtp); + OMTVALUE *XMALLOC_N(numvals, newvals); + struct omt_compressor_state oc = { &new_kvspace, newvals }; + toku_omt_iterate(*omtp, move_it, &oc); + toku_omt_destroy(omtp); + toku_omt_create_steal_sorted_array(omtp, &newvals, numvals, numvals); if (maybe_free) { *maybe_free = memp->base; @@ -2901,12 +3270,12 @@ static void omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_si } void * -mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size, void **maybe_free) { +mempool_malloc_from_omt(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free) { void *v = toku_mempool_malloc(mp, size, 1); if (v == NULL) { - omt_compress_kvspace(omt, mp, size, maybe_free); + omt_compress_kvspace(omtp, mp, size, maybe_free); v = toku_mempool_malloc(mp, size, 1); - lazy_assert(v); + paranoid_invariant_notnull(v); } return v; } @@ -3709,7 +4078,7 @@ int fifo_offset_msn_cmp(FIFO &fifo, const int32_t &ao, const int32_t &bo) * basement node. */ static void -do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, struct fifo_entry *entry, STAT64INFO stats_to_update) +do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, uint64_t *workdone, STAT64INFO stats_to_update) { // The messages are being iterated over in (key,msn) order or just in // msn order, so all the messages for one key, from one buffer, are in @@ -3734,7 +4103,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, str &t->ft->cmp_descriptor, bn, &ftcmd, - &BP_WORKDONE(ancestor, childnum), + workdone, stats_to_update ); } else { @@ -3750,17 +4119,16 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, str struct iterate_do_bn_apply_cmd_extra { FT_HANDLE t; BASEMENTNODE bn; - FTNODE ancestor; - int childnum; + NONLEAF_CHILDINFO bnc; + uint64_t *workdone; STAT64INFO stats_to_update; }; int iterate_do_bn_apply_cmd(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e) __attribute__((nonnull(3))); int iterate_do_bn_apply_cmd(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e) { - NONLEAF_CHILDINFO bnc = BNC(e->ancestor, e->childnum); - struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offset); - do_bn_apply_cmd(e->t, e->bn, e->ancestor, e->childnum, entry, e->stats_to_update); + struct fifo_entry *entry = toku_fifo_get_entry(e->bnc->buffer, offset); + do_bn_apply_cmd(e->t, e->bn, entry, e->workdone, e->stats_to_update); return 0; } @@ -3891,6 +4259,8 @@ bnc_apply_messages_to_basement_node( // Determine the offsets in the message trees between which we need to // apply messages from this buffer STAT64INFO_S stats_delta = {0,0}; + uint64_t workdone_this_ancestor = 0; + uint32_t 
stale_lbi, stale_ube; if (!bn->stale_ancestor_messages_applied) { find_bounds_within_message_tree(&t->ft->cmp_descriptor, t->ft->compare_fun, bnc->stale_message_tree, bnc->buffer, bounds, &stale_lbi, &stale_ube); @@ -3938,13 +4308,13 @@ bnc_apply_messages_to_basement_node( for (int i = 0; i < buffer_size; ++i) { *msgs_applied = true; struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]); - do_bn_apply_cmd(t, bn, ancestor, childnum, entry, &stats_delta); + do_bn_apply_cmd(t, bn, entry, &workdone_this_ancestor, &stats_delta); } toku_free(offsets); } else if (stale_lbi == stale_ube) { // No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later. - struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .ancestor = ancestor, .childnum = childnum, .stats_to_update = &stats_delta}; + struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta }; if (fresh_ube - fresh_lbi > 0) *msgs_applied = true; r = bnc->fresh_message_tree.iterate_and_mark_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(fresh_lbi, fresh_ube, &iter_extra); assert_zero(r); @@ -3953,7 +4323,7 @@ bnc_apply_messages_to_basement_node( // No fresh messages to apply, we just apply stale messages. if (stale_ube - stale_lbi > 0) *msgs_applied = true; - struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .ancestor = ancestor, .childnum = childnum , .stats_to_update = &stats_delta}; + struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta }; r = bnc->stale_message_tree.iterate_on_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(stale_lbi, stale_ube, &iter_extra); assert_zero(r); @@ -3961,11 +4331,12 @@ bnc_apply_messages_to_basement_node( // // update stats // + if (workdone_this_ancestor > 0) { + (void) toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum), workdone_this_ancestor); + } if (stats_delta.numbytes || stats_delta.numrows) { toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta); } -#if 0 -#endif } void @@ -3993,7 +4364,7 @@ toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ances struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds); for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) { if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) { - assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL); + paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL); bnc_apply_messages_to_basement_node( t, curr_bn, @@ -4045,7 +4416,7 @@ bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancesto struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds); for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) { if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) { - assert(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL); + paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL); NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum); if (bnc->broadcast_list.size() > 0) { needs_ancestors_messages = true; @@ -4099,7 +4470,7 @@ void toku_ft_bn_update_max_msn(FTNODE 
node, MSN max_msn_applied) { // Any threads trying to update these basement nodes should be // updating them to the same thing (since they all have a read lock on // the same root-to-leaf path) so this is safe. - (void) __sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn); + (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn); } } } @@ -4202,7 +4573,7 @@ ok: ; idx--; break; default: - assert(false); + abort(); } r = toku_omt_fetch(bn->buffer, idx, &datav); assert_zero(r); // we just validated the index @@ -4353,7 +4724,7 @@ unlock_ftnode_fun (PAIR p, void *v) { (enum cachetable_dirty) node->dirty, x->msgs_applied ? make_ftnode_pair_attr(node) : make_invalid_pair_attr() ); - assert(r==0); + assert_zero(r); } /* search in a node's child */ @@ -4414,7 +4785,7 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F toku_unpin_ftnode(brt->ft, childnode); } else { - toku_unpin_ftnode_read_only(brt, childnode); + toku_unpin_ftnode_read_only(brt->ft, childnode); } } else { // try again. @@ -4430,7 +4801,7 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F toku_unpin_ftnode(brt->ft, childnode); } else { - toku_unpin_ftnode_read_only(brt, childnode); + toku_unpin_ftnode_read_only(brt->ft, childnode); } } } @@ -4565,7 +4936,8 @@ ft_search_node( { int r = 0; // assert that we got a valid child_to_search - assert(child_to_search >= 0 && child_to_search < node->n_children); + invariant(child_to_search >= 0); + invariant(child_to_search < node->n_children); // // At this point, we must have the necessary partition available to continue the search // @@ -4666,7 +5038,6 @@ toku_ft_search (FT_HANDLE brt, ft_search_t *search, FT_GET_CALLBACK_FUNCTION get try_again: trycount++; - assert(ft); // // Here is how searches work @@ -4740,7 +5111,7 @@ try_again: // some piece of a node that it needed was not in memory. // In this case, the node was not unpinned, so we unpin it here if (unlockers.locked) { - toku_unpin_ftnode_read_only(brt, node); + toku_unpin_ftnode_read_only(brt->ft, node); } goto try_again; } else { @@ -4749,7 +5120,7 @@ try_again: } assert(unlockers.locked); - toku_unpin_ftnode_read_only(brt, node); + toku_unpin_ftnode_read_only(brt->ft, node); //Heaviside function (+direction) queries define only a lower or upper @@ -5163,7 +5534,7 @@ keyrange_in_leaf_partition (FT_HANDLE brt, FTNODE node, DBT *key, int child_numb // If the partition is in main memory then estimate the number // If KEY==NULL then use an arbitrary key (leftmost or zero) { - assert(node->height == 0); // we are in a leaf + paranoid_invariant(node->height == 0); // we are in a leaf if (BP_STATE(node, child_number) == PT_AVAIL) { // If the partition is in main memory then get an exact count. 
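Two hunks above, bnc_apply_messages_to_basement_node stops touching BP_WORKDONE(ancestor, childnum) once per message; it sums the work into a local workdone_this_ancestor and publishes the total with a single toku_sync_fetch_and_add. A standalone sketch of that accumulate-locally, publish-once pattern using std::atomic (not the TokuFT counters themselves):

// Sketch: accumulate per-item work locally, then publish the total to a
// shared counter with a single atomic add, instead of one atomic per item.
#include <atomic>
#include <cstdint>
#include <cstdio>

static std::atomic<uint64_t> shared_workdone{0};   // stand-in for BP_WORKDONE

static void apply_batch(const uint64_t *costs, int n) {
    uint64_t workdone_this_batch = 0;
    for (int i = 0; i < n; i++) {
        workdone_this_batch += costs[i];           // cheap, thread-local
    }
    if (workdone_this_batch > 0) {
        // one contended update instead of n of them
        shared_workdone.fetch_add(workdone_this_batch, std::memory_order_relaxed);
    }
}

int main(void) {
    const uint64_t costs[] = {12, 7, 30};
    apply_batch(costs, 3);
    printf("workdone = %llu\n", (unsigned long long)shared_workdone.load());
    return 0;
}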
struct keyrange_compare_s s = {brt,key}; @@ -5228,9 +5599,9 @@ toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node, &childnode, &msgs_applied ); - assert(!msgs_applied); + paranoid_invariant(!msgs_applied); if (r != TOKUDB_TRY_AGAIN) { - assert(r == 0); + assert_zero(r); struct unlock_ftnode_extra unlock_extra = {brt,childnode,false}; struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, unlockers}; @@ -5239,13 +5610,13 @@ toku_ft_keyrange_internal (FT_HANDLE brt, FTNODE node, r = toku_ft_keyrange_internal(brt, childnode, key, less, equal, greater, rows_per_child, bfe, &next_unlockers, &next_ancestors, &next_bounds); if (r != TOKUDB_TRY_AGAIN) { - assert(r == 0); + assert_zero(r); *less += rows_per_child * child_number; *greater += rows_per_child * (node->n_children - child_number - 1); assert(unlockers->locked); - toku_unpin_ftnode_read_only(brt, childnode); + toku_unpin_ftnode_read_only(brt->ft, childnode); } } } @@ -5261,7 +5632,6 @@ void toku_ft_keyrange(FT_HANDLE brt, DBT *key, uint64_t *less_p, uint64_t *equal // TODO 4184: What to do with a NULL key? // If KEY is NULL then the system picks an arbitrary key and returns it. { - assert(brt->ft); struct ftnode_fetch_extra bfe; fill_bfe_for_min_read(&bfe, brt->ft); // read pivot keys but not message buffers try_again: @@ -5302,7 +5672,7 @@ try_again: } } assert(unlockers.locked); - toku_unpin_ftnode_read_only(brt, node); + toku_unpin_ftnode_read_only(brt->ft, node); *less_p = less; *equal_p = equal; *greater_p = greater; @@ -5310,7 +5680,6 @@ try_again: } void toku_ft_handle_stat64 (FT_HANDLE brt, TOKUTXN UU(txn), struct ftstat64_s *s) { - assert(brt->ft); toku_ft_stat64(brt->ft, s); } @@ -5320,7 +5689,7 @@ toku_dump_ftnode (FILE *file, FT_HANDLE brt, BLOCKNUM blocknum, int depth, const int result=0; FTNODE node; toku_get_node_for_verify(blocknum, brt, &node); - result=toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0); + result=toku_verify_ftnode(brt, brt->ft->h->max_msn_in_ft, brt->ft->h->max_msn_in_ft, false, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0); uint32_t fullhash = toku_cachetable_hash(brt->ft->cf, blocknum); struct ftnode_fetch_extra bfe; fill_bfe_for_full_read(&bfe, brt->ft); diff --git a/ft/ft-serialize.cc b/ft/ft-serialize.cc index 409e46827c6..cea51ef3ecc 100644 --- a/ft/ft-serialize.cc +++ b/ft/ft-serialize.cc @@ -134,8 +134,8 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) { int r; FT ft = NULL; - invariant(version >= FT_LAYOUT_MIN_SUPPORTED_VERSION); - invariant(version <= FT_LAYOUT_VERSION); + paranoid_invariant(version >= FT_LAYOUT_MIN_SUPPORTED_VERSION); + paranoid_invariant(version <= FT_LAYOUT_VERSION); // We already know: // we have an rbuf representing the header. // The checksum has been validated @@ -290,6 +290,12 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) } } + MSN max_msn_in_ft; + max_msn_in_ft = ZERO_MSN; // We'll upgrade it from the root node later if necessary + if (ft->layout_version_read_from_disk >= FT_LAYOUT_VERSION_21) { + max_msn_in_ft = rbuf_msn(rb); + } + (void) rbuf_int(rb); //Read in checksum and ignore (already verified). 
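The deserialize_ft_versioned hunk above reads max_msn_in_ft only when the on-disk layout version is at least FT_LAYOUT_VERSION_21, and otherwise leaves it at ZERO_MSN so toku_upgrade_msn_from_root_to_header can fill it in from the root node later. A minimal sketch of that version-gated read, with a toy buffer reader standing in for rbuf (the names and values here are invented for the example):

// Sketch: read a header field only if the on-disk layout version has it,
// otherwise fall back to a sentinel to be filled in by an upgrade step.
#include <cstdint>
#include <cstdio>
#include <cstring>

struct reader { const uint8_t *p; };   // toy stand-in for struct rbuf

static uint64_t read_u64(reader *r) {
    uint64_t v;
    memcpy(&v, r->p, sizeof v);
    r->p += sizeof v;
    return v;
}

enum { LAYOUT_VERSION_21 = 21, ZERO_MSN = 0 };

static uint64_t read_max_msn(reader *r, int version_read_from_disk) {
    if (version_read_from_disk >= LAYOUT_VERSION_21) {
        return read_u64(r);            // field exists on disk: read it
    }
    return ZERO_MSN;                   // older file: upgrade from the root later
}

int main(void) {
    uint64_t on_disk = 12345;
    reader r_new = { (const uint8_t *)&on_disk };
    printf("v21 header: max_msn = %llu\n",
           (unsigned long long)read_max_msn(&r_new, 21));
    reader r_old = { (const uint8_t *)&on_disk };
    printf("v20 header: max_msn = %llu\n",
           (unsigned long long)read_max_msn(&r_old, 20));
    return 0;
}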
if (rb->ndone != rb->size) { fprintf(stderr, "Header size did not match contents.\n"); @@ -317,6 +323,7 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) .basementnodesize = basementnodesize, .compression_method = compression_method, .highest_unused_msn_for_upgrade = highest_unused_msn_for_upgrade, + .max_msn_in_ft = max_msn_in_ft, .time_of_last_optimize_begin = time_of_last_optimize_begin, .time_of_last_optimize_end = time_of_last_optimize_end, .count_of_optimize_in_progress = count_of_optimize_in_progress, @@ -335,6 +342,12 @@ deserialize_ft_versioned(int fd, struct rbuf *rb, FT *ftp, uint32_t version) goto exit; } } + if (ft->layout_version_read_from_disk < FT_LAYOUT_VERSION_21) { + r = toku_upgrade_msn_from_root_to_header(fd, ft); + if (r != 0) { + goto exit; + } + } invariant((uint32_t) ft->layout_version_read_from_disk == version); r = deserialize_descriptor_from(fd, ft->blocktable, &ft->descriptor, version); @@ -366,10 +379,12 @@ serialize_ft_min_size (uint32_t version) { size_t size = 0; switch(version) { + case FT_LAYOUT_VERSION_21: + size += sizeof(MSN); // max_msn_in_ft case FT_LAYOUT_VERSION_20: case FT_LAYOUT_VERSION_19: size += 1; // compression method - size += sizeof(uint64_t); // highest_unused_msn_for_upgrade + size += sizeof(MSN); // highest_unused_msn_for_upgrade case FT_LAYOUT_VERSION_18: size += sizeof(uint64_t); // time_of_last_optimize_begin size += sizeof(uint64_t); // time_of_last_optimize_end @@ -412,9 +427,9 @@ serialize_ft_min_size (uint32_t version) { ); break; default: - lazy_assert(false); + abort(); } - + lazy_assert(size <= BLOCK_ALLOCATOR_HEADER_RESERVE); return size; } @@ -637,7 +652,7 @@ toku_deserialize_ft_from(int fd, version = version_1; } - invariant(rb); + paranoid_invariant(rb); r = deserialize_ft_versioned(fd, rb, ft, version); exit: @@ -694,6 +709,7 @@ void toku_serialize_ft_to_wbuf ( wbuf_MSN(wbuf, h->msn_at_start_of_last_completed_optimize); wbuf_char(wbuf, (unsigned char) h->compression_method); wbuf_MSN(wbuf, h->highest_unused_msn_for_upgrade); + wbuf_MSN(wbuf, h->max_msn_in_ft); uint32_t checksum = x1764_finish(&wbuf->checksum); wbuf_int(wbuf, checksum); lazy_assert(wbuf->ndone == wbuf->size); diff --git a/ft/ft-test-helpers.cc b/ft/ft-test-helpers.cc index a121fc2d658..d2647ab9ccf 100644 --- a/ft/ft-test-helpers.cc +++ b/ft/ft-test-helpers.cc @@ -136,15 +136,18 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char .u = { .id = { toku_fill_dbt(&keydbt, key, keylen), toku_fill_dbt(&valdbt, val, vallen) } } }; + static size_t zero_flow_deltas[] = { 0, 0 }; toku_ft_node_put_cmd ( brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, node, + -1, &cmd, true, + zero_flow_deltas, NULL - ); + ); toku_verify_or_set_counts(node); @@ -215,6 +218,8 @@ int toku_testsetup_insert_to_nonleaf (FT_HANDLE brt, BLOCKNUM blocknum, enum ft_ // using brt APIs. node->max_msn_applied_to_node_on_disk = msn; node->dirty = 1; + // Also hack max_msn_in_ft + brt->ft->h->max_msn_in_ft = msn; toku_unpin_ftnode(brt->ft, node); return 0; diff --git a/ft/ft-verify.cc b/ft/ft-verify.cc index c763cfa1ef9..834ed4b1d4c 100644 --- a/ft/ft-verify.cc +++ b/ft/ft-verify.cc @@ -245,7 +245,7 @@ toku_get_node_for_verify( static int toku_verify_ftnode_internal(FT_HANDLE brt, - MSN rootmsn, MSN parentmsn, + MSN rootmsn, MSN parentmsn, bool messages_exist_above, FTNODE node, int height, const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.) 
const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.) @@ -258,16 +258,11 @@ toku_verify_ftnode_internal(FT_HANDLE brt, //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); toku_assert_entire_node_in_memory(node); this_msn = node->max_msn_applied_to_node_on_disk; - if (rootmsn.msn == ZERO_MSN.msn) { - assert(parentmsn.msn == ZERO_MSN.msn); - rootmsn = this_msn; - parentmsn = this_msn; - } if (height >= 0) { invariant(height == node->height); // this is a bad failure if wrong } - if (node->height > 0) { + if (node->height > 0 && messages_exist_above) { VERIFY_ASSERTION((parentmsn.msn >= this_msn.msn), 0, "node msn must be descending down tree, newest messages at top"); } // Verify that all the pivot keys are in order. @@ -390,7 +385,7 @@ done: // input is a pinned node, on exit, node is unpinned int toku_verify_ftnode (FT_HANDLE brt, - MSN rootmsn, MSN parentmsn, + MSN rootmsn, MSN parentmsn, bool messages_exist_above, FTNODE node, int height, const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.) const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.) @@ -402,11 +397,6 @@ toku_verify_ftnode (FT_HANDLE brt, //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); toku_assert_entire_node_in_memory(node); this_msn = node->max_msn_applied_to_node_on_disk; - if (rootmsn.msn == ZERO_MSN.msn) { - assert(parentmsn.msn == ZERO_MSN.msn); - rootmsn = this_msn; - parentmsn = this_msn; - } int result = 0; int result2 = 0; @@ -414,7 +404,7 @@ toku_verify_ftnode (FT_HANDLE brt, // Otherwise we'll just do the next call result = toku_verify_ftnode_internal( - brt, rootmsn, parentmsn, node, height, lesser_pivot, greatereq_pivot, + brt, rootmsn, parentmsn, messages_exist_above, node, height, lesser_pivot, greatereq_pivot, verbose, keep_going_on_failure, false); if (result != 0 && (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR)) goto done; } @@ -422,7 +412,7 @@ toku_verify_ftnode (FT_HANDLE brt, toku_move_ftnode_messages_to_stale(brt->ft, node); } result2 = toku_verify_ftnode_internal( - brt, rootmsn, parentmsn, node, height, lesser_pivot, greatereq_pivot, + brt, rootmsn, parentmsn, messages_exist_above, node, height, lesser_pivot, greatereq_pivot, verbose, keep_going_on_failure, true); if (result == 0) { result = result2; @@ -434,7 +424,7 @@ toku_verify_ftnode (FT_HANDLE brt, for (int i = 0; i < node->n_children; i++) { FTNODE child_node; toku_get_node_for_verify(BP_BLOCKNUM(node, i), brt, &child_node); - int r = toku_verify_ftnode(brt, rootmsn, this_msn, + int r = toku_verify_ftnode(brt, rootmsn, this_msn, messages_exist_above || toku_bnc_n_entries(BNC(node, i)) > 0, child_node, node->height-1, (i==0) ? lesser_pivot : &node->childkeys[i-1], (i==node->n_children-1) ? 
greatereq_pivot : &node->childkeys[i], @@ -465,7 +455,7 @@ toku_verify_ft_with_progress (FT_HANDLE brt, int (*progress_callback)(void *extr toku_calculate_root_offset_pointer(brt->ft, &root_key, &root_hash); toku_get_node_for_verify(root_key, brt, &root_node); } - int r = toku_verify_ftnode(brt, ZERO_MSN, ZERO_MSN, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going); + int r = toku_verify_ftnode(brt, brt->ft->h->max_msn_in_ft, brt->ft->h->max_msn_in_ft, false, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going); if (r == 0) { toku_ft_lock(brt->ft); brt->ft->h->time_of_last_verification = time(NULL); @@ -479,4 +469,3 @@ int toku_verify_ft (FT_HANDLE brt) { return toku_verify_ft_with_progress(brt, NULL, NULL, 0, 0); } - @@ -13,6 +13,7 @@ #include <memory.h> #include <toku_assert.h> +#include <portability/toku_atomic.h> void toku_ft_suppress_rollbacks(FT h, TOKUTXN txn) { @@ -365,6 +366,7 @@ ft_header_create(FT_OPTIONS options, BLOCKNUM root_blocknum, TXNID root_xid_that .basementnodesize = options->basementnodesize, .compression_method = options->compression_method, .highest_unused_msn_for_upgrade = { .msn = (MIN_MSN.msn - 1) }, + .max_msn_in_ft = ZERO_MSN, .time_of_last_optimize_begin = 0, .time_of_last_optimize_end = 0, .count_of_optimize_in_progress = 0, @@ -850,14 +852,14 @@ toku_ft_get_cmp_descriptor(FT_HANDLE ft_handle) { void toku_ft_update_stats(STAT64INFO headerstats, STAT64INFO_S delta) { - (void) __sync_fetch_and_add(&(headerstats->numrows), delta.numrows); - (void) __sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes); + (void) toku_sync_fetch_and_add(&(headerstats->numrows), delta.numrows); + (void) toku_sync_fetch_and_add(&(headerstats->numbytes), delta.numbytes); } void toku_ft_decrease_stats(STAT64INFO headerstats, STAT64INFO_S delta) { - (void) __sync_fetch_and_sub(&(headerstats->numrows), delta.numrows); - (void) __sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes); + (void) toku_sync_fetch_and_sub(&(headerstats->numrows), delta.numrows); + (void) toku_sync_fetch_and_sub(&(headerstats->numbytes), delta.numbytes); } void diff --git a/ft/ft_layout_version.h b/ft/ft_layout_version.h index e95abb20c01..f1ae174db87 100644 --- a/ft/ft_layout_version.h +++ b/ft/ft_layout_version.h @@ -28,6 +28,7 @@ enum ft_layout_version_e { FT_LAYOUT_VERSION_20 = 20, // Deadshot: Add compression method to log_fcreate, // mgr_last_xid after begin checkpoint, // last_xid to shutdown + FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header FT_NEXT_VERSION, // the version after the current version FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line. 
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported diff --git a/ft/ft_node-serialize.cc b/ft/ft_node-serialize.cc index 298148889d7..a4e62b07649 100644 --- a/ft/ft_node-serialize.cc +++ b/ft/ft_node-serialize.cc @@ -7,6 +7,7 @@ #include "ft-internal.h" #include "log-internal.h" #include <compress.h> +#include <portability/toku_atomic.h> #include <util/sort.h> #include <util/threadpool.h> @@ -198,7 +199,7 @@ serialize_node_header(FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) { wbuf_nocrc_literal_bytes(wbuf, "tokuleaf", 8); else wbuf_nocrc_literal_bytes(wbuf, "tokunode", 8); - invariant(node->layout_version == FT_LAYOUT_VERSION); + paranoid_invariant(node->layout_version == FT_LAYOUT_VERSION); wbuf_nocrc_int(wbuf, node->layout_version); wbuf_nocrc_int(wbuf, node->layout_version_original); wbuf_nocrc_uint(wbuf, BUILD_ID); @@ -226,7 +227,7 @@ static uint32_t serialize_ftnode_partition_size (FTNODE node, int i) { uint32_t result = 0; - assert(node->bp[i].state == PT_AVAIL); + paranoid_invariant(node->bp[i].state == PT_AVAIL); result++; // Byte that states what the partition is if (node->height > 0) { result += 4; // size of bytes in buffer table @@ -253,7 +254,7 @@ serialize_nonleaf_childinfo(NONLEAF_CHILDINFO bnc, struct wbuf *wb) FIFO_ITERATE( bnc->buffer, key, keylen, data, datalen, type, msn, xids, is_fresh, { - invariant((int)type>=0 && type<256); + paranoid_invariant((int)type>=0 && type<256); wbuf_nocrc_char(wb, (unsigned char)type); wbuf_nocrc_char(wb, (unsigned char)is_fresh); wbuf_MSN(wb, msn); @@ -382,7 +383,6 @@ static void serialize_ftnode_info(FTNODE node, assert(sb->uncompressed_ptr == NULL); sb->uncompressed_size = serialize_ftnode_info_size(node); sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size); - assert(sb->uncompressed_ptr); struct wbuf wb; wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size); @@ -956,7 +956,7 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf, dest = &broadcast_offsets[nbroadcast_offsets]; nbroadcast_offsets++; } else { - assert(false); + abort(); } } else { dest = NULL; @@ -1079,6 +1079,7 @@ NONLEAF_CHILDINFO toku_create_empty_nl(void) { cn->fresh_message_tree.create(); cn->stale_message_tree.create(); cn->broadcast_list.create(); + memset(cn->flow, 0, sizeof cn->flow); return cn; } @@ -1089,6 +1090,7 @@ NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) { cn->fresh_message_tree.create_no_array(); cn->stale_message_tree.create_no_array(); cn->broadcast_list.create_no_array(); + memset(cn->flow, 0, sizeof cn->flow); return cn; } @@ -1181,8 +1183,7 @@ read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb) } sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size); - assert(sb->uncompressed_ptr); - + toku_decompress( (Bytef *) sb->uncompressed_ptr, sb->uncompressed_size, @@ -1198,10 +1199,9 @@ exit: void just_decompress_sub_block(struct sub_block *sb) { - // <CER> TODO: Add assert thta the subblock was read in. + // <CER> TODO: Add assert that the subblock was read in. 
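In the ft_layout_version.h hunk above, FT_LAYOUT_VERSION_21 is inserted immediately before the FT_NEXT_VERSION sentinel, so FT_LAYOUT_VERSION (defined as FT_NEXT_VERSION - 1) tracks the newest real version without any other edits. A standalone sketch of that sentinel-enum idiom, with generic names:

// Sketch of the sentinel-enum idiom used in ft_layout_version.h: insert new
// versions just before NEXT_VERSION and CURRENT_VERSION updates by itself.
#include <cstdio>

enum layout_version_e {
    LAYOUT_VERSION_19 = 19,
    LAYOUT_VERSION_20 = 20,
    LAYOUT_VERSION_21 = 21,            // new entries go here, before the sentinel
    NEXT_VERSION,                      // always one past the newest real version
    CURRENT_VERSION = NEXT_VERSION - 1,
    MIN_SUPPORTED_VERSION = LAYOUT_VERSION_19,
};

int main(void) {
    printf("current layout version = %d\n", (int)CURRENT_VERSION);  // prints 21
    return 0;
}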
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size); - assert(sb->uncompressed_ptr); - + toku_decompress( (Bytef *) sb->uncompressed_ptr, sb->uncompressed_size, @@ -1259,17 +1259,15 @@ deserialize_ftnode_info( } // now create the basement nodes or childinfos, depending on whether this is a - // leaf node or internal node + // leaf node or internal node // now the subtree_estimates // n_children is now in the header, nd the allocatio of the node->bp is in deserialize_ftnode_from_rbuf. - assert(node->bp!=NULL); // // now the pivots node->totalchildkeylens = 0; if (node->n_children > 1) { XMALLOC_N(node->n_children - 1, node->childkeys); - assert(node->childkeys); for (int i=0; i < node->n_children-1; i++) { bytevec childkeyptr; unsigned int cklen; @@ -1291,13 +1289,13 @@ deserialize_ftnode_info( for (int i = 0; i < node->n_children; i++) { BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb); BP_WORKDONE(node, i) = 0; - } + } } - + // make sure that all the data was read if (data_size != rb.ndone) { dump_bad_block(rb.buf, rb.size); - assert(false); + abort(); } exit: return r; @@ -1326,7 +1324,6 @@ update_bfe_using_ftnode(FTNODE node, struct ftnode_fetch_extra *bfe) // we can possibly require is a single basement node // we find out what basement node the query cares about // and check if it is available - assert(bfe->search); bfe->child_to_read = toku_ft_search_which_child( &bfe->h->cmp_descriptor, bfe->h->compare_fun, @@ -1372,17 +1369,16 @@ setup_partitions_using_bfe(FTNODE node, case PT_AVAIL: setup_available_ftnode_partition(node, i); BP_TOUCH_CLOCK(node,i); - continue; + break; case PT_COMPRESSED: set_BSB(node, i, sub_block_creat()); - continue; + break; case PT_ON_DISK: set_BNULL(node, i); - continue; - case PT_INVALID: break; + case PT_INVALID: + abort(); } - assert(false); } } @@ -1616,7 +1612,6 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode, // Now decompress the subblock sb_node_info.uncompressed_ptr = toku_xmalloc(sb_node_info.uncompressed_size); - assert(sb_node_info.uncompressed_ptr); toku_decompress( (Bytef *) sb_node_info.uncompressed_ptr, @@ -1638,7 +1633,7 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode, // rbuf, so we might be able to store the compressed data for some // objects. // We can proceed to deserialize the individual subblocks. - assert(bfe->type == ftnode_fetch_none || bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_all || bfe->type == ftnode_fetch_prefetch); + paranoid_invariant(bfe->type == ftnode_fetch_none || bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_all || bfe->type == ftnode_fetch_prefetch); // setup the memory of the partitions // for partitions being decompressed, create either FIFO or basement node @@ -1647,8 +1642,8 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode, if (bfe->type != ftnode_fetch_none) { PAIR_ATTR attr; - -r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr); + + r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr); if (r != 0) { goto cleanup; } @@ -1656,12 +1651,12 @@ r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr); // handle clock for (int i = 0; i < node->n_children; i++) { if (toku_bfe_wants_child_available(bfe, i)) { - assert(BP_STATE(node,i) == PT_AVAIL); + paranoid_invariant(BP_STATE(node,i) == PT_AVAIL); BP_TOUCH_CLOCK(node,i); } } *ftnode = node; - r = 0; // TODO: Why do we do this??? + r = 0; cleanup: if (r != 0) { @@ -1795,7 +1790,7 @@ deserialize_and_upgrade_internal_node(FTNODE node, // of messages in the buffer. 
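The deserialize_and_upgrade_internal_node hunk that follows assigns MSNs to upgraded messages by atomically subtracting the buffer's message count from highest_unused_msn_for_upgrade, reserving a contiguous, collision-free range in one operation. A standalone sketch of that downward range-reservation pattern using std::atomic (toy values, not the upgrade code itself):

// Sketch: reserve a contiguous block of n IDs from a shared counter with a
// single atomic subtraction (counting downward, as the upgrade code does).
#include <atomic>
#include <cstdint>
#include <cstdio>

static std::atomic<uint64_t> highest_unused{1000};  // toy starting point

// Returns the lowest ID of a freshly reserved block [lowest, lowest + n).
static uint64_t reserve_block(uint64_t n) {
    // fetch_sub returns the old value; the new value is old - n.
    uint64_t lowest = highest_unused.fetch_sub(n) - n;
    return lowest;
}

int main(void) {
    uint64_t a = reserve_block(10);   // reserves [990, 1000)
    uint64_t b = reserve_block(5);    // reserves [985, 990)
    printf("block a starts at %llu, block b starts at %llu\n",
           (unsigned long long)a, (unsigned long long)b);
    return 0;
}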
MSN lowest; uint64_t amount = n_in_this_buffer; - lowest.msn = __sync_sub_and_fetch(&bfe->h->h->highest_unused_msn_for_upgrade.msn, amount); + lowest.msn = toku_sync_sub_and_fetch(&bfe->h->h->highest_unused_msn_for_upgrade.msn, amount); if (highest_msn.msn == 0) { highest_msn.msn = lowest.msn + n_in_this_buffer; } @@ -1821,7 +1816,7 @@ deserialize_and_upgrade_internal_node(FTNODE node, dest = &broadcast_offsets[nbroadcast_offsets]; nbroadcast_offsets++; } else { - assert(false); + abort(); } } else { dest = NULL; @@ -1962,8 +1957,6 @@ deserialize_and_upgrade_leaf_node(FTNODE node, if (version <= FT_LAYOUT_VERSION_13) { // Create our mempool. toku_mempool_construct(&bn->buffer_mempool, 0); - OMT omt = BLB_BUFFER(node, 0); - struct mempool *mp = &BLB_BUFFER_MEMPOOL(node, 0); // Loop through for (int i = 0; i < n_in_buf; ++i) { LEAFENTRY_13 le = reinterpret_cast<LEAFENTRY_13>(&rb->buf[rb->ndone]); @@ -1975,11 +1968,11 @@ deserialize_and_upgrade_leaf_node(FTNODE node, r = toku_le_upgrade_13_14(le, &new_le_size, &new_le, - omt, - mp); + &bn->buffer, + &bn->buffer_mempool); assert_zero(r); // Copy the pointer value straight into the OMT - r = toku_omt_insert_at(omt, new_le, i); + r = toku_omt_insert_at(bn->buffer, new_le, i); assert_zero(r); bn->n_bytes_in_buffer += new_le_size; } @@ -2259,7 +2252,7 @@ deserialize_ftnode_from_rbuf( // now that the node info has been deserialized, we can proceed to deserialize // the individual sub blocks - assert(bfe->type == ftnode_fetch_none || bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_all || bfe->type == ftnode_fetch_prefetch); + paranoid_invariant(bfe->type == ftnode_fetch_none || bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_all || bfe->type == ftnode_fetch_prefetch); // setup the memory of the partitions // for partitions being decompressed, create either FIFO or basement node @@ -2306,20 +2299,18 @@ deserialize_ftnode_from_rbuf( if (r != 0) { goto cleanup; } - continue; + break; case PT_COMPRESSED: // case where we leave the partition in the compressed state r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i); if (r != 0) { goto cleanup; } - continue; + break; case PT_INVALID: // this is really bad case PT_ON_DISK: // it's supposed to be in memory. 
- assert(0); - continue; + abort(); } - assert(0); } *ftnode = node; r = 0; @@ -2745,7 +2736,8 @@ decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, s n_sub_blocks = toku_dtoh32(*(uint32_t*)(&raw_block[node_header_overhead])); // verify the number of sub blocks - invariant(0 <= n_sub_blocks && n_sub_blocks <= max_sub_blocks); + invariant(0 <= n_sub_blocks); + invariant(n_sub_blocks <= max_sub_blocks); { // verify the header checksum uint32_t header_length = node_header_overhead + sub_block_header_size(n_sub_blocks); @@ -2799,7 +2791,6 @@ decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, s size = node_header_overhead + uncompressed_size; unsigned char *buf; XMALLOC_N(size, buf); - lazy_assert(buf); rbuf_init(rb, buf, size); // copy the uncompressed node header to the uncompressed buffer @@ -2820,7 +2811,6 @@ decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, s dump_bad_block(raw_block, raw_block_size); goto exit; } - lazy_assert_zero(r); toku_trace("decompress done"); @@ -2840,7 +2830,7 @@ decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_blo r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum); break; default: - lazy_assert(false); + abort(); } return r; } @@ -2886,7 +2876,7 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum, fprintf(stderr, "Checksum failure while reading raw block in file %s.\n", toku_cachefile_fname_in_env(h->cf)); - assert(false); + abort(); } else { r = toku_db_badformat(); goto cleanup; @@ -2949,7 +2939,6 @@ cleanup: return r; } - int toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h) { @@ -2962,7 +2951,7 @@ toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h) struct ftnode_fetch_extra bfe; fill_bfe_for_min_read(&bfe, h); r = deserialize_ftnode_from_fd(fd, h->h->root_blocknum, 0, &unused_node, &unused_ndd, - &bfe, &h->h->on_disk_stats); + &bfe, &h->h->on_disk_stats); h->in_memory_stats = h->h->on_disk_stats; if (unused_node) { @@ -2974,5 +2963,27 @@ toku_upgrade_subtree_estimates_to_stat64info(int fd, FT h) return r; } -#undef UPGRADE_STATUS_VALUE +int +toku_upgrade_msn_from_root_to_header(int fd, FT h) +{ + int r; + // 21 was the first version with max_msn_in_ft in the header + invariant(h->layout_version_read_from_disk <= FT_LAYOUT_VERSION_20); + FTNODE node; + FTNODE_DISK_DATA ndd; + struct ftnode_fetch_extra bfe; + fill_bfe_for_min_read(&bfe, h); + r = deserialize_ftnode_from_fd(fd, h->h->root_blocknum, 0, &node, &ndd, &bfe, nullptr); + if (r != 0) { + goto exit; + } + + h->h->max_msn_in_ft = node->max_msn_applied_to_node_on_disk; + toku_ftnode_free(&node); + toku_free(ndd); + exit: + return r; +} + +#undef UPGRADE_STATUS_VALUE diff --git a/ft/fttypes.h b/ft/fttypes.h index 555d4764b3a..f3182990924 100644 --- a/ft/fttypes.h +++ b/ft/fttypes.h @@ -281,6 +281,10 @@ enum reactivity { RE_FISSIBLE }; +enum split_mode { + SPLIT_EVENLY, + SPLIT_LEFT_HEAVY, + SPLIT_RIGHT_HEAVY +}; #endif - diff --git a/ft/leafentry.h b/ft/leafentry.h index 6a645e0a074..33788255c5b 100644 --- a/ft/leafentry.h +++ b/ft/leafentry.h @@ -175,12 +175,12 @@ int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t * size_t leafentry_disksize_13(LEAFENTRY_13 le); -int +int toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored data. 
- size_t *new_leafentry_memorysize, - LEAFENTRY *new_leafentry_p, - OMT omt, - struct mempool *mp); + size_t *new_leafentry_memorysize, + LEAFENTRY *new_leafentry_p, + OMT *omtp, + struct mempool *mp); diff --git a/ft/locking-benchmarks/mfence-benchmark.cc b/ft/locking-benchmarks/mfence-benchmark.cc index 21b46c519bf..8e8e0cfccd1 100644 --- a/ft/locking-benchmarks/mfence-benchmark.cc +++ b/ft/locking-benchmarks/mfence-benchmark.cc @@ -34,6 +34,7 @@ lfence: 12.9ns/loop (marginal cost= -0.1ns) #include <sys/time.h> #include <stdio.h> +#include <portability/toku_atomic.h> enum { COUNT = 100000000 }; @@ -67,8 +68,8 @@ static inline void sfence (void) { int lock_for_lock_and_unlock; static inline void lock_and_unlock (void) { - (void)__sync_lock_test_and_set(&lock_for_lock_and_unlock, 1); - __sync_lock_release(&lock_for_lock_and_unlock); + (void)toku_sync_lock_test_and_set(&lock_for_lock_and_unlock, 1); + toku_sync_lock_release(&lock_for_lock_and_unlock); } diff --git a/ft/locking-benchmarks/pthread-locks.cc b/ft/locking-benchmarks/pthread-locks.cc index 626af6054fc..8dc94fd89a1 100644 --- a/ft/locking-benchmarks/pthread-locks.cc +++ b/ft/locking-benchmarks/pthread-locks.cc @@ -13,6 +13,7 @@ #include <stdio.h> #include <sys/time.h> #include <pthread.h> +#include <portability/toku_atomic.h> float tdiff (struct timeval *start, struct timeval *end) { return 1e6*(end->tv_sec-start->tv_sec) +(end->tv_usec - start->tv_usec); @@ -71,13 +72,13 @@ fetch_and_add_i (volatile int *p, int incr) static inline int gcc_fetch_and_add_i (volatile int *p, int incr) { - return __sync_fetch_and_add(p, incr); + return toku_sync_fetch_and_add(p, incr); } static inline long gcc_fetch_and_add_l (volatile long *p, long incr) { - return __sync_fetch_and_add(p, incr); + return toku_sync_fetch_and_add(p, incr); } // Something wrong with the compiler for longs diff --git a/ft/locking-benchmarks/trylock-rdtsc.cc b/ft/locking-benchmarks/trylock-rdtsc.cc index 1495570939d..707d07f7a82 100644 --- a/ft/locking-benchmarks/trylock-rdtsc.cc +++ b/ft/locking-benchmarks/trylock-rdtsc.cc @@ -13,6 +13,7 @@ #include <sys/time.h> #include <unistd.h> #include <rdtsc.h> +#include <portability/toku_atomic.h> float tdiff (struct timeval *start, struct timeval *end) { return 1e6*(end->tv_sec-start->tv_sec) +(end->tv_usec - start->tv_usec); @@ -135,12 +136,12 @@ int main(int argc __attribute__((unused)), char **argv) { static int lock_for_lock_and_unlock; t_start = rdtsc(); - (void)__sync_lock_test_and_set(&lock_for_lock_and_unlock, 1); + (void)toku_sync_lock_test_and_set(&lock_for_lock_and_unlock, 1); t_end = rdtsc(); printf("sync_lock_test_and_set took %llu clocks\n", t_end-t_start); t_start = rdtsc(); - __sync_lock_release(&lock_for_lock_and_unlock); + toku_sync_lock_release(&lock_for_lock_and_unlock); t_end = rdtsc(); printf("sync_lock_release took %llu clocks\n", t_end-t_start); } @@ -148,7 +149,7 @@ int main(int argc __attribute__((unused)), char **argv) { t_start = rdtsc(); - (void)__sync_synchronize(); + (void)toku_sync_synchronize(); t_end = rdtsc(); printf("sync_synchornize took %llu clocks\n", t_end-t_start); } diff --git a/ft/rollback-ct-callbacks.h b/ft/rollback-ct-callbacks.h index cead2900aab..09c4c3757f4 100644 --- a/ft/rollback-ct-callbacks.h +++ b/ft/rollback-ct-callbacks.h @@ -44,6 +44,7 @@ static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(FT wc.pe_callback = toku_rollback_pe_callback; wc.cleaner_callback = toku_rollback_cleaner_callback; wc.clone_callback = toku_rollback_clone_callback; + 
wc.checkpoint_complete_callback = nullptr; wc.write_extraargs = h; return wc; } diff --git a/ft/rollback.cc b/ft/rollback.cc index 7f173dc000a..b0ac13b51ad 100644 --- a/ft/rollback.cc +++ b/ft/rollback.cc @@ -84,7 +84,7 @@ void rollback_empty_log_init(ROLLBACK_LOG_NODE log) { log->layout_version_read_from_disk = FT_LAYOUT_VERSION; log->dirty = true; log->sequence = 0; - log->previous = {0}; + log->previous = make_blocknum(0); log->previous_hash = 0; log->oldest_logentry = NULL; log->newest_logentry = NULL; diff --git a/ft/tests/cachetable-checkpoint-pending.cc b/ft/tests/cachetable-checkpoint-pending.cc index 7a1a08c7af9..35a0f0ff06d 100644 --- a/ft/tests/cachetable-checkpoint-pending.cc +++ b/ft/tests/cachetable-checkpoint-pending.cc @@ -9,6 +9,7 @@ #include <unistd.h> #include "cachetable-test.h" #include "checkpoint.h" +#include <portability/toku_atomic.h> static int N; // how many items in the table static CACHEFILE cf; @@ -54,9 +55,9 @@ flush ( int *CAST_FROM_VOIDP(v, value); if (*v!=expect_value) printf("got %d expect %d\n", *v, expect_value); assert(*v==expect_value); - (void)__sync_fetch_and_add(&n_flush, 1); - if (write_me) (void)__sync_fetch_and_add(&n_write_me, 1); - if (keep_me) (void)__sync_fetch_and_add(&n_keep_me, 1); + (void)toku_sync_fetch_and_add(&n_flush, 1); + if (write_me) (void)toku_sync_fetch_and_add(&n_write_me, 1); + if (keep_me) (void)toku_sync_fetch_and_add(&n_keep_me, 1); sleep_random(); } diff --git a/ft/tests/cachetable-checkpoint-test.cc b/ft/tests/cachetable-checkpoint-test.cc index b565cd8660a..0deeb9e7621 100644 --- a/ft/tests/cachetable-checkpoint-test.cc +++ b/ft/tests/cachetable-checkpoint-test.cc @@ -106,7 +106,7 @@ static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) { CACHEKEY key = make_blocknum(i); uint32_t hi = toku_cachetable_hash(f1, key); void *v; - r = toku_cachetable_maybe_get_and_pin(f1, key, hi, &v); + r = toku_cachetable_maybe_get_and_pin(f1, key, hi, PL_WRITE_EXPENSIVE, &v); if (r != 0) continue; r = toku_test_cachetable_unpin(f1, key, hi, CACHETABLE_CLEAN, make_pair_attr(item_size)); diff --git a/ft/tests/cachetable-checkpointer-class.cc b/ft/tests/cachetable-checkpointer-class.cc index 931858aa278..0a8d0e5d592 100644 --- a/ft/tests/cachetable-checkpointer-class.cc +++ b/ft/tests/cachetable-checkpointer-class.cc @@ -193,6 +193,7 @@ void checkpointer_test::add_pairs(struct cachefile *cf, attr.cache_pressure_size = 0; attr.is_valid = true; CACHETABLE_WRITE_CALLBACK cb; + ZERO_STRUCT(cb); // All nullptr for (uint32_t i = k; i < count + k; ++i) { CACHEKEY key; @@ -201,12 +202,12 @@ void checkpointer_test::add_pairs(struct cachefile *cf, pair_init(&(pairs[i]), cf, key, - NULL, + nullptr, attr, CACHETABLE_CLEAN, full_hash, cb, - NULL, + nullptr, m_cp.m_list); m_cp.m_list->put(&pairs[i]); diff --git a/ft/tests/cachetable-count-pinned-test.cc b/ft/tests/cachetable-count-pinned-test.cc index 99729c93a58..0c112151e08 100644 --- a/ft/tests/cachetable-count-pinned-test.cc +++ b/ft/tests/cachetable-count-pinned-test.cc @@ -27,7 +27,8 @@ cachetable_count_pinned_test (int n) { assert(toku_cachefile_count_pinned(f1, 0) == i); void *v; - r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v); + r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v); + assert(r == -1); assert(toku_cachefile_count_pinned(f1, 0) == i); diff --git a/ft/tests/cachetable-flush-test.cc b/ft/tests/cachetable-flush-test.cc index a3dc1a6f5b8..be66b3cc491 100644 --- a/ft/tests/cachetable-flush-test.cc +++ 
b/ft/tests/cachetable-flush-test.cc @@ -43,12 +43,12 @@ test_cachetable_def_flush (int n) { uint32_t hi; void *v; hi = toku_cachetable_hash(f1, make_blocknum(i)); - r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v); + r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v); assert(r == 0 && v == (void *)(long)i); r = toku_test_cachetable_unpin(f1, make_blocknum(i), hi, CACHETABLE_CLEAN, make_pair_attr(1)); assert(r == 0); hi = toku_cachetable_hash(f2, make_blocknum(i)); - r = toku_cachetable_maybe_get_and_pin(f2, make_blocknum(i), hi, &v); + r = toku_cachetable_maybe_get_and_pin(f2, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v); assert(r == 0 && v == (void *)(long)i); r = toku_test_cachetable_unpin(f2, make_blocknum(i), hi, CACHETABLE_CLEAN, make_pair_attr(1)); assert(r == 0); @@ -63,10 +63,10 @@ test_cachetable_def_flush (int n) { uint32_t hi; void *v; hi = toku_cachetable_hash(f1, make_blocknum(i)); - r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v); + r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v); assert(r != 0); hi = toku_cachetable_hash(f2, make_blocknum(i)); - r = toku_cachetable_maybe_get_and_pin(f2, make_blocknum(i), hi, &v); + r = toku_cachetable_maybe_get_and_pin(f2, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v); assert(r == 0); r = toku_test_cachetable_unpin(f2, make_blocknum(i), hi, CACHETABLE_CLEAN, make_pair_attr(1)); assert(r == 0); diff --git a/ft/tests/cachetable-prefetch-checkpoint-test.cc b/ft/tests/cachetable-prefetch-checkpoint-test.cc index 01b42a56843..cde523b5757 100644 --- a/ft/tests/cachetable-prefetch-checkpoint-test.cc +++ b/ft/tests/cachetable-prefetch-checkpoint-test.cc @@ -122,7 +122,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir CACHEKEY key = make_blocknum(i); uint32_t hi = toku_cachetable_hash(f1, key); void *v; - r = toku_cachetable_maybe_get_and_pin(f1, key, hi, &v); + r = toku_cachetable_maybe_get_and_pin(f1, key, hi, PL_WRITE_EXPENSIVE, &v); if (r != 0) continue; r = toku_test_cachetable_unpin(f1, key, hi, CACHETABLE_CLEAN, make_pair_attr(item_size)); diff --git a/ft/tests/cachetable-prefetch-maybegetandpin-test.cc b/ft/tests/cachetable-prefetch-maybegetandpin-test.cc index df52ab50047..128c771a832 100644 --- a/ft/tests/cachetable-prefetch-maybegetandpin-test.cc +++ b/ft/tests/cachetable-prefetch-maybegetandpin-test.cc @@ -51,7 +51,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) { int i; for (i=1; i>=0; i++) { void *v; - r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, &v); + r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, PL_WRITE_EXPENSIVE, &v); if (r == 0) break; toku_pthread_yield(); } diff --git a/ft/tests/cachetable-prefetch2-test.cc b/ft/tests/cachetable-prefetch2-test.cc index 09d91e20cf8..6e3670a1a89 100644 --- a/ft/tests/cachetable-prefetch2-test.cc +++ b/ft/tests/cachetable-prefetch2-test.cc @@ -58,7 +58,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) { int i; for (i=1; i>=0; i++) { void *v; - r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, &v); + r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, PL_WRITE_EXPENSIVE, &v); if (r == 0) break; toku_pthread_yield(); } diff --git a/ft/tests/cachetable-put-test.cc b/ft/tests/cachetable-put-test.cc index 64fa16d5520..77f8cc83b04 100644 --- a/ft/tests/cachetable-put-test.cc +++ b/ft/tests/cachetable-put-test.cc @@ -26,7 +26,7 @@ cachetable_put_test (int n) { 
         assert(toku_cachefile_count_pinned(f1, 0) == i);
         void *v;
-        r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v);
+        r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
         assert(r == -1);
         assert(toku_cachefile_count_pinned(f1, 0) == i);
diff --git a/ft/tests/cachetable-simple-maybe-get-pin.cc b/ft/tests/cachetable-simple-maybe-get-pin.cc
index 7b7216fd2bc..11feeffe838 100644
--- a/ft/tests/cachetable-simple-maybe-get-pin.cc
+++ b/ft/tests/cachetable-simple-maybe-get-pin.cc
@@ -26,37 +26,37 @@ cachetable_test (void) {
     void* v1;
     long s1;
     // nothing in cachetable, so this should fail
-    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r==-1);
     r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
     r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8));
     // maybe_get_and_pin_clean should succeed, maybe_get_and_pin should fail
-    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r==-1);
-    r = toku_cachetable_maybe_get_and_pin_clean(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin_clean(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r == 0);
     r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8));
     // maybe_get_and_pin_clean should succeed, maybe_get_and_pin should fail
-    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r==0);
     // now these calls should fail because the node is already pinned, and therefore in use
-    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r==-1);
-    r = toku_cachetable_maybe_get_and_pin_clean(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin_clean(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r==-1);
     r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8));
     // sanity check, this should still succeed, because the PAIR is dirty
-    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r==0);
     r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8));
     CHECKPOINTER cp = toku_cachetable_get_checkpointer(ct);
     toku_cachetable_begin_checkpoint(cp, NULL);
     // now these should fail, because the node should be pending a checkpoint
-    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r==-1);
-    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, &v1);
+    r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(1), 1, PL_WRITE_EXPENSIVE, &v1);
     assert(r==-1);
     toku_cachetable_end_checkpoint(
         cp,
diff --git a/ft/tests/cachetable-test.cc b/ft/tests/cachetable-test.cc
index c4c4014860c..e0ed9a3518c 100644
--- a/ft/tests/cachetable-test.cc
+++ b/ft/tests/cachetable-test.cc
@@ -105,7 +105,7 @@ static void test_nested_pin (void) {
     assert(i0==0);
     r = toku_test_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, make_pair_attr(test_object_size));
     assert(r==0);
-    r = toku_cachetable_maybe_get_and_pin(f, make_blocknum(1), f1hash, &vv2);
+    r = toku_cachetable_maybe_get_and_pin(f, make_blocknum(1), f1hash, PL_WRITE_EXPENSIVE, &vv2);
     assert(r==0);
     assert(vv2==vv);
     r = toku_test_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, make_pair_attr(test_object_size));
diff --git a/ft/tests/cachetable-unpin-and-remove-test.cc b/ft/tests/cachetable-unpin-and-remove-test.cc
index 8a76a0764c5..a490e62b730 100644
--- a/ft/tests/cachetable-unpin-and-remove-test.cc
+++ b/ft/tests/cachetable-unpin-and-remove-test.cc
@@ -63,7 +63,7 @@ cachetable_unpin_and_remove_test (int n) {
         // verify that k is removed
         void *v;
-        r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(testkeys[i].b), hi, &v);
+        r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(testkeys[i].b), hi, PL_WRITE_EXPENSIVE, &v);
         assert(r != 0);
         testkeys[i] = testkeys[nkeys-1];
         nkeys -= 1;
diff --git a/ft/tests/cachetable-unpin-test.cc b/ft/tests/cachetable-unpin-test.cc
index e3f48887cc2..8a561dd5e03 100644
--- a/ft/tests/cachetable-unpin-test.cc
+++ b/ft/tests/cachetable-unpin-test.cc
@@ -27,7 +27,7 @@ cachetable_unpin_test (int n) {
         assert(toku_cachefile_count_pinned(f1, 0) == i);
         void *v;
-        r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, &v);
+        r = toku_cachetable_maybe_get_and_pin(f1, make_blocknum(i), hi, PL_WRITE_EXPENSIVE, &v);
         assert(r == -1);
         assert(toku_cachefile_count_pinned(f1, 0) == i);
diff --git a/ft/tests/cachetable-writer-thread-limit.cc b/ft/tests/cachetable-writer-thread-limit.cc
index 03976181a22..557243b7e64 100644
--- a/ft/tests/cachetable-writer-thread-limit.cc
+++ b/ft/tests/cachetable-writer-thread-limit.cc
@@ -4,6 +4,7 @@
 #ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "includes.h" #include "test.h" +#include <portability/toku_atomic.h> static int total_size; @@ -25,7 +26,7 @@ flush (CACHEFILE f __attribute__((__unused__)), bool UU(is_clone) ) { if (w) { - int curr_size = __sync_fetch_and_sub(&total_size, 1); + int curr_size = toku_sync_fetch_and_sub(&total_size, 1); assert(curr_size <= 200); usleep(500*1000); } @@ -49,7 +50,7 @@ cachetable_test (void) { CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL); wc.flush_callback = flush; toku_cachetable_put(f1, make_blocknum(i), i, NULL, make_pair_attr(1), wc, put_callback_nop); - int curr_size = __sync_fetch_and_add(&total_size, 1); + int curr_size = toku_sync_fetch_and_add(&total_size, 1); assert(curr_size <= test_limit + test_limit/2+1); r = toku_test_cachetable_unpin(f1, make_blocknum(i), i, CACHETABLE_DIRTY, make_pair_attr(4)); } diff --git a/ft/tests/ftloader-error-injector.h b/ft/tests/ftloader-error-injector.h index 9f32320fee9..e49407a1af5 100644 --- a/ft/tests/ftloader-error-injector.h +++ b/ft/tests/ftloader-error-injector.h @@ -7,6 +7,7 @@ #ifndef FTLOADER_ERROR_INJECTOR_H #define FTLOADER_ERROR_INJECTOR_H +#include <portability/toku_atomic.h> static toku_mutex_t event_mutex = TOKU_MUTEX_INITIALIZER; static void lock_events(void) { @@ -107,9 +108,9 @@ static void reset_my_malloc_counts(void) { __attribute__((__unused__)) static void *my_malloc(size_t n) { - (void) __sync_fetch_and_add(&my_malloc_count, 1); // my_malloc_count++; + (void) toku_sync_fetch_and_add(&my_malloc_count, 1); // my_malloc_count++; if (n >= my_big_malloc_limit) { - (void) __sync_fetch_and_add(&my_big_malloc_count, 1); // my_big_malloc_count++; + (void) toku_sync_fetch_and_add(&my_big_malloc_count, 1); // my_big_malloc_count++; if (do_malloc_errors) { if (event_add_and_fetch() == event_count_trigger) { event_hit(); @@ -125,9 +126,9 @@ static int do_realloc_errors = 0; __attribute__((__unused__)) static void *my_realloc(void *p, size_t n) { - (void) __sync_fetch_and_add(&my_realloc_count, 1); // my_realloc_count++; + (void) toku_sync_fetch_and_add(&my_realloc_count, 1); // my_realloc_count++; if (n >= my_big_malloc_limit) { - (void) __sync_fetch_and_add(&my_big_realloc_count, 1); // my_big_realloc_count++; + (void) toku_sync_fetch_and_add(&my_big_realloc_count, 1); // my_big_realloc_count++; if (do_realloc_errors) { if (event_add_and_fetch() == event_count_trigger) { event_hit(); diff --git a/ft/tests/keyrange.cc b/ft/tests/keyrange.cc index 7e277946f9d..603f5a9a320 100644 --- a/ft/tests/keyrange.cc +++ b/ft/tests/keyrange.cc @@ -82,8 +82,8 @@ static void test_keyrange (enum memory_state ms, uint64_t limit) { struct ftstat64_s s; toku_ft_handle_stat64(t, null_txn, &s); - assert(0 < s.nkeys && s.nkeys < limit); - assert(0 < s.dsize && s.dsize < limit * (9 + 9)); // keylen = 9, vallen = 9 + assert(0 < s.nkeys && s.nkeys <= limit); + assert(0 < s.dsize && s.dsize <= limit * (9 + 9)); // keylen = 9, vallen = 9 } maybe_reopen(ms, limit); diff --git a/ft/tests/make-tree.cc b/ft/tests/make-tree.cc index ea8fc8051d8..acafd2f8e1e 100644 --- a/ft/tests/make-tree.cc +++ b/ft/tests/make-tree.cc @@ -138,7 +138,7 @@ test_make_tree(int height, int fanout, int nperleaf, int do_verify) { // set the new root to point to the new tree toku_ft_set_new_root_blocknum(brt->ft, newroot->thisnodename); - newroot->max_msn_applied_to_node_on_disk = last_dummymsn(); // capture msn of last message injected into tree + brt->ft->h->max_msn_in_ft = last_dummymsn(); // capture msn of last message injected into tree // unpin the new root 
     toku_unpin_ftnode(brt->ft, newroot);
diff --git a/ft/tests/msnfilter.cc b/ft/tests/msnfilter.cc
index 43c9bf7ce6c..2904714a366 100644
--- a/ft/tests/msnfilter.cc
+++ b/ft/tests/msnfilter.cc
@@ -45,10 +45,10 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
     // apply an insert to the leaf node
     MSN msn = next_dummymsn();
+    brt->ft->h->max_msn_in_ft = msn;
     FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
-    uint64_t workdone=0;
-    toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, &cmd, &workdone, NULL);
+    toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, nullptr, nullptr);
     {
         int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
         assert(r==0);
@@ -56,9 +56,8 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
     }
     FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} };
-    toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, &badcmd, &workdone, NULL);
+    toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, nullptr, nullptr);
-
     // message should be rejected for duplicate msn, row should still have original val
     {
         int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
@@ -68,9 +67,10 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
     // now verify that message with proper msn gets through
     msn = next_dummymsn();
+    brt->ft->h->max_msn_in_ft = msn;
     FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} };
-    toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, &cmd2, &workdone, NULL);
-
+    toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, nullptr, nullptr);
+
     // message should be accepted, val should have new value
     {
         int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair2);
@@ -81,8 +81,8 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
     // now verify that message with lesser (older) msn is rejected
     msn.msn = msn.msn - 10;
     FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }};
-    toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, &cmd3, &workdone, NULL);
-
+    toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, nullptr, nullptr);
+
     // message should be rejected, val should still have value in pair2
     {
         int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair2);
diff --git a/ft/tests/orthopush-flush.cc b/ft/tests/orthopush-flush.cc
index 19a9a63aa4d..7e45b173c1b 100644
--- a/ft/tests/orthopush-flush.cc
+++ b/ft/tests/orthopush-flush.cc
@@ -580,7 +580,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
     if (make_leaf_up_to_date) {
         for (i = 0; i < num_parent_messages; ++i) {
             if (!parent_messages_is_fresh[i]) {
-                toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, parent_messages[i], NULL, NULL);
+                toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL);
             }
         }
         for (i = 0; i < 8; ++i) {
@@ -803,7 +803,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
     for (i = 0; i < num_parent_messages; ++i) {
         if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 &&
             !parent_messages_is_fresh[i]) {
-            toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, parent_messages[i], NULL, NULL);
+            toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL);
         }
     }
     for (i = 0; i < 8; ++i) {
@@ -995,8 +995,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
     if (make_leaf_up_to_date) {
         for (i = 0; i < num_parent_messages; ++i) {
             if (!parent_messages_is_fresh[i]) {
-                toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, parent_messages[i], NULL, NULL);
-                toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, parent_messages[i], NULL, NULL);
+                toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], NULL, NULL);
+                toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], NULL, NULL);
             }
         }
         for (i = 0; i < 8; ++i) {
diff --git a/ft/tests/test.h b/ft/tests/test.h
index b3b9aa21de5..71b655cfcd0 100644
--- a/ft/tests/test.h
+++ b/ft/tests/test.h
@@ -228,7 +228,8 @@ static UU() CACHETABLE_WRITE_CALLBACK def_write_callback(void* write_extraargs)
     wc.pe_callback = def_pe_callback;
     wc.cleaner_callback = def_cleaner_callback;
     wc.write_extraargs = write_extraargs;
-    wc.clone_callback = NULL;
+    wc.clone_callback = nullptr;
+    wc.checkpoint_complete_callback = nullptr;
     return wc;
 }
diff --git a/ft/tests/test3884.cc b/ft/tests/test3884.cc
index 920d3993916..966a8f90cac 100644
--- a/ft/tests/test3884.cc
+++ b/ft/tests/test3884.cc
@@ -171,7 +171,7 @@ test_split_on_boundary(void)
     FTNODE nodea, nodeb;
     DBT splitk;
     // if we haven't done it right, we should hit the assert in the top of move_leafentries
-    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
     verify_basement_node_msns(nodea, dummy_msn_3884);
     verify_basement_node_msns(nodeb, dummy_msn_3884);
@@ -244,7 +244,7 @@ test_split_with_everything_on_the_left(void)
     FTNODE nodea, nodeb;
     DBT splitk;
     // if we haven't done it right, we should hit the assert in the top of move_leafentries
-    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
     toku_unpin_ftnode(brt->ft, nodeb);
     r = toku_close_ft_handle_nolsn(brt, NULL);
     assert(r == 0);
@@ -319,7 +319,7 @@ test_split_on_boundary_of_last_node(void)
     FTNODE nodea, nodeb;
     DBT splitk;
     // if we haven't done it right, we should hit the assert in the top of move_leafentries
-    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
     toku_unpin_ftnode(brt->ft, nodeb);
     r = toku_close_ft_handle_nolsn(brt, NULL);
     assert(r == 0);
@@ -387,7 +387,7 @@ test_split_at_begin(void)
     FTNODE nodea, nodeb;
     DBT splitk;
     // if we haven't done it right, we should hit the assert in the top of move_leafentries
-    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
     toku_unpin_ftnode(brt->ft, nodeb);
     r = toku_close_ft_handle_nolsn(brt, NULL);
     assert(r == 0);
@@ -451,7 +451,7 @@ test_split_at_end(void)
     FTNODE nodea, nodeb;
     DBT splitk;
     // if we haven't done it right, we should hit the assert in the top of move_leafentries
-    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
     toku_unpin_ftnode(brt->ft, nodeb);
     r = toku_close_ft_handle_nolsn(brt, NULL);
     assert(r == 0);
@@ -505,7 +505,7 @@ test_split_odd_nodes(void)
     FTNODE nodea, nodeb;
     DBT splitk;
     // if we haven't done it right, we should hit the assert in the top of move_leafentries
-    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, 0, NULL);
+    ftleaf_split(brt->ft, &sn, &nodea, &nodeb, &splitk, true, SPLIT_EVENLY, 0, NULL);
     verify_basement_node_msns(nodea, dummy_msn_3884);
     verify_basement_node_msns(nodeb, dummy_msn_3884);
diff --git a/ft/ule-internal.h b/ft/ule-internal.h
index 5a023875602..db0cebd4bd3 100644
--- a/ft/ule-internal.h
+++ b/ft/ule-internal.h
@@ -60,10 +60,10 @@ void test_msg_modify_ule(ULE ule, FT_MSG msg);
 //Functions exported for test purposes only (used internally for non-test purposes).
 void le_unpack(ULE ule,  LEAFENTRY le);
 int le_pack(ULE ule,                            // data to be packed into new leafentry
-            size_t *new_leafentry_memorysize, 
+            size_t *new_leafentry_memorysize,
             LEAFENTRY * const new_leafentry_p,  // this is what this function creates
-            OMT omt, 
-            struct mempool *mp, 
+            OMT *omtp,
+            struct mempool *mp,
             void **maybe_free);
diff --git a/ft/ule.cc b/ft/ule.cc
index 565a9c1670e..3aeb6ed26e1 100644
--- a/ft/ule.cc
+++ b/ft/ule.cc
@@ -154,11 +154,11 @@ static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p);
 static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p);
 static void *
-le_malloc(OMT omt, struct mempool *mp, size_t size, void **maybe_free)
+le_malloc(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free)
 {
     void * rval;
-    if (omt)
-        rval = mempool_malloc_from_omt(omt, mp, size, maybe_free);
+    if (omtp)
+        rval = mempool_malloc_from_omt(omtp, mp, size, maybe_free);
     else
         rval = toku_xmalloc(size);
     resource_assert(rval);
@@ -317,14 +317,14 @@ done:;
 // If the leafentry is destroyed it sets *new_leafentry_p to NULL.
 // Otehrwise the new_leafentry_p points at the new leaf entry.
 // As of October 2011, this function always returns 0.
-int 
+int
 apply_msg_to_leafentry(FT_MSG   msg,		// message to apply to leafentry
-		       LEAFENTRY old_leafentry, // NULL if there was no stored data.
-		       size_t *new_leafentry_memorysize, 
-		       LEAFENTRY *new_leafentry_p, 
-		       OMT omt, 
-		       struct mempool *mp, 
-		       void **maybe_free,
+                       LEAFENTRY old_leafentry, // NULL if there was no stored data.
+                       size_t *new_leafentry_memorysize,
+                       LEAFENTRY *new_leafentry_p,
+                       OMT *omtp,
+                       struct mempool *mp,
+                       void **maybe_free,
                        int64_t * numbytes_delta_p) {  // change in total size of key and val, not including any overhead
     ULE_S ule;
     int rval;
@@ -334,17 +334,17 @@ apply_msg_to_leafentry(FT_MSG   msg,		// message to apply to leafentry
     if (old_leafentry == NULL)            // if leafentry does not exist ...
         msg_init_empty_ule(&ule, msg);    // ... create empty unpacked leaf entry
     else {
-        le_unpack(&ule, old_leafentry); // otherwise unpack leafentry 
+        le_unpack(&ule, old_leafentry); // otherwise unpack leafentry
         oldnumbytes = ule_get_innermost_numbytes(&ule);
     }
     msg_modify_ule(&ule, msg);          // modify unpacked leafentry
     rval = le_pack(&ule,                // create packed leafentry
-                   new_leafentry_memorysize, 
-                   new_leafentry_p,
-                   omt,
-                   mp,
-                   maybe_free);
-    if (new_leafentry_p) 
+                   new_leafentry_memorysize,
+                   new_leafentry_p,
+                   omtp,
+                   mp,
+                   maybe_free);
+    if (new_leafentry_p)
         newnumbytes = ule_get_innermost_numbytes(&ule);
     *numbytes_delta_p = newnumbytes - oldnumbytes;
     ule_cleanup(&ule);
@@ -374,7 +374,7 @@ int
 garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
                           LEAFENTRY *new_leaf_entry,
                           size_t *new_leaf_entry_memory_size,
-                          OMT omt,
+                          OMT *omtp,
                           struct mempool *mp,
                           void **maybe_free,
                           const xid_omt_t &snapshot_xids,
@@ -387,7 +387,7 @@ garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
     r = le_pack(&ule,
                 new_leaf_entry_memory_size,
                 new_leaf_entry,
-                omt,
+                omtp,
                 mp,
                 maybe_free);
     assert(r == 0);
@@ -713,8 +713,8 @@ int
 le_pack(ULE ule,                            // data to be packed into new leafentry
         size_t *new_leafentry_memorysize,
         LEAFENTRY * const new_leafentry_p,  // this is what this function creates
-        OMT omt, 
-        struct mempool *mp, 
+        OMT *omtp,
+        struct mempool *mp,
         void **maybe_free)
 {
     invariant(ule->num_cuxrs > 0);
@@ -740,7 +740,7 @@ le_pack(ULE ule,                            // data to be packed into new leafen
 found_insert:;
     memsize = le_memsize_from_ule(ule);
     LEAFENTRY new_leafentry;
-    CAST_FROM_VOIDP(new_leafentry, le_malloc(omt, mp, memsize, maybe_free));
+    CAST_FROM_VOIDP(new_leafentry, le_malloc(omtp, mp, memsize, maybe_free));
     //Universal data
     new_leafentry->keylen  = toku_htod32(ule->keylen);
@@ -2293,7 +2293,7 @@ int
 toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry,
                       size_t *new_leafentry_memorysize,
                       LEAFENTRY *new_leafentry_p,
-                      OMT omt,
+                      OMT *omtp,
                       struct mempool *mp) {
     ULE_S ule;
     int rval;
@@ -2305,7 +2305,7 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry,
     rval = le_pack(&ule,                // create packed leafentry
                    new_leafentry_memorysize,
                    new_leafentry_p,
-                   omt, mp, NULL);
+                   omtp, mp, NULL);
     ule_cleanup(&ule);
     return rval;
 }
@@ -53,18 +53,18 @@ void fast_msg_to_leafentry(
     LEAFENTRY *new_leafentry_p) ;
 int apply_msg_to_leafentry(FT_MSG msg,
-			   LEAFENTRY old_leafentry, // NULL if there was no stored data.
+                           size_t *new_leafentry_memorysize,
+                           LEAFENTRY *new_leafentry_p,
+                           OMT *omtp,
+                           struct mempool *mp,
+                           void **maybe_free,
                            int64_t * numbytes_delta_p);
 int garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
                               LEAFENTRY *new_leaf_entry,
                               size_t *new_leaf_entry_memory_size,
-                              OMT omt,
+                              OMT *omtp,
                               struct mempool *mp,
                               void **maybe_free,
                               const xid_omt_t &snapshot_xids,
diff --git a/ft/worker-thread-benchmarks/threadpool.cc b/ft/worker-thread-benchmarks/threadpool.cc
index 0ac6ffb5b2c..c9e4d9bcb35 100644
--- a/ft/worker-thread-benchmarks/threadpool.cc
+++ b/ft/worker-thread-benchmarks/threadpool.cc
@@ -11,6 +11,7 @@
 #include <errno.h>
 #include "threadpool.h"
+#include <portability/toku_atomic.h>
 // use gcc builtin fetch_and_add 0->no 1->yes
 #define DO_ATOMIC_FETCH_AND_ADD 0
@@ -61,7 +62,7 @@ void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg)
 void threadpool_set_thread_busy(THREADPOOL threadpool) {
 #if DO_ATOMIC_FETCH_AND_ADD
-    (void) __sync_fetch_and_add(&threadpool->busy_threads, 1);
+    (void) toku_sync_fetch_and_add(&threadpool->busy_threads, 1);
 #else
     threadpool->busy_threads++;
 #endif
@@ -69,7 +70,7 @@ void threadpool_set_thread_idle(THREADPOOL threadpool) {
 #if DO_ATOMIC_FETCH_AND_ADD
-    (void) __sync_fetch_and_add(&threadpool->busy_threads, -1);
+    (void) toku_sync_fetch_and_add(&threadpool->busy_threads, -1);
 #else
     threadpool->busy_threads--;
 #endif
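Editorial note (not part of the commit): a recurring change in the hunks above replaces the bare GCC __sync builtins with toku_sync_* wrappers pulled in via <portability/toku_atomic.h>, and the cachetable test hunks show toku_cachetable_maybe_get_and_pin and toku_cachetable_maybe_get_and_pin_clean gaining an explicit lock-type argument (PL_WRITE_EXPENSIVE in these tests) ahead of the output pointer. The sketch below is illustrative only: it assumes the toku_sync_fetch_and_add/sub wrappers simply forward to the corresponding __sync builtins with the same fetch-then-modify semantics, since the real header is not shown in this diff.

// Illustrative sketch, not the actual <portability/toku_atomic.h> contents.
// Assumes the wrappers behave like the GCC builtins they replace above
// (atomic update, returning the value held *before* the operation).
#include <cassert>

template <typename T, typename U>
static inline T toku_sync_fetch_and_add(T *ptr, U delta) {
    return __sync_fetch_and_add(ptr, delta);   // atomic add; returns old value
}

template <typename T, typename U>
static inline T toku_sync_fetch_and_sub(T *ptr, U delta) {
    return __sync_fetch_and_sub(ptr, delta);   // atomic subtract; returns old value
}

int main(void) {
    int total_size = 0;                                       // mirrors the counter in cachetable-writer-thread-limit.cc
    int before = toku_sync_fetch_and_add(&total_size, 1);
    assert(before == 0 && total_size == 1);
    before = toku_sync_fetch_and_sub(&total_size, 1);
    assert(before == 1 && total_size == 0);
    return 0;
}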