diff options
author | Luke Chen <luke.chen@mongodb.com> | 2022-09-09 16:04:59 +1000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-09 06:34:14 +0000 |
commit | ae9ebae348d3f87e6a69ebff90aa24792a9dd365 (patch) | |
tree | d8b91d8c870d25793373f977b55d4c838bfc811f | |
parent | ccc7ca852ec042b2ee948812d4980ee6197375b5 (diff) | |
download | mongo-ae9ebae348d3f87e6a69ebff90aa24792a9dd365.tar.gz |
Import wiredtiger: c004346e10b186c1ecea4a1ee3186619666c9c17 from branch mongodb-6.1
ref: bb64c7cdca..c004346e10
for: 6.1.0-rc2
WT-7833 wt_btree_switch_object needs to coordinate with concurrent write requests to btree (#8044)
16 files changed, 245 insertions, 226 deletions
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 9a2ab6174df..543dc47d308 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-6.1", - "commit": "bb64c7cdcac3638b66ecea5c240df1448d409ab8" + "commit": "c004346e10b186c1ecea4a1ee3186619666c9c17" } diff --git a/src/third_party/wiredtiger/src/block/block_ckpt.c b/src/third_party/wiredtiger/src/block/block_ckpt.c index ec62e5ce3f5..90452b1245e 100644 --- a/src/third_party/wiredtiger/src/block/block_ckpt.c +++ b/src/third_party/wiredtiger/src/block/block_ckpt.c @@ -42,6 +42,7 @@ __wt_block_checkpoint_load(WT_SESSION_IMPL *session, WT_BLOCK *block, const uint WT_BLOCK_CKPT *ci, _ci; WT_DECL_RET; uint8_t *endp; + bool live_open; /* * Sometimes we don't find a root page (we weren't given a checkpoint, or the checkpoint was @@ -63,16 +64,17 @@ __wt_block_checkpoint_load(WT_SESSION_IMPL *session, WT_BLOCK *block, const uint ci = &_ci; WT_ERR(__wt_block_ckpt_init(session, ci, "checkpoint")); } else { -#ifdef HAVE_DIAGNOSTIC /* * We depend on the btree level for locking: things will go bad fast if we open the live * system in two handles, or salvage, truncate or verify the live/running file. */ __wt_spin_lock(session, &block->live_lock); - WT_ASSERT(session, block->live_open == false); + live_open = block->live_open; block->live_open = true; __wt_spin_unlock(session, &block->live_lock); -#endif + WT_ERR_ASSERT( + session, live_open == false, EBUSY, "%s: attempt to re-open live file", block->name); + ci = &block->live; WT_ERR(__wt_block_ckpt_init(session, ci, "live")); } @@ -108,12 +110,12 @@ __wt_block_checkpoint_load(WT_SESSION_IMPL *session, WT_BLOCK *block, const uint } /* - * If the checkpoint can be written, that means anything written after the checkpoint is no - * longer interesting, truncate the file. 
Don't bother checking the avail list for a block at - * the end of the file, that was done when the checkpoint was first written (re-writing the - * checkpoint might possibly make it relevant here, but it's unlikely enough I don't bother). + * If the object can be written, that means anything written after the checkpoint is no longer + * interesting, truncate the file. Don't bother checking the avail list for a block at the end + * of the file, that was done when the checkpoint was first written (re-writing the checkpoint + * might possibly make it relevant here, but it's unlikely enough I don't bother). */ - if (!checkpoint && WT_BLOCK_ISLOCAL(block)) + if (!checkpoint) WT_ERR(__wt_block_truncate(session, block, ci->file_size)); if (0) { @@ -263,13 +265,16 @@ err: /* * __ckpt_extlist_read -- - * Read a checkpoints extent lists and copy + * Read a checkpoint's extent lists. */ static int -__ckpt_extlist_read(WT_SESSION_IMPL *session, WT_BLOCK *block, WT_CKPT *ckpt) +__ckpt_extlist_read(WT_SESSION_IMPL *session, WT_BLOCK *block, WT_CKPT *ckpt, bool *localp) { WT_BLOCK_CKPT *ci; + /* Default to a local file. */ + *localp = true; + /* * Allocate a checkpoint structure, crack the cookie and read the checkpoint's extent lists. * @@ -284,6 +289,13 @@ __ckpt_extlist_read(WT_SESSION_IMPL *session, WT_BLOCK *block, WT_CKPT *ckpt) ci = ckpt->bpriv; WT_RET(__wt_block_ckpt_init(session, ci, ckpt->name)); WT_RET(__wt_block_ckpt_unpack(session, block, ckpt->raw.data, ckpt->raw.size, ci)); + + /* Extent lists from non-local objects aren't useful, we're going to skip them. 
*/ + if (ci->root_objectid != block->objectid) { + *localp = false; + return (0); + } + WT_RET(__wt_block_extlist_read(session, block, &ci->alloc, ci->file_size)); WT_RET(__wt_block_extlist_read(session, block, &ci->discard, ci->file_size)); @@ -473,7 +485,7 @@ __ckpt_process(WT_SESSION_IMPL *session, WT_BLOCK *block, WT_CKPT *ckptbase) WT_CKPT *ckpt, *next_ckpt; WT_DECL_RET; uint64_t ckpt_size; - bool deleting, fatal, locked; + bool deleting, fatal, local, locked; ci = &block->live; fatal = locked = false; @@ -541,21 +553,29 @@ __ckpt_process(WT_SESSION_IMPL *session, WT_BLOCK *block, WT_CKPT *ckptbase) __wt_block_extlist_free(session, &ci->ckpt_discard); /* - * To delete a checkpoint, we'll need checkpoint information for it and the subsequent - * checkpoint into which it gets rolled; read them from disk before we lock things down. + * To delete a checkpoint, we need checkpoint information for it and the subsequent checkpoint + * into which it gets rolled; read them from disk before we lock things down. */ deleting = false; WT_CKPT_FOREACH (ckptbase, ckpt) { if (F_ISSET(ckpt, WT_CKPT_FAKE) || !F_ISSET(ckpt, WT_CKPT_DELETE)) continue; - deleting = true; /* * Read the checkpoint and next checkpoint extent lists if we haven't already read them (we * may have already read these extent blocks if there is more than one deleted checkpoint). + * + * We can only delete checkpoints in the current file. Checkpoints of tiered storage objects + * are checkpoints for the logical object, including files that are no longer live. Skip any + * checkpoints that aren't local to the live object. 
*/ - if (ckpt->bpriv == NULL) - WT_ERR(__ckpt_extlist_read(session, block, ckpt)); + if (ckpt->bpriv == NULL) { + WT_ERR(__ckpt_extlist_read(session, block, ckpt, &local)); + if (!local) + continue; + } + + deleting = true; for (next_ckpt = ckpt + 1;; ++next_ckpt) if (!F_ISSET(next_ckpt, WT_CKPT_FAKE)) @@ -564,8 +584,11 @@ __ckpt_process(WT_SESSION_IMPL *session, WT_BLOCK *block, WT_CKPT *ckptbase) /* * The "next" checkpoint may be the live tree which has no extent blocks to read. */ - if (next_ckpt->bpriv == NULL && !F_ISSET(next_ckpt, WT_CKPT_ADD)) - WT_ERR(__ckpt_extlist_read(session, block, next_ckpt)); + if (next_ckpt->bpriv == NULL && !F_ISSET(next_ckpt, WT_CKPT_ADD)) { + WT_ERR(__ckpt_extlist_read(session, block, next_ckpt, &local)); + WT_ERR_ASSERT(session, local == true, WT_PANIC, + "tiered storage checkpoint follows local checkpoint"); + } } /* @@ -610,8 +633,7 @@ __ckpt_process(WT_SESSION_IMPL *session, WT_BLOCK *block, WT_CKPT *ckptbase) * lists, and the freed blocks will then be included when writing the live extent lists. */ WT_CKPT_FOREACH (ckptbase, ckpt) { - if (F_ISSET(ckpt, WT_CKPT_FAKE) || !F_ISSET(ckpt, WT_CKPT_DELETE) || - !WT_BLOCK_ISLOCAL(block)) + if (F_ISSET(ckpt, WT_CKPT_FAKE) || !F_ISSET(ckpt, WT_CKPT_DELETE)) continue; if (WT_VERBOSE_ISSET(session, WT_VERB_CHECKPOINT)) diff --git a/src/third_party/wiredtiger/src/block/block_ext.c b/src/third_party/wiredtiger/src/block/block_ext.c index 3406f8526b2..a4451ea00e0 100644 --- a/src/third_party/wiredtiger/src/block/block_ext.c +++ b/src/third_party/wiredtiger/src/block/block_ext.c @@ -571,7 +571,15 @@ __wt_block_free(WT_SESSION_IMPL *session, WT_BLOCK *block, const uint8_t *addr, WT_RET(__wt_block_addr_unpack( session, block, addr, addr_size, &objectid, &offset, &size, &checksum)); - /* We can't reuse free space in an object. */ + /* + * Freeing blocks in a previous object isn't possible in the current architecture. 
We'd like to + * know when a previous object is either completely rewritten (or more likely, empty enough that + * rewriting remaining blocks is worth doing). Just knowing which blocks are no longer in use + * isn't enough to remove them (because the internal pages have to be rewritten and we don't + * know where they are); the simplest solution is probably to keep a count of freed bytes from + * each object in the metadata, and when enough of the object is no longer in use, perform a + * compaction like process to do any remaining cleanup. + */ if (objectid != block->objectid) return (0); diff --git a/src/third_party/wiredtiger/src/block/block_open.c b/src/third_party/wiredtiger/src/block/block_open.c index 3fc9e4d07df..37a771515cb 100644 --- a/src/third_party/wiredtiger/src/block/block_open.c +++ b/src/third_party/wiredtiger/src/block/block_open.c @@ -132,6 +132,7 @@ __wt_block_close(WT_SESSION_IMPL *session, WT_BLOCK *block) } __wt_free(session, block->name); + __wt_spin_destroy(session, &block->cache_lock); __wt_free(session, block->related); WT_TRET(__wt_close(session, &block->fh)); @@ -205,6 +206,9 @@ __wt_block_open(WT_SESSION_IMPL *session, const char *filename, uint32_t objecti WT_CONN_BLOCK_INSERT(conn, block, bucket); block->linked = true; + /* Initialize the block cache layer lock. */ + WT_ERR(__wt_spin_init(session, &block->cache_lock, "block cache")); + /* If not passed an allocation size, get one from the configuration. */ if (allocsize == 0) { WT_ERR(__wt_config_gets(session, cfg, "allocation_size", &cval)); diff --git a/src/third_party/wiredtiger/src/block_cache/block_mgr.c b/src/third_party/wiredtiger/src/block_cache/block_mgr.c index 3e7df914f4d..ca2cee442cf 100644 --- a/src/third_party/wiredtiger/src/block_cache/block_mgr.c +++ b/src/third_party/wiredtiger/src/block_cache/block_mgr.c @@ -11,6 +11,69 @@ static void __bm_method_set(WT_BM *, bool); /* + * __bm_close_block_remove -- + * Remove a single block handle. 
Must be called with the block lock held. + */ +static int +__bm_close_block_remove(WT_SESSION_IMPL *session, WT_BLOCK *block) +{ + u_int i; + + /* Discard any references we're holding. */ + for (i = 0; i < block->related_next; ++i) { + --block->related[i]->ref; + block->related[i] = NULL; + } + + /* Discard the block structure. */ + return (__wt_block_close(session, block)); +} + +/* + * __bm_close_block -- + * Close a single block handle, removing the handle if it's no longer useful. + */ +static int +__bm_close_block(WT_SESSION_IMPL *session, WT_BLOCK *block) +{ + WT_CONNECTION_IMPL *conn; + WT_DECL_RET; + bool found; + + conn = S2C(session); + + __wt_verbose(session, WT_VERB_BLKCACHE, "close: %s", block->name); + + __wt_spin_lock(session, &conn->block_lock); + if (block->ref > 0 && --block->ref > 0) { + __wt_spin_unlock(session, &conn->block_lock); + return (0); + } + + /* You can't close files during a checkpoint. */ + WT_ASSERT( + session, block->ckpt_state == WT_CKPT_NONE || block->ckpt_state == WT_CKPT_PANIC_ON_FAILURE); + + /* + * Every time we remove a block, we may have sufficiently decremented other references to allow + * other blocks to be removed. It's unlikely for blocks to reference each other but it's not out + * of the question, either. Loop until we don't find anything to close. + */ + do { + found = false; + TAILQ_FOREACH (block, &conn->blockqh, q) + if (block->ref == 0) { + found = true; + WT_TRET(__bm_close_block_remove(session, block)); + break; + } + } while (found); + __wt_spin_unlock(session, &conn->block_lock); + + return (ret); +} + +/* * __bm_readonly -- * General-purpose "writes not supported on this handle" function. 
*/ @@ -60,7 +123,40 @@ static int __bm_checkpoint( WT_BM *bm, WT_SESSION_IMPL *session, WT_ITEM *buf, WT_CKPT *ckptbase, bool data_checksum) { - return (__wt_block_checkpoint(session, bm->block, buf, ckptbase, data_checksum)); + WT_BLOCK *block, *tblock; + WT_CONNECTION_IMPL *conn; + bool found; + + conn = S2C(session); + block = bm->block; + + WT_RET(__wt_block_checkpoint(session, block, buf, ckptbase, data_checksum)); + + /* + * Close previous primary objects that are no longer being written, that is, ones where all + * in-flight writes have drained. We know all writes have drained when a subsequent checkpoint + * completes, and we know the metadata file is the last file to be checkpointed. After + * checkpointing the metadata file, review any previous primary objects, flushing writes and + * discarding the primary reference. + */ + if (strcmp(WT_METAFILE, block->name) != 0) + return (0); + do { + found = false; + __wt_spin_lock(session, &conn->block_lock); + TAILQ_FOREACH (tblock, &conn->blockqh, q) + if (tblock->close_on_checkpoint) { + tblock->close_on_checkpoint = false; + __wt_spin_unlock(session, &conn->block_lock); + found = true; + WT_RET(__wt_fsync(session, tblock->fh, true)); + WT_RET(__bm_close_block(session, tblock)); + break; + } + } while (found); + __wt_spin_unlock(session, &conn->block_lock); + + return (0); } /* @@ -183,69 +279,6 @@ __bm_checkpoint_unload(WT_BM *bm, WT_SESSION_IMPL *session) } /* - * __bm_close_block_remove -- - * Remove a single block handle. - */ -static int -__bm_close_block_remove(WT_SESSION_IMPL *session, WT_BLOCK *block) -{ - u_int i; - - /* Discard any references we're holding. */ - for (i = 0; i < block->related_next; ++i) { - --block->related[i]->ref; - block->related[i] = NULL; - } - - /* Discard the block structure. */ - return (__wt_block_close(session, block)); -} - -/* - * __bm_close_block -- - * Close a single block handle, removing the handle if it's no longer useful. 
- */ -static int -__bm_close_block(WT_SESSION_IMPL *session, WT_BLOCK *block) -{ - WT_CONNECTION_IMPL *conn; - WT_DECL_RET; - bool found; - - __wt_verbose(session, WT_VERB_BLKCACHE, "close: %s", block->name); - - conn = S2C(session); - - /* You can't close files during a checkpoint. */ - WT_ASSERT( - session, block->ckpt_state == WT_CKPT_NONE || block->ckpt_state == WT_CKPT_PANIC_ON_FAILURE); - - __wt_spin_lock(session, &conn->block_lock); - if (block->ref > 0 && --block->ref > 0) { - __wt_spin_unlock(session, &conn->block_lock); - return (0); - } - - /* - * Every time we remove a block, we may have sufficiently decremented other references to allow - * other blocks to be removed. It's unlikely for blocks to reference each other but it's not out - * of the question, either. Loop until we don't find anything to close. - */ - do { - found = false; - TAILQ_FOREACH (block, &conn->blockqh, q) - if (block->ref == 0) { - found = true; - WT_TRET(__bm_close_block_remove(session, block)); - break; - } - } while (found); - __wt_spin_unlock(session, &conn->block_lock); - - return (ret); -} - -/* * __bm_close -- * Close a file. */ @@ -562,28 +595,49 @@ __bm_stat(WT_BM *bm, WT_SESSION_IMPL *session, WT_DSRC_STATS *stats) static int __bm_switch_object(WT_BM *bm, WT_SESSION_IMPL *session, uint32_t objectid) { - WT_BLOCK *block; + WT_BLOCK *block, *current; + size_t root_addr_size; - block = bm->block; + current = bm->block; + + /* There should not be a checkpoint in progress. */ + WT_ASSERT(session, !S2C(session)->txn_global.checkpoint_running); + + WT_RET(__wt_blkcache_tiered_open(session, NULL, objectid, &block)); + + __wt_verbose( + session, WT_VERB_TIERED, "block manager switching from %s to %s", current->name, block->name); - /* Close out our current handle. */ - WT_RET(__bm_close_block(session, block)); - bm->block = NULL; + /* Fast-path switching to the current object, just undo the reference count increment. 
*/ + if (block == current) + return (__bm_close_block(session, block)); - WT_RET(__wt_blkcache_get_handle(session, NULL, objectid, &block)); + /* Load a new object. */ + WT_RET(__wt_block_checkpoint_load(session, block, NULL, 0, NULL, &root_addr_size, false)); /* - * KEITH XXX: We need to distinguish between tiered switch and loading a checkpoint. This is - * also discarding the extent list which isn't correct, because we can't know when to discard - * previous files if we don't have the extent list. This fixes the problem where we randomly - * write a new position in the new tiered object, but it's not OK. + * The previous object should be closed once writes have drained. + * + * FIXME: the old object does not participate in the upcoming checkpoint which has a couple of + * implications. First, the extent lists for the old object are discarded and never written, + * which makes it impossible to treat the old object as a standalone object, so, for example, + * you can't verify it. A solution to this is for the upper layers to checkpoint all modified + * objects in the logical object before the checkpoint updates the metadata, flushing all + * underlying writes to stable storage, but that means writing extent lists without a root page. */ - WT_RET(__wt_block_ckpt_init(session, &block->live, "live")); + current->close_on_checkpoint = true; /* - * This isn't right: the new block handle will reasonably have different methods for objects in - * different backing sources. That's not the case today, but the current architecture lacks the - * ability to support multiple sources cleanly. + * Swap out the block manager's default handler. + * + * FIXME: the new block handle reasonably has different methods for objects in different backing + * sources. That's not the case today, but the current architecture lacks the ability to support + * multiple sources cleanly. 
+ * + * FIXME: it should not be possible for a thread of control to copy the WT_BM value in the btree + * layer, sleep until after a subsequent switch and a subsequent a checkpoint that would discard + * the WT_BM it copied, but it would be worth thinking through those scenarios in detail to be + * sure there aren't any races. */ bm->block = block; return (0); @@ -783,7 +837,7 @@ __wt_blkcache_open(WT_SESSION_IMPL *session, const char *uri, const char *cfg[], *bmp = NULL; - __wt_verbose(session, WT_VERB_BLOCK, "open: %s", uri); + __wt_verbose(session, WT_VERB_BLKCACHE, "open: %s", uri); WT_RET(__wt_calloc_one(session, &bm)); __bm_method_set(bm, false); diff --git a/src/third_party/wiredtiger/src/block_cache/block_tier.c b/src/third_party/wiredtiger/src/block_cache/block_tier.c index e96ac9b5787..deca87d0b0b 100644 --- a/src/third_party/wiredtiger/src/block_cache/block_tier.c +++ b/src/third_party/wiredtiger/src/block_cache/block_tier.c @@ -97,41 +97,54 @@ err: /* * __wt_blkcache_get_handle -- - * Get a block handle for an object, creating it if it doesn't exist, optionally cache a - * reference. + * Get a cached block handle for an object, creating it if it doesn't exist. */ int __wt_blkcache_get_handle( - WT_SESSION_IMPL *session, WT_BLOCK *orig, uint32_t objectid, WT_BLOCK **blockp) + WT_SESSION_IMPL *session, WT_BLOCK *current, uint32_t objectid, WT_BLOCK **blockp) { + WT_DECL_RET; u_int i; *blockp = NULL; /* We should never be looking for our own object. */ - WT_ASSERT(session, orig == NULL || orig->objectid != objectid); + WT_ASSERT(session, current->objectid != objectid); /* * Check the local cache for the object. We don't have to check the name because we can only * reference objects in our name space. 
*/ - if (orig != NULL) { - for (i = 0; i < orig->related_next; ++i) - if (orig->related[i]->objectid == objectid) { - *blockp = orig->related[i]; - return (0); - } - + for (i = 0; i < current->related_next; ++i) + if (current->related[i]->objectid == objectid) { + *blockp = current->related[i]; + return (0); + } + + /* Lock the block cache layer. */ + __wt_spin_lock(session, &current->cache_lock); + + /* Check to make sure the object wasn't cached while we locked. */ + for (i = 0; i < current->related_next; ++i) + if (current->related[i]->objectid == objectid) { + *blockp = current->related[i]; + break; + } + + /* Open the object. */ + if (*blockp == NULL) { /* Allocate space to store a reference (do first for less complicated cleanup). */ - WT_RET(__wt_realloc_def( - session, &orig->related_allocated, orig->related_next + 1, &orig->related)); - } + WT_ERR(__wt_realloc_def( + session, &current->related_allocated, current->related_next + 1, &current->related)); - /* Get a reference to the object, opening it as necessary. */ - WT_RET(__wt_blkcache_tiered_open(session, NULL, objectid, blockp)); + /* Get a reference to the object, opening it as necessary. */ + WT_ERR(__wt_blkcache_tiered_open(session, NULL, objectid, blockp)); - /* Save a reference in the block in which we started for fast subsequent access. 
*/ + current->related[current->related_next++] = *blockp; + } + +err: + __wt_spin_unlock(session, &current->cache_lock); + return (ret); } diff --git a/src/third_party/wiredtiger/src/include/block.h b/src/third_party/wiredtiger/src/include/block.h index f0aed0a3aac..dd6af8235ea 100644 --- a/src/third_party/wiredtiger/src/include/block.h +++ b/src/third_party/wiredtiger/src/include/block.h @@ -16,8 +16,6 @@ */ #define WT_BLOCK_INVALID_OFFSET 0 -#define WT_BLOCK_ISLOCAL(block) ((block)->objectid == WT_TIERED_OBJECTID_NONE) - /* * The block manager maintains three per-checkpoint extent lists: * alloc: the extents allocated in this checkpoint @@ -232,6 +230,7 @@ struct __wt_block { TAILQ_ENTRY(__wt_block) hashq; /* Hashed list of handles */ bool linked; + WT_SPINLOCK cache_lock; /* Block cache layer lock */ WT_BLOCK **related; /* Related objects */ size_t related_allocated; /* Size of related object array */ u_int related_next; /* Next open slot */ @@ -241,6 +240,7 @@ struct __wt_block { wt_off_t extend_size; /* File extended size */ wt_off_t extend_len; /* File extend chunk size */ + bool close_on_checkpoint; /* Close the handle after the next checkpoint */ bool created_during_backup; /* Created during incremental backup */ /* Configuration information, set when the file is opened. 
*/ @@ -258,11 +258,8 @@ struct __wt_block { */ WT_SPINLOCK live_lock; /* Live checkpoint lock */ WT_BLOCK_CKPT live; /* Live checkpoint */ -#ifdef HAVE_DIAGNOSTIC - bool live_open; /* Live system is open */ -#endif - /* Live checkpoint status */ - enum { + bool live_open; /* Live system is open */ + enum { /* Live checkpoint status */ WT_CKPT_NONE = 0, WT_CKPT_INPROGRESS, WT_CKPT_PANIC_ON_FAILURE, diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index 987df757b05..01b4b9e2512 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -84,7 +84,7 @@ extern int __wt_backup_open(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_bad_object_type(WT_SESSION_IMPL *session, const char *uri) WT_GCC_FUNC_DECL_ATTRIBUTE((cold)) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); -extern int __wt_blkcache_get_handle(WT_SESSION_IMPL *session, WT_BLOCK *orig, uint32_t objectid, +extern int __wt_blkcache_get_handle(WT_SESSION_IMPL *session, WT_BLOCK *current, uint32_t objectid, WT_BLOCK **blockp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_blkcache_map(WT_SESSION_IMPL *session, WT_BLOCK *block, void **mapped_regionp, size_t *lengthp, void **mapped_cookiep) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); diff --git a/src/third_party/wiredtiger/test/checkpoint/checkpointer.c b/src/third_party/wiredtiger/test/checkpoint/checkpointer.c index 1e1e924659a..4a19724a148 100644 --- a/src/third_party/wiredtiger/test/checkpoint/checkpointer.c +++ b/src/third_party/wiredtiger/test/checkpoint/checkpointer.c @@ -61,10 +61,8 @@ start_threads(void) { set_stable(); testutil_check(__wt_thread_create(NULL, &g.checkpoint_thread, checkpointer, NULL)); - if (g.tiered) { - testutil_check(__wt_rwlock_init(NULL, &g.flush_lock)); + if (g.tiered) testutil_check(__wt_thread_create(NULL, &g.flush_thread, flush_thread, NULL)); 
- } if (g.use_timestamps) { testutil_check(__wt_rwlock_init(NULL, &g.clock_lock)); testutil_check(__wt_thread_create(NULL, &g.clock_thread, clock_thread, NULL)); @@ -78,14 +76,9 @@ start_threads(void) void end_threads(void) { - if (g.tiered) { - /* - * The flush lock is also used by the worker threads. They have exited by the time this is - * called, so it is safe to destroy it. - */ + if (g.tiered) testutil_check(__wt_thread_join(NULL, &g.flush_thread)); - __wt_rwlock_destroy(NULL, &g.flush_lock); - } + /* Shutdown checkpoint after flush thread completes because flush depends on checkpoint. */ testutil_check(__wt_thread_join(NULL, &g.checkpoint_thread)); @@ -157,7 +150,6 @@ flush_thread(void *arg) { WT_RAND_STATE rnd; WT_SESSION *wt_session; - WT_SESSION_IMPL *session; uint64_t delay; char tid[128]; @@ -165,17 +157,13 @@ flush_thread(void *arg) __wt_random_init(&rnd); testutil_check(g.conn->open_session(g.conn, NULL, NULL, &wt_session)); - session = (WT_SESSION_IMPL *)wt_session; testutil_check(__wt_thread_str(tid, sizeof(tid))); printf("flush thread starting: tid: %s\n", tid); fflush(stdout); while (g.running) { - /* FIXME-WT-7833 Remove this lock when that ticket merges. 
*/ - __wt_writelock(session, &g.flush_lock); testutil_check(wt_session->flush_tier(wt_session, NULL)); - __wt_writeunlock(session, &g.flush_lock); printf("Finished a flush_tier\n"); fflush(stdout); diff --git a/src/third_party/wiredtiger/test/checkpoint/test_checkpoint.h b/src/third_party/wiredtiger/test/checkpoint/test_checkpoint.h index e8d4b6ed925..42748f7c597 100644 --- a/src/third_party/wiredtiger/test/checkpoint/test_checkpoint.h +++ b/src/third_party/wiredtiger/test/checkpoint/test_checkpoint.h @@ -88,7 +88,6 @@ typedef struct { COOKIE *cookies; /* Per-thread info */ WT_RWLOCK clock_lock; /* Clock synchronization */ - WT_RWLOCK flush_lock; /* Flush synchronization */ wt_thread_t checkpoint_thread; /* Checkpoint thread */ wt_thread_t clock_thread; /* Clock thread */ wt_thread_t flush_thread; /* Flush thread */ diff --git a/src/third_party/wiredtiger/test/checkpoint/workers.c b/src/third_party/wiredtiger/test/checkpoint/workers.c index cfc594a483d..59c09e4c238 100644 --- a/src/third_party/wiredtiger/test/checkpoint/workers.c +++ b/src/third_party/wiredtiger/test/checkpoint/workers.c @@ -401,8 +401,6 @@ real_worker(void) } new_txn = true; start_txn = false; - if (g.tiered) - __wt_readlock((WT_SESSION_IMPL *)session, &g.flush_lock); } keyno = __wt_random(&rnd) % g.nkeys + 1; /* If we have specified to run with mix mode deletes we need to do it in it's own txn. */ @@ -428,8 +426,6 @@ real_worker(void) } } start_txn = true; - if (g.tiered) - __wt_readunlock((WT_SESSION_IMPL *)session, &g.flush_lock); continue; } else new_txn = false; @@ -494,8 +490,6 @@ real_worker(void) } } start_txn = true; - if (g.tiered) - __wt_readunlock((WT_SESSION_IMPL *)session, &g.flush_lock); } } else if (next_rnd % 15 == 0) /* Occasionally reopen cursors during a running transaction. 
*/ @@ -506,8 +500,6 @@ real_worker(void) goto err; } start_txn = true; - if (g.tiered) - __wt_readunlock((WT_SESSION_IMPL *)session, &g.flush_lock); } if (reopen_cursors) { for (j = 0; j < g.ntables; j++) { diff --git a/src/third_party/wiredtiger/test/csuite/random_directio/main.c b/src/third_party/wiredtiger/test/csuite/random_directio/main.c index d05a5227cdf..0bbca8ba145 100644 --- a/src/third_party/wiredtiger/test/csuite/random_directio/main.c +++ b/src/third_party/wiredtiger/test/csuite/random_directio/main.c @@ -164,12 +164,6 @@ static const char *const uri_rev = "table:rev"; #define SCHEMA_FREQUENCY_DEFAULT 100 static uint64_t schema_frequency; -/* - * TODO: WT-7833 Lock to coordinate inserts and flush_tier. This lock should be removed when that - * ticket is fixed. Flush_tier should be able to run with ongoing operations. - */ -static pthread_rwlock_t flush_lock; - #define TEST_STREQ(expect, got, message) \ do { \ if (!WT_STREQ(expect, got)) { \ @@ -395,11 +389,9 @@ schema_operation(WT_SESSION *session, uint32_t threadid, uint64_t id, uint32_t o testutil_check(session->open_cursor(session, uri1, NULL, NULL, &cursor)); cursor->set_key(cursor, uri1); cursor->set_value(cursor, uri1); - testutil_check(pthread_rwlock_rdlock(&flush_lock)); testutil_check(session->log_printf(session, "INSERT: %s", uri1)); testutil_check(cursor->insert(cursor)); testutil_check(session->log_printf(session, "INSERT: DONE %s", uri1)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); testutil_check(cursor->close(cursor)); break; case 2: @@ -426,11 +418,9 @@ schema_operation(WT_SESSION *session, uint32_t threadid, uint64_t id, uint32_t o /* fprintf(stderr, "UPDATE: %s\n", uri2); */ - testutil_check(pthread_rwlock_rdlock(&flush_lock)); testutil_check(session->log_printf(session, "UPDATE: %s", uri2)); testutil_check(cursor->update(cursor)); testutil_check(session->log_printf(session, "UPDATE: DONE %s", uri2)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); 
testutil_check(cursor->close(cursor)); break; case 4: @@ -514,9 +504,7 @@ thread_flush_run(void *arg) * Currently not testing any of the flush tier configuration strings other than defaults. We * expect the defaults are what MongoDB wants for now. */ - testutil_check(pthread_rwlock_wrlock(&flush_lock)); testutil_check(session->flush_tier(session, NULL)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); printf("Flush tier %" PRIu32 " completed.\n", i); fflush(stdout); } @@ -576,7 +564,6 @@ again: gen_kv(buf1, kvsize, i, td->id, large, true); gen_kv(buf2, kvsize, i, td->id, large, false); - testutil_check(pthread_rwlock_rdlock(&flush_lock)); testutil_check(session->begin_transaction(session, NULL)); cursor->set_key(cursor, buf1); /* @@ -602,10 +589,8 @@ again: * operations are not part of the transaction operations for the main table. If we are * running 'integrated' then we'll first do the schema operations and commit later. */ - if (!F_ISSET(td, SCHEMA_INTEGRATED)) { + if (!F_ISSET(td, SCHEMA_INTEGRATED)) testutil_check(session->commit_transaction(session, NULL)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); - } /* * If we are doing a schema test, generate operations for additional tables. Each table has * a 'lifetime' of 4 values of the id. @@ -625,10 +610,8 @@ again: /* * Only rollback if integrated and we have an active transaction. */ - if (F_ISSET(td, SCHEMA_INTEGRATED)) { + if (F_ISSET(td, SCHEMA_INTEGRATED)) testutil_check(session->rollback_transaction(session, NULL)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); - } sleep(1); goto again; } @@ -636,10 +619,8 @@ again: /* * If schema operations are integrated, commit the transaction now that they're complete. 
*/ - if (F_ISSET(td, SCHEMA_INTEGRATED)) { + if (F_ISSET(td, SCHEMA_INTEGRATED)) testutil_check(session->commit_transaction(session, NULL)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); - } } /* NOTREACHED */ } @@ -1304,7 +1285,6 @@ main(int argc, char *argv[]) if (LF_ISSET(TEST_TIERED) && !LF_ISSET(TEST_CKPT)) usage(); - testutil_check(pthread_rwlock_init(&flush_lock, NULL)); testutil_work_dir_from_path(home, sizeof(home), working_dir); /* * If the user wants to verify they need to tell us how many threads there were so we know what diff --git a/src/third_party/wiredtiger/test/csuite/schema_abort/main.c b/src/third_party/wiredtiger/test/csuite/schema_abort/main.c index e13c979faac..3cdb386ea0e 100644 --- a/src/third_party/wiredtiger/test/csuite/schema_abort/main.c +++ b/src/third_party/wiredtiger/test/csuite/schema_abort/main.c @@ -86,12 +86,6 @@ typedef struct { } THREAD_TS; static volatile THREAD_TS th_ts[MAX_TH]; -/* - * TODO: WT-7833 Lock to coordinate inserts and flush_tier. This lock should be removed when that - * ticket is fixed. Flush_tier should be able to run with ongoing operations. - */ -static pthread_rwlock_t flush_lock; - #define ENV_CONFIG_COMPAT ",compatibility=(release=\"2.9\")" #define ENV_CONFIG_DEF \ "create," \ @@ -625,13 +619,11 @@ thread_flush_run(void *arg) * Currently not testing any of the flush tier configuration strings other than defaults. We * expect the defaults are what MongoDB wants for now. 
*/ - testutil_check(pthread_rwlock_wrlock(&flush_lock)); if ((ret = session->flush_tier(session, NULL)) != 0) { if (ret != EBUSY) testutil_die(ret, "session.flush_tier"); } else printf("Flush tier %" PRIu32 " completed.\n", i); - testutil_check(pthread_rwlock_unlock(&flush_lock)); fflush(stdout); } /* NOTREACHED */ @@ -751,7 +743,6 @@ thread_run(void *arg) if (use_ts) stable_ts = __wt_atomic_addv64(&global_ts, 1); - testutil_check(pthread_rwlock_rdlock(&flush_lock)); testutil_check(session->begin_transaction(session, NULL)); if (use_prep) testutil_check(oplog_session->begin_transaction(oplog_session, NULL)); @@ -828,7 +819,6 @@ thread_run(void *arg) data.data = lbuf; cur_local->set_value(cur_local, &data); testutil_check(cur_local->insert(cur_local)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); /* * Save the timestamp and key separately for checking later. @@ -1066,7 +1056,6 @@ main(int argc, char *argv[]) if (argc != 0) usage(); - testutil_check(pthread_rwlock_init(&flush_lock, NULL)); testutil_work_dir_from_path(home, sizeof(home), working_dir); /* * If the user wants to verify they need to tell us how many threads there were so we can find diff --git a/src/third_party/wiredtiger/test/csuite/tiered_abort/main.c b/src/third_party/wiredtiger/test/csuite/tiered_abort/main.c index da22bf30bd6..3f8e4ffb14b 100644 --- a/src/third_party/wiredtiger/test/csuite/tiered_abort/main.c +++ b/src/third_party/wiredtiger/test/csuite/tiered_abort/main.c @@ -131,11 +131,6 @@ typedef struct { uint32_t info; } THREAD_DATA; -/* - * TODO: WT-7833 Lock to coordinate inserts and flush_tier. This lock should be removed when that - * ticket is fixed. Flush_tier should be able to run with ongoing operations. - */ -static pthread_rwlock_t flush_lock; static uint32_t nth; /* Number of threads. */ static wt_timestamp_t *active_timestamps; /* Oldest timestamps still in use. 
*/ @@ -267,9 +262,7 @@ thread_flush_run(void *arg) * Currently not testing any of the flush tier configuration strings other than defaults. We * expect the defaults are what MongoDB wants for now. */ - testutil_check(pthread_rwlock_wrlock(&flush_lock)); testutil_check(session->flush_tier(session, NULL)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); printf("Flush tier %" PRIu32 " completed.\n", i); fflush(stdout); /* @@ -302,14 +295,12 @@ thread_run(void *arg) uint64_t i, active_ts; char cbuf[MAX_VAL], lbuf[MAX_VAL], obuf[MAX_VAL]; char kname[64], tscfg[64], uri[128]; - bool locked; __wt_random_init(&rnd); memset(cbuf, 0, sizeof(cbuf)); memset(lbuf, 0, sizeof(lbuf)); memset(obuf, 0, sizeof(obuf)); memset(kname, 0, sizeof(kname)); - locked = false; td = (THREAD_DATA *)arg; /* @@ -374,8 +365,6 @@ thread_run(void *arg) data.size = __wt_random(&rnd) % MAX_VAL; data.data = cbuf; cur_coll->set_value(cur_coll, &data); - testutil_check(pthread_rwlock_rdlock(&flush_lock)); - locked = true; if ((ret = cur_coll->insert(cur_coll)) == WT_ROLLBACK) goto rollback; testutil_check(ret); @@ -407,8 +396,6 @@ thread_run(void *arg) data.data = lbuf; cur_local->set_value(cur_local, &data); testutil_check(cur_local->insert(cur_local)); - testutil_check(pthread_rwlock_unlock(&flush_lock)); - locked = false; /* Save the timestamps and key separately for checking later. */ if (fprintf(fp, "%" PRIu64 " %" PRIu64 " %" PRIu64 "\n", active_ts, active_ts, i) < 0) @@ -417,10 +404,6 @@ thread_run(void *arg) if (0) { rollback: testutil_check(session->rollback_transaction(session, NULL)); - if (locked) { - testutil_check(pthread_rwlock_unlock(&flush_lock)); - locked = false; - } } /* We're done with the timestamps, allow oldest and stable to move forward. 
*/ @@ -729,8 +712,6 @@ main(int argc, char *argv[]) testutil_check(testutil_parse_opts(argc, argv, opts)); testutil_build_dir(opts, build_dir, 512); - testutil_check(pthread_rwlock_init(&flush_lock, NULL)); - testutil_work_dir_from_path(home, sizeof(home), working_dir); /* * If the user wants to verify they need to tell us how many threads there were so we can find @@ -1011,7 +992,6 @@ main(int argc, char *argv[]) printf("OPLOG: %" PRIu64 " record(s) absent from %" PRIu64 "\n", absent_oplog, count); fatal = true; } - testutil_check(pthread_rwlock_destroy(&flush_lock)); if (fatal) return (EXIT_FAILURE); printf("%" PRIu64 " records verified\n", count); diff --git a/src/third_party/wiredtiger/test/suite/test_tiered08.py b/src/third_party/wiredtiger/test/suite/test_tiered08.py index f9b764b8eef..605b07481c1 100644..100755 --- a/src/third_party/wiredtiger/test/suite/test_tiered08.py +++ b/src/third_party/wiredtiger/test/suite/test_tiered08.py @@ -52,12 +52,13 @@ class test_tiered08(wttest.WiredTigerTestCase, TieredConfigMixin): batch_size = 100000 # Keep inserting keys until we've done this many flush and checkpoint ops. - ckpt_flush_target = 10 + ckpt_target = 1000 + flush_target = 500 uri = "table:test_tiered08" def conn_config(self): - return get_conn_config(self) + '),statistics=(fast)' + return get_conn_config(self) + '),statistics=(fast),timing_stress_for_test=(tiered_flush_finish)' # Load the storage store extension. 
def conn_extensions(self, extlist): @@ -84,34 +85,29 @@ class test_tiered08(wttest.WiredTigerTestCase, TieredConfigMixin): self.pr('Populating tiered table') c = self.session.open_cursor(self.uri, None, None) - while ckpt_count < self.ckpt_flush_target or flush_count < self.ckpt_flush_target: + while ckpt_count < self.ckpt_target or flush_count < self.flush_target: for i in range(nkeys, nkeys + self.batch_size): c[self.key_gen(i)] = self.value_gen(i) nkeys += self.batch_size ckpt_count = self.get_stat(stat.conn.txn_checkpoint) flush_count = self.get_stat(stat.conn.flush_tier) + self.pr('Populating: ckpt {}, flush {}'.format(str(ckpt_count), str(flush_count))) c.close() return nkeys def verify(self, key_count): - self.pr('Verifying tiered table') + self.pr('Verifying tiered table: {}'.format(str(key_count))) c = self.session.open_cursor(self.uri, None, None) - for i in range(key_count): + # Speed up the test by not looking at every key/value pair. + for i in range(1, key_count, 237): self.assertEqual(c[self.key_gen(i)], self.value_gen(i)) c.close() def test_tiered08(self): - - # FIXME-WT-7833 - # This test can trigger races in file handle access during flush_tier. - # We will re-enable it when that is fixed. 
- self.skipTest('Concurrent flush_tier and insert operations not supported yet.') - cfg = self.conn_config() self.pr('Config is: ' + cfg) - intl_page = 'internal_page_max=16K' - base_create = 'key_format=S,value_format=S,' + intl_page - self.session.create(self.uri, base_create) + self.session.create(self.uri, + 'key_format=S,value_format=S,internal_page_max=4096,leaf_page_max=4096') done = threading.Event() ckpt = checkpoint_thread(self.conn, done) diff --git a/src/third_party/wiredtiger/test/suite/test_tiered14.py b/src/third_party/wiredtiger/test/suite/test_tiered14.py index 5b945ccb060..34a5714b6d2 100644..100755 --- a/src/third_party/wiredtiger/test/suite/test_tiered14.py +++ b/src/third_party/wiredtiger/test/suite/test_tiered14.py @@ -40,17 +40,14 @@ class test_tiered14(wttest.WiredTigerTestCase, TieredConfigMixin): uri = "table:test_tiered14-{}" # format for subtests - # FIXME-WT-7833: enable the commented scenarios and run the - # test with the --long option. - # The multiplier makes the size of keys and values progressively larger. # A multiplier of 0 makes the keys and values a single length. multiplier = [ ('0', dict(multiplier=0)), ('S', dict(multiplier=1)), ('M', dict(multiplier=10)), - #('L', dict(multiplier=100, long_only=True)), - #('XL', dict(multiplier=1000, long_only=True)), + ('L', dict(multiplier=100, long_only=True)), + ('XL', dict(multiplier=1000, long_only=True)), ] keyfmt = [ ('integer', dict(keyfmt='i')), |