diff options
12 files changed, 184 insertions, 48 deletions
diff --git a/jstests/core/timeseries/clustered_index_types.js b/jstests/core/timeseries/clustered_index_types.js new file mode 100644 index 00000000000..c63788a02f0 --- /dev/null +++ b/jstests/core/timeseries/clustered_index_types.js @@ -0,0 +1,32 @@ +/** + * Tests inserting duplicate _id values on a collection clustered by _id. + * + * @tags: [ + * assumes_against_mongod_not_mongos, + * assumes_no_implicit_collection_creation_after_drop, + * does_not_support_stepdowns, + * requires_fcv_49, + * requires_find_command, + * requires_wiredtiger, + * ] + */ + +(function() { +"use strict"; + +const collName = 'system.buckets.test'; +const coll = db[collName]; +coll.drop(); + +assert.commandWorked(db.createCollection(collName, {clusteredIndex: {}})); + +// Expect that duplicates are rejected. +let oid = new ObjectId(); +assert.commandWorked(coll.insert({_id: oid})); +assert.commandFailedWithCode(coll.insert({_id: oid}), ErrorCodes.DuplicateKey); +assert.eq(1, coll.find({_id: oid}).itcount()); + +// Updates should work. +assert.commandWorked(coll.update({_id: oid}, {a: 1})); +assert.eq(1, coll.find({_id: oid}).itcount()); +})(); diff --git a/src/mongo/db/storage/index_entry_comparison.cpp b/src/mongo/db/storage/index_entry_comparison.cpp index 965a4e8d796..f609ca07f0b 100644 --- a/src/mongo/db/storage/index_entry_comparison.cpp +++ b/src/mongo/db/storage/index_entry_comparison.cpp @@ -194,7 +194,10 @@ Status buildDupKeyErrorStatus(const BSONObj& key, StringBuilder sb; sb << "E11000 duplicate key error"; sb << " collection: " << collectionNamespace; - sb << " index: " << indexName; + if (indexName.size()) { + // This helper may be used for clustered collections when there is no _id index. + sb << " index: " << indexName; + } if (hasCollation) { sb << " collation: " << indexCollation; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp index 430f8302fd5..f65f0b4d674 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp @@ -46,20 +46,10 @@ WiredTigerCursor::WiredTigerCursor(const std::string& uri, _tableID = tableID; _ru = WiredTigerRecoveryUnit::get(opCtx); _session = _ru->getSession(); - _readOnce = _ru->getReadOnce(); - - // Attempt to retrieve the cursor from the cache. Cursors using the 'read_once' option will - // not be in the cache. - if (!_readOnce) { - _cursor = _session->getCachedCursor(uri, tableID); - if (_cursor) { - return; - } - } // Construct a new cursor with the provided options. str::stream builder; - if (_readOnce) { + if (_ru->getReadOnce()) { builder << "read_once=true,"; } // Add this option last to avoid needing a trailing comma. This enables an optimization in @@ -68,24 +58,23 @@ WiredTigerCursor::WiredTigerCursor(const std::string& uri, builder << "overwrite=false"; } - const std::string config = builder; + _config = builder; + + // Attempt to retrieve a cursor from the cache. + _cursor = _session->getCachedCursor(tableID, _config); + if (_cursor) { + return; + } + try { - _cursor = _session->getNewCursor(uri, config.c_str()); + _cursor = _session->getNewCursor(uri, _config.c_str()); } catch (const ExceptionFor<ErrorCodes::CursorNotFound>& ex) { LOGV2_FATAL_NOTRACE(50883, "{ex}", "Cursor not found", "error"_attr = ex); } } WiredTigerCursor::~WiredTigerCursor() { - dassert(_ru->getReadOnce() == _readOnce); - - // Read-once cursors will never take cursors from the cursor cache, and should never release - // cursors into the cursor cache. - if (_readOnce) { - _session->closeCursor(_cursor); - } else { - _session->releaseCursor(_tableID, _cursor); - } + _session->releaseCursor(_tableID, _cursor, _config); } void WiredTigerCursor::reset() { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h index 360d7d3ced3..94063df6e78 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h @@ -78,7 +78,7 @@ protected: uint64_t _tableID; WiredTigerRecoveryUnit* _ru; WiredTigerSession* _session; - bool _readOnce; + std::string _config; WT_CURSOR* _cursor = nullptr; // Owned }; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 810a69b9342..8792ae37f9c 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -1471,6 +1471,8 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::getRecordStore(OperationContext params.engineName = _canonicalName; params.isCapped = options.capped; params.keyFormat = (options.clusteredIndex) ? KeyFormat::String : KeyFormat::Long; + // Record stores clustered by _id need to guarantee uniqueness by preventing overwrites. + params.overwrite = options.clusteredIndex ? false : true; params.isEphemeral = _ephemeral; params.cappedCallback = nullptr; params.sizeStorer = _sizeStorer.get(); @@ -1610,6 +1612,7 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::makeTemporaryRecordStore(Operat params.engineName = _canonicalName; params.isCapped = false; params.keyFormat = KeyFormat::Long; + params.overwrite = true; params.isEphemeral = _ephemeral; params.cappedCallback = nullptr; // Temporary collections do not need to persist size information to the size storer. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 54b50324dd6..481d1ba160e 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -837,6 +837,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, _engineName(params.engineName), _isCapped(params.isCapped), _keyFormat(params.keyFormat), + _overwrite(params.overwrite), _isEphemeral(params.isEphemeral), _isLogged(!isTemp() && WiredTigerUtil::useTableLogging( @@ -1243,7 +1244,7 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, for (size_t i = 0; i < nRecords; i++) totalLength += records[i].data.size(); - WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); + WiredTigerCursor curwrap(_uri, _tableId, _overwrite, opCtx); curwrap.assertInActiveTxn(); WT_CURSOR* c = curwrap.get(); invariant(c); @@ -1295,6 +1296,22 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, WiredTigerItem value(record.data.data(), record.data.size()); c->set_value(c, value.Get()); int ret = WT_OP_CHECK(wiredTigerCursorInsert(opCtx, c)); + + if (ret == WT_DUPLICATE_KEY) { + invariant(!_overwrite); + invariant(_keyFormat == KeyFormat::String); + + // Generate a useful error message that is consistent with duplicate key error messages + // on indexes. + BSONObjBuilder builder; + builder.append("", OID::from(record.id.strData())); + return buildDupKeyErrorStatus(builder.obj(), + NamespaceString(ns()), + "" /* indexName */, + BSON("_id" << 1), + BSONObj() /* collation */); + } + if (ret) return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord"); @@ -1347,10 +1364,10 @@ StatusWith<Timestamp> WiredTigerRecordStore::getLatestOplogTimestamp( WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache(); auto sessRaii = cache->getSession(); WT_CURSOR* cursor = writeConflictRetry(opCtx, "getLatestOplogTimestamp", "local.oplog.rs", [&] { - auto cachedCursor = sessRaii->getCachedCursor(_uri, _tableId); + auto cachedCursor = sessRaii->getCachedCursor(_tableId, ""); return cachedCursor ? cachedCursor : sessRaii->getNewCursor(_uri); }); - ON_BLOCK_EXIT([&] { sessRaii->releaseCursor(_tableId, cursor); }); + ON_BLOCK_EXIT([&] { sessRaii->releaseCursor(_tableId, cursor, ""); }); int ret = cursor->prev(cursor); if (ret == WT_NOTFOUND) { return Status(ErrorCodes::CollectionIsEmpty, "oplog is empty"); @@ -1372,10 +1389,10 @@ StatusWith<Timestamp> WiredTigerRecordStore::getEarliestOplogTimestamp(Operation auto sessRaii = cache->getSession(); WT_CURSOR* cursor = writeConflictRetry(opCtx, "getEarliestOplogTimestamp", "local.oplog.rs", [&] { - auto cachedCursor = sessRaii->getCachedCursor(_uri, _tableId); + auto cachedCursor = sessRaii->getCachedCursor(_tableId, ""); return cachedCursor ? cachedCursor : sessRaii->getNewCursor(_uri); }); - ON_BLOCK_EXIT([&] { sessRaii->releaseCursor(_tableId, cursor); }); + ON_BLOCK_EXIT([&] { sessRaii->releaseCursor(_tableId, cursor, ""); }); auto ret = cursor->next(cursor); if (ret == WT_NOTFOUND) { return Status(ErrorCodes::CollectionIsEmpty, "oplog is empty"); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index ff31d9e11c4..3ed205c50d4 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -105,6 +105,7 @@ public: std::string engineName; bool isCapped; KeyFormat keyFormat; + bool overwrite; bool isEphemeral; boost::optional<int64_t> oplogMaxSize; CappedCallback* cappedCallback; @@ -328,6 +329,8 @@ private: const bool _isCapped; // The format of this RecordStore's RecordId keys. const KeyFormat _keyFormat; + // Whether or not to allow writes to overwrite existing records with the same RecordId. + const bool _overwrite; // True if the storage engine is an in-memory storage engine const bool _isEphemeral; // True if WiredTiger is logging updates to this table diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp index 2f28df784a2..81ea31b3fe8 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp @@ -99,6 +99,7 @@ public: params.engineName = kWiredTigerEngineName; params.isCapped = false; params.keyFormat = KeyFormat::Long; + params.overwrite = true; params.isEphemeral = false; params.cappedCallback = nullptr; params.sizeStorer = nullptr; @@ -615,7 +616,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampAfterSetTimestampOnAbor ASSERT(!commitTs); } -TEST_F(WiredTigerRecoveryUnitTestFixture, ReadOnceCursorsAreNotCached) { +TEST_F(WiredTigerRecoveryUnitTestFixture, ReadOnceCursorsCached) { auto opCtx = clientAndCtx1.second.get(); // Hold the global lock throughout the test to avoid having the global lock destructor @@ -649,7 +650,8 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, ReadOnceCursorsAreNotCached) { ru->abandonSnapshot(); - // Test 2: A read-once operation should create a new cursor and immediately close it when done. + // Test 2: A read-once operation should create a new cursor because it has a different + // configuration. This will be released into the cache. ru->setReadOnce(true); @@ -657,18 +659,96 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, ReadOnceCursorsAreNotCached) { ru->getSession()->closeAllCursors(uri); cachedCursorsBefore = ru->getSession()->cachedCursors(); - // The subsequent read operation will use a read_once cursor, which will not be from the cache, - // and will not be released into the cache. + // The subsequent read operation will create a new read_once cursor and release into the cache. ASSERT_TRUE(rs->findRecord(opCtx, s.getValue(), &rd)); - // No new cursors should have been released into the cache. - ASSERT_EQ(ru->getSession()->cachedCursors(), cachedCursorsBefore); + // A new cursor should have been released into the cache. + ASSERT_GT(ru->getSession()->cachedCursors(), cachedCursorsBefore); // All opened cursors are closed. ASSERT_EQ(0, ru->getSession()->cursorsOut()); ASSERT(ru->getReadOnce()); } +TEST_F(WiredTigerRecoveryUnitTestFixture, CacheMixedOverwrite) { + auto opCtx = clientAndCtx1.second.get(); + std::unique_ptr<RecordStore> rs(harnessHelper->createRecordStore(opCtx, "test.A")); + auto uri = dynamic_cast<WiredTigerRecordStore*>(rs.get())->getURI(); + + // Hold the global lock throughout the test to avoid having the global lock destructor + // prematurely abandon snapshots. + Lock::GlobalLock globalLock(opCtx, MODE_IX); + auto ru = WiredTigerRecoveryUnit::get(opCtx); + + // Close all cached cursors to establish a 'before' state. + auto session = ru->getSession(); + ru->getSession()->closeAllCursors(uri); + int cachedCursorsBefore = ru->getSession()->cachedCursors(); + + // Use a large, unused table ID for this test to ensure we don't collide with any other table + // ids. + int tableId = 999999999; + WT_CURSOR* cursor; + + // Expect no cached cursors. + { + auto config = ""; + cursor = session->getCachedCursor(tableId, config); + ASSERT_FALSE(cursor); + + cursor = session->getNewCursor(uri, config); + ASSERT(cursor); + session->releaseCursor(tableId, cursor, config); + ASSERT_GT(session->cachedCursors(), cachedCursorsBefore); + } + + cachedCursorsBefore = session->cachedCursors(); + + // Use a different overwrite setting, expect no cached cursors. + { + auto config = "overwrite=false"; + cursor = session->getCachedCursor(tableId, config); + ASSERT_FALSE(cursor); + + cursor = session->getNewCursor(uri, config); + ASSERT(cursor); + session->releaseCursor(tableId, cursor, config); + ASSERT_GT(session->cachedCursors(), cachedCursorsBefore); + } + + cachedCursorsBefore = session->cachedCursors(); + + // Expect cursors to be cached. + { + auto config = ""; + cursor = session->getCachedCursor(tableId, config); + ASSERT(cursor); + session->releaseCursor(tableId, cursor, config); + ASSERT_EQ(session->cachedCursors(), cachedCursorsBefore); + } + + // Expect cursors to be cached. + { + auto config = "overwrite=false"; + cursor = session->getCachedCursor(tableId, config); + ASSERT(cursor); + session->releaseCursor(tableId, cursor, config); + ASSERT_EQ(session->cachedCursors(), cachedCursorsBefore); + } + + // Use yet another cursor config, and expect no cursors to be cached. + { + auto config = "overwrite=true"; + cursor = session->getCachedCursor(tableId, config); + ASSERT_FALSE(cursor); + + cursor = session->getNewCursor(uri, config); + ASSERT(cursor); + session->releaseCursor(tableId, cursor, config); + ASSERT_GT(session->cachedCursors(), cachedCursorsBefore); + } +} + TEST_F(WiredTigerRecoveryUnitTestFixture, CommitWithDurableTimestamp) { auto opCtx = clientAndCtx1.second.get(); Timestamp ts1(3, 3); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index 2f280c159c0..a627dd65d5b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -111,10 +111,13 @@ void _openCursor(WT_SESSION* session, } } // namespace -WT_CURSOR* WiredTigerSession::getCachedCursor(const std::string& uri, uint64_t id) { +WT_CURSOR* WiredTigerSession::getCachedCursor(uint64_t id, const std::string& config) { // Find the most recently used cursor for (CursorCache::iterator i = _cursors.begin(); i != _cursors.end(); ++i) { - if (i->_id == id) { + // Ensure that all properties of this cursor are identical to avoid mixing cursor + // configurations. Note that this uses an exact string match, so cursor configurations with + // parameters in different orders will not be considered equivalent. + if (i->_id == id && i->_config == config) { WT_CURSOR* c = i->_cursor; _cursors.erase(i); _cursorsOut++; @@ -131,7 +134,7 @@ WT_CURSOR* WiredTigerSession::getNewCursor(const std::string& uri, const char* c return cursor; } -void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) { +void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor, const std::string& config) { invariant(_session); invariant(cursor); _cursorsOut--; @@ -139,7 +142,7 @@ void WiredTigerSession::releaseCursor(uint64_t id, WT_CURSOR* cursor) { invariantWTOK(cursor->reset(cursor)); // Cursors are pushed to the front of the list and removed from the back - _cursors.push_front(WiredTigerCachedCursor(id, _cursorGen++, cursor)); + _cursors.push_front(WiredTigerCachedCursor(id, _cursorGen++, cursor, config)); // A negative value for wiredTigercursorCacheSize means to use hybrid caching. std::uint32_t cacheSize = abs(gWiredTigerCursorCacheSize.load()); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h index 81ef8f5c7a3..960ad11e75c 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h @@ -47,12 +47,13 @@ class WiredTigerSessionCache; class WiredTigerCachedCursor { public: - WiredTigerCachedCursor(uint64_t id, uint64_t gen, WT_CURSOR* cursor) - : _id(id), _gen(gen), _cursor(cursor) {} + WiredTigerCachedCursor(uint64_t id, uint64_t gen, WT_CURSOR* cursor, const std::string& config) + : _id(id), _gen(gen), _cursor(cursor), _config(config) {} uint64_t _id; // Source ID, assigned to each URI uint64_t _gen; // Generation, used to age out old cursors WT_CURSOR* _cursor; + std::string _config; // Cursor config. Do not serve cursors with different configurations }; /** @@ -91,12 +92,12 @@ public: } /** - * Gets a cursor on the table id 'id'. + * Gets a cursor on the table id 'id' with optional configuration, 'config'. * * This may return a cursor from the cursor cache and these cursors should *always* be released * into the cache by calling releaseCursor(). */ - WT_CURSOR* getCachedCursor(const std::string& uri, uint64_t id); + WT_CURSOR* getCachedCursor(uint64_t id, const std::string& config); /** @@ -121,8 +122,10 @@ public: /** * Release a cursor into the cursor cache and close old cursors if the number of cursors in the * cache exceeds wiredTigerCursorCacheSize. + * The exact cursor config that was used to create the cursor must be provided or subsequent + * users will retrieve cursors with incorrect configurations. */ - void releaseCursor(uint64_t id, WT_CURSOR* cursor); + void releaseCursor(uint64_t id, WT_CURSOR* cursor, const std::string& config); /** * Close a cursor without releasing it into the cursor cache. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp index d0b3d9cc298..cf1d8c1eb54 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp @@ -123,6 +123,7 @@ public: params.engineName = kWiredTigerEngineName; params.isCapped = false; params.keyFormat = collOptions.clusteredIndex ? KeyFormat::String : KeyFormat::Long; + params.overwrite = collOptions.clusteredIndex ? false : true; params.isEphemeral = false; params.cappedCallback = nullptr; params.sizeStorer = nullptr; @@ -163,6 +164,7 @@ public: params.engineName = kWiredTigerEngineName; params.isCapped = true; params.keyFormat = KeyFormat::Long; + params.overwrite = true; params.isEphemeral = false; // Large enough not to exceed capped limits. params.oplogMaxSize = 1024 * 1024 * 1024; @@ -257,6 +259,7 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) { params.engineName = kWiredTigerEngineName; params.isCapped = false; params.keyFormat = KeyFormat::Long; + params.overwrite = true; params.isEphemeral = false; params.cappedCallback = nullptr; params.sizeStorer = &ss; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp index e2173229284..07fabadd634 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_util.cpp @@ -219,7 +219,7 @@ StatusWith<std::string> WiredTigerUtil::getMetadataCreate(OperationContext* opCt WT_CURSOR* cursor = nullptr; try { const std::string metadataURI = "metadata:create"; - cursor = session->getCachedCursor(metadataURI, WiredTigerSession::kMetadataCreateTableId); + cursor = session->getCachedCursor(WiredTigerSession::kMetadataCreateTableId, ""); if (!cursor) { cursor = session->getNewCursor(metadataURI); } @@ -228,7 +228,7 @@ StatusWith<std::string> WiredTigerUtil::getMetadataCreate(OperationContext* opCt } invariant(cursor); auto releaser = makeGuard( - [&] { session->releaseCursor(WiredTigerSession::kMetadataCreateTableId, cursor); }); + [&] { session->releaseCursor(WiredTigerSession::kMetadataCreateTableId, cursor, ""); }); return _getMetadata(cursor, uri); } @@ -249,7 +249,7 @@ StatusWith<std::string> WiredTigerUtil::getMetadata(OperationContext* opCtx, Str WT_CURSOR* cursor = nullptr; try { const std::string metadataURI = "metadata:"; - cursor = session->getCachedCursor(metadataURI, WiredTigerSession::kMetadataTableId); + cursor = session->getCachedCursor(WiredTigerSession::kMetadataTableId, ""); if (!cursor) { cursor = session->getNewCursor(metadataURI); } @@ -258,7 +258,7 @@ StatusWith<std::string> WiredTigerUtil::getMetadata(OperationContext* opCtx, Str } invariant(cursor); auto releaser = - makeGuard([&] { session->releaseCursor(WiredTigerSession::kMetadataTableId, cursor); }); + makeGuard([&] { session->releaseCursor(WiredTigerSession::kMetadataTableId, cursor, ""); }); return _getMetadata(cursor, uri); } |