From 0323cbd6a51391da9f78a43831a78185027fc12c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20G=C3=B3mez=20Ferro?= Date: Fri, 23 Dec 2022 16:34:21 +0000 Subject: SERVER-70437 Check dbVersion on sharding opobservers --- src/mongo/db/op_observer/op_observer_impl.cpp | 19 +++---- src/mongo/db/op_observer/op_observer_impl.h | 19 +++---- src/mongo/db/s/op_observer_sharding_impl.cpp | 49 ++++++++++------- src/mongo/db/s/op_observer_sharding_impl.h | 15 +++--- src/mongo/db/s/op_observer_sharding_test.cpp | 78 +++++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 46 deletions(-) diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index df0f0ec9a59..2f119765ed9 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -647,17 +647,14 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, onWriteOpCompleted(opCtx, stmtIdsWritten, sessionTxnRecord); } - size_t index = 0; - for (auto it = first; it != last; it++, index++) { - auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index]; - shardObserveInsertOp(opCtx, - nss, - it->doc, - opTime, - shardingWriteRouter, - fromMigrate, - inMultiDocumentTransaction); - } + shardObserveInsertsOp(opCtx, + nss, + first, + last, + opTimeList, + shardingWriteRouter, + fromMigrate, + inMultiDocumentTransaction); if (nss.coll() == "system.js") { Scope::storedFuncMod(opCtx); diff --git a/src/mongo/db/op_observer/op_observer_impl.h b/src/mongo/db/op_observer/op_observer_impl.h index 1aea9c92495..ec8c3ac652f 100644 --- a/src/mongo/db/op_observer/op_observer_impl.h +++ b/src/mongo/db/op_observer/op_observer_impl.h @@ -99,8 +99,8 @@ public: void onInserts(OperationContext* opCtx, const CollectionPtr& coll, - std::vector::const_iterator begin, - std::vector::const_iterator end, + std::vector::const_iterator first, + std::vector::const_iterator last, bool fromMigrate) final; void onInsertGlobalIndexKey(OperationContext* opCtx, @@ -246,13 +246,14 @@ private: virtual void shardObserveAboutToDelete(OperationContext* opCtx, NamespaceString const& nss, BSONObj const& doc) {} - virtual void shardObserveInsertOp(OperationContext* opCtx, - const NamespaceString nss, - const BSONObj& insertedDoc, - const repl::OpTime& opTime, - const ShardingWriteRouter& shardingWriteRouter, - const bool fromMigrate, - const bool inMultiDocumentTransaction) {} + virtual void shardObserveInsertsOp(OperationContext* opCtx, + NamespaceString nss, + std::vector::const_iterator first, + std::vector::const_iterator last, + const std::vector& opTimeList, + const ShardingWriteRouter& shardingWriteRouter, + bool fromMigrate, + bool inMultiDocumentTransaction){}; virtual void shardObserveUpdateOp(OperationContext* opCtx, const NamespaceString nss, boost::optional preImageDoc, diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index 88259ca6d42..f1a3bf73bff 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -29,6 +29,7 @@ #include "mongo/db/s/op_observer_sharding_impl.h" +#include "mongo/db/catalog/catalog_helper.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/database_sharding_state.h" @@ -107,18 +108,21 @@ void OpObserverShardingImpl::shardObserveAboutToDelete(OperationContext* opCtx, getIsMigrating(opCtx) = isMigrating(opCtx, nss, docToDelete); } -void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx, - const NamespaceString nss, - const BSONObj& insertedDoc, - const repl::OpTime& opTime, - const ShardingWriteRouter& shardingWriteRouter, - const bool fromMigrate, - const bool inMultiDocumentTransaction) { +void OpObserverShardingImpl::shardObserveInsertsOp( + OperationContext* opCtx, + const NamespaceString nss, + std::vector::const_iterator first, + std::vector::const_iterator last, + const std::vector& opTimeList, + const ShardingWriteRouter& shardingWriteRouter, + const bool fromMigrate, + const bool inMultiDocumentTransaction) { if (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate) return; auto* const css = shardingWriteRouter.getCss(); css->checkShardVersionOrThrow(opCtx); + catalog_helper::assertMatchingDbVersion(opCtx, nss.db()); auto* const csr = checked_cast(css); auto metadata = csr->getCurrentMetadataIfKnown(); @@ -127,21 +131,26 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx, return; } - if (inMultiDocumentTransaction) { - const auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); + int index = 0; + for (auto it = first; it != last; it++, index++) { + auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index]; - if (atClusterTime) { - const auto shardKey = - metadata->getShardKeyPattern().extractShardKeyFromDocThrows(insertedDoc); - assertIntersectingChunkHasNotMoved(opCtx, *metadata, shardKey, *atClusterTime); - } + if (inMultiDocumentTransaction) { + const auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); - return; - } + if (atClusterTime) { + const auto shardKey = + metadata->getShardKeyPattern().extractShardKeyFromDocThrows(it->doc); + assertIntersectingChunkHasNotMoved(opCtx, *metadata, shardKey, *atClusterTime); + } - auto cloner = MigrationSourceManager::getCurrentCloner(*csr); - if (cloner) { - cloner->onInsertOp(opCtx, insertedDoc, opTime); + return; + } + + auto cloner = MigrationSourceManager::getCurrentCloner(*csr); + if (cloner) { + cloner->onInsertOp(opCtx, it->doc, opTime); + } } } @@ -155,6 +164,7 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx, const bool inMultiDocumentTransaction) { auto* const css = shardingWriteRouter.getCss(); css->checkShardVersionOrThrow(opCtx); + catalog_helper::assertMatchingDbVersion(opCtx, nss.db()); auto* const csr = checked_cast(css); auto metadata = csr->getCurrentMetadataIfKnown(); @@ -190,6 +200,7 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx, const bool inMultiDocumentTransaction) { auto* const css = shardingWriteRouter.getCss(); css->checkShardVersionOrThrow(opCtx); + catalog_helper::assertMatchingDbVersion(opCtx, nss.db()); auto* const csr = checked_cast(css); auto metadata = csr->getCurrentMetadataIfKnown(); diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h index 4e38292e80a..dca800a1291 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.h +++ b/src/mongo/db/s/op_observer_sharding_impl.h @@ -48,13 +48,14 @@ protected: void shardObserveAboutToDelete(OperationContext* opCtx, NamespaceString const& nss, BSONObj const& docToDelete) override; - void shardObserveInsertOp(OperationContext* opCtx, - NamespaceString nss, - const BSONObj& insertedDoc, - const repl::OpTime& opTime, - const ShardingWriteRouter& shardingWriteRouter, - bool fromMigrate, - bool inMultiDocumentTransaction) override; + void shardObserveInsertsOp(OperationContext* opCtx, + NamespaceString nss, + std::vector::const_iterator first, + std::vector::const_iterator last, + const std::vector& opTimeList, + const ShardingWriteRouter& shardingWriteRouter, + bool fromMigrate, + bool inMultiDocumentTransaction) override; void shardObserveUpdateOp(OperationContext* opCtx, NamespaceString nss, boost::optional preImageDoc, diff --git a/src/mongo/db/s/op_observer_sharding_test.cpp b/src/mongo/db/s/op_observer_sharding_test.cpp index 587cdb8bb42..8ba0e7a8f1e 100644 --- a/src/mongo/db/s/op_observer_sharding_test.cpp +++ b/src/mongo/db/s/op_observer_sharding_test.cpp @@ -28,18 +28,24 @@ */ #include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/op_observer/op_observer_registry.h" #include "mongo/db/op_observer/op_observer_util.h" +#include "mongo/db/op_observer/oplog_writer_impl.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/db/s/type_shard_identity.h" +#include "mongo/db/session/session_catalog_mongod.h" namespace mongo { namespace { const NamespaceString kTestNss("TestDB", "TestColl"); +const NamespaceString kUnshardedNss("TestDB", "UnshardedColl"); void setCollectionFilteringMetadata(OperationContext* opCtx, CollectionMetadata metadata) { AutoGetCollection autoColl(opCtx, kTestNss, MODE_X); @@ -55,8 +61,22 @@ protected: OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( operationContext()); + + Lock::GlobalWrite globalLock(operationContext()); + bool justCreated = false; + auto databaseHolder = DatabaseHolder::get(operationContext()); + auto db = databaseHolder->openDb(operationContext(), kTestNss.dbName(), &justCreated); + databaseHolder->setDbInfo( + operationContext(), + kTestNss.dbName(), + DatabaseType{kTestNss.dbName().db(), ShardId("this"), dbVersion1}); + ASSERT_TRUE(db); + ASSERT_TRUE(justCreated); + uassertStatusOK(createCollection( operationContext(), kTestNss.dbName(), BSON("create" << kTestNss.coll()))); + uassertStatusOK(createCollection( + operationContext(), kUnshardedNss.dbName(), BSON("create" << kUnshardedNss.coll()))); } /** @@ -91,6 +111,9 @@ protected: Timestamp(100, 0)), ShardId("this")); } + + const DatabaseVersion dbVersion0 = DatabaseVersion::makeFixed(); + const DatabaseVersion dbVersion1 = dbVersion0.makeUpdated(); }; TEST_F(DocumentKeyStateTest, MakeDocumentKeyStateUnsharded) { @@ -200,6 +223,61 @@ TEST_F(DocumentKeyStateTest, MakeDocumentKeyStateShardedWithIdHashInShardKey) { ASSERT_FALSE(OpObserverShardingImpl::isMigrating(operationContext(), kTestNss, doc)); } +TEST_F(DocumentKeyStateTest, CheckDBVersion) { + OpObserverRegistry opObserver; + opObserver.addObserver( + std::make_unique(std::make_unique())); + + OperationContext* opCtx = operationContext(); + AutoGetCollection autoColl(opCtx, kUnshardedNss, MODE_IX); + WriteUnitOfWork wuow(opCtx); + const bool fromMigrate = false; + auto shardVersion = ShardVersion::UNSHARDED(); + + // Insert parameters + std::vector toInsert; + toInsert.emplace_back(kUninitializedStmtId, BSON("_id" << 1)); + + // Update parameters + const auto criteria = BSON("_id" << 1); + const auto preImageDoc = criteria; + CollectionUpdateArgs updateArgs{preImageDoc}; + updateArgs.criteria = criteria; + updateArgs.stmtIds = {kUninitializedStmtId}; + updateArgs.updatedDoc = BSON("_id" << 1 << "data" + << "y"); + updateArgs.update = BSON("$set" << BSON("data" + << "y")); + OplogUpdateEntryArgs update(&updateArgs, *autoColl); + + // OpObserver calls + auto onInsert = [&]() { + opObserver.onInserts(opCtx, *autoColl, toInsert.begin(), toInsert.end(), fromMigrate); + }; + auto onUpdate = [&]() { opObserver.onUpdate(opCtx, update); }; + auto onDelete = [&]() { + opObserver.aboutToDelete(opCtx, *autoColl, BSON("_id" << 0)); + opObserver.onDelete(opCtx, *autoColl, kUninitializedStmtId, {}); + }; + + // Using the latest dbVersion works + { + ScopedSetShardRole scopedSetShardRole{ + operationContext(), kTestNss, shardVersion, dbVersion1}; + onInsert(); + onUpdate(); + onDelete(); + } + + // Using the old dbVersion fails + { + ScopedSetShardRole scopedSetShardRole{ + operationContext(), kTestNss, shardVersion, dbVersion0}; + ASSERT_THROWS_CODE(onInsert(), AssertionException, ErrorCodes::StaleDbVersion); + ASSERT_THROWS_CODE(onUpdate(), AssertionException, ErrorCodes::StaleDbVersion); + ASSERT_THROWS_CODE(onDelete(), AssertionException, ErrorCodes::StaleDbVersion); + } +} } // namespace } // namespace mongo -- cgit v1.2.1