summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Gómez Ferro <daniel.gomezferro@mongodb.com>2022-12-23 16:34:21 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-12-23 17:45:25 +0000
commit0323cbd6a51391da9f78a43831a78185027fc12c (patch)
treebaaf88814343a182fe84c9f2e1ccd2e222f69976
parent0da1080c1e9c84a9f6bd6f6f16404f1af49495dc (diff)
downloadmongo-0323cbd6a51391da9f78a43831a78185027fc12c.tar.gz
SERVER-70437 Check dbVersion on sharding opobservers
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.cpp19
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.h19
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.cpp49
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.h15
-rw-r--r--src/mongo/db/s/op_observer_sharding_test.cpp78
5 files changed, 134 insertions, 46 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
index df0f0ec9a59..2f119765ed9 100644
--- a/src/mongo/db/op_observer/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -647,17 +647,14 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
onWriteOpCompleted(opCtx, stmtIdsWritten, sessionTxnRecord);
}
- size_t index = 0;
- for (auto it = first; it != last; it++, index++) {
- auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index];
- shardObserveInsertOp(opCtx,
- nss,
- it->doc,
- opTime,
- shardingWriteRouter,
- fromMigrate,
- inMultiDocumentTransaction);
- }
+ shardObserveInsertsOp(opCtx,
+ nss,
+ first,
+ last,
+ opTimeList,
+ shardingWriteRouter,
+ fromMigrate,
+ inMultiDocumentTransaction);
if (nss.coll() == "system.js") {
Scope::storedFuncMod(opCtx);
diff --git a/src/mongo/db/op_observer/op_observer_impl.h b/src/mongo/db/op_observer/op_observer_impl.h
index 1aea9c92495..ec8c3ac652f 100644
--- a/src/mongo/db/op_observer/op_observer_impl.h
+++ b/src/mongo/db/op_observer/op_observer_impl.h
@@ -99,8 +99,8 @@ public:
void onInserts(OperationContext* opCtx,
const CollectionPtr& coll,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) final;
void onInsertGlobalIndexKey(OperationContext* opCtx,
@@ -246,13 +246,14 @@ private:
virtual void shardObserveAboutToDelete(OperationContext* opCtx,
NamespaceString const& nss,
BSONObj const& doc) {}
- virtual void shardObserveInsertOp(OperationContext* opCtx,
- const NamespaceString nss,
- const BSONObj& insertedDoc,
- const repl::OpTime& opTime,
- const ShardingWriteRouter& shardingWriteRouter,
- const bool fromMigrate,
- const bool inMultiDocumentTransaction) {}
+ virtual void shardObserveInsertsOp(OperationContext* opCtx,
+ NamespaceString nss,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
+ const std::vector<repl::OpTime>& opTimeList,
+ const ShardingWriteRouter& shardingWriteRouter,
+ bool fromMigrate,
+ bool inMultiDocumentTransaction){};
virtual void shardObserveUpdateOp(OperationContext* opCtx,
const NamespaceString nss,
boost::optional<BSONObj> preImageDoc,
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index 88259ca6d42..f1a3bf73bff 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -29,6 +29,7 @@
#include "mongo/db/s/op_observer_sharding_impl.h"
+#include "mongo/db/catalog/catalog_helper.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/database_sharding_state.h"
@@ -107,18 +108,21 @@ void OpObserverShardingImpl::shardObserveAboutToDelete(OperationContext* opCtx,
getIsMigrating(opCtx) = isMigrating(opCtx, nss, docToDelete);
}
-void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx,
- const NamespaceString nss,
- const BSONObj& insertedDoc,
- const repl::OpTime& opTime,
- const ShardingWriteRouter& shardingWriteRouter,
- const bool fromMigrate,
- const bool inMultiDocumentTransaction) {
+void OpObserverShardingImpl::shardObserveInsertsOp(
+ OperationContext* opCtx,
+ const NamespaceString nss,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
+ const std::vector<repl::OpTime>& opTimeList,
+ const ShardingWriteRouter& shardingWriteRouter,
+ const bool fromMigrate,
+ const bool inMultiDocumentTransaction) {
if (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
return;
auto* const css = shardingWriteRouter.getCss();
css->checkShardVersionOrThrow(opCtx);
+ catalog_helper::assertMatchingDbVersion(opCtx, nss.db());
auto* const csr = checked_cast<CollectionShardingRuntime*>(css);
auto metadata = csr->getCurrentMetadataIfKnown();
@@ -127,21 +131,26 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx,
return;
}
- if (inMultiDocumentTransaction) {
- const auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
+ int index = 0;
+ for (auto it = first; it != last; it++, index++) {
+ auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index];
- if (atClusterTime) {
- const auto shardKey =
- metadata->getShardKeyPattern().extractShardKeyFromDocThrows(insertedDoc);
- assertIntersectingChunkHasNotMoved(opCtx, *metadata, shardKey, *atClusterTime);
- }
+ if (inMultiDocumentTransaction) {
+ const auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
- return;
- }
+ if (atClusterTime) {
+ const auto shardKey =
+ metadata->getShardKeyPattern().extractShardKeyFromDocThrows(it->doc);
+ assertIntersectingChunkHasNotMoved(opCtx, *metadata, shardKey, *atClusterTime);
+ }
- auto cloner = MigrationSourceManager::getCurrentCloner(*csr);
- if (cloner) {
- cloner->onInsertOp(opCtx, insertedDoc, opTime);
+ return;
+ }
+
+ auto cloner = MigrationSourceManager::getCurrentCloner(*csr);
+ if (cloner) {
+ cloner->onInsertOp(opCtx, it->doc, opTime);
+ }
}
}
@@ -155,6 +164,7 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx,
const bool inMultiDocumentTransaction) {
auto* const css = shardingWriteRouter.getCss();
css->checkShardVersionOrThrow(opCtx);
+ catalog_helper::assertMatchingDbVersion(opCtx, nss.db());
auto* const csr = checked_cast<CollectionShardingRuntime*>(css);
auto metadata = csr->getCurrentMetadataIfKnown();
@@ -190,6 +200,7 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx,
const bool inMultiDocumentTransaction) {
auto* const css = shardingWriteRouter.getCss();
css->checkShardVersionOrThrow(opCtx);
+ catalog_helper::assertMatchingDbVersion(opCtx, nss.db());
auto* const csr = checked_cast<CollectionShardingRuntime*>(css);
auto metadata = csr->getCurrentMetadataIfKnown();
diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h
index 4e38292e80a..dca800a1291 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.h
+++ b/src/mongo/db/s/op_observer_sharding_impl.h
@@ -48,13 +48,14 @@ protected:
void shardObserveAboutToDelete(OperationContext* opCtx,
NamespaceString const& nss,
BSONObj const& docToDelete) override;
- void shardObserveInsertOp(OperationContext* opCtx,
- NamespaceString nss,
- const BSONObj& insertedDoc,
- const repl::OpTime& opTime,
- const ShardingWriteRouter& shardingWriteRouter,
- bool fromMigrate,
- bool inMultiDocumentTransaction) override;
+ void shardObserveInsertsOp(OperationContext* opCtx,
+ NamespaceString nss,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
+ const std::vector<repl::OpTime>& opTimeList,
+ const ShardingWriteRouter& shardingWriteRouter,
+ bool fromMigrate,
+ bool inMultiDocumentTransaction) override;
void shardObserveUpdateOp(OperationContext* opCtx,
NamespaceString nss,
boost::optional<BSONObj> preImageDoc,
diff --git a/src/mongo/db/s/op_observer_sharding_test.cpp b/src/mongo/db/s/op_observer_sharding_test.cpp
index 587cdb8bb42..8ba0e7a8f1e 100644
--- a/src/mongo/db/s/op_observer_sharding_test.cpp
+++ b/src/mongo/db/s/op_observer_sharding_test.cpp
@@ -28,18 +28,24 @@
*/
#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/op_observer/op_observer_registry.h"
#include "mongo/db/op_observer/op_observer_util.h"
+#include "mongo/db/op_observer/oplog_writer_impl.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/op_observer_sharding_impl.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/db/s/type_shard_identity.h"
+#include "mongo/db/session/session_catalog_mongod.h"
namespace mongo {
namespace {
const NamespaceString kTestNss("TestDB", "TestColl");
+const NamespaceString kUnshardedNss("TestDB", "UnshardedColl");
void setCollectionFilteringMetadata(OperationContext* opCtx, CollectionMetadata metadata) {
AutoGetCollection autoColl(opCtx, kTestNss, MODE_X);
@@ -55,8 +61,22 @@ protected:
OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
operationContext());
+
+ Lock::GlobalWrite globalLock(operationContext());
+ bool justCreated = false;
+ auto databaseHolder = DatabaseHolder::get(operationContext());
+ auto db = databaseHolder->openDb(operationContext(), kTestNss.dbName(), &justCreated);
+ databaseHolder->setDbInfo(
+ operationContext(),
+ kTestNss.dbName(),
+ DatabaseType{kTestNss.dbName().db(), ShardId("this"), dbVersion1});
+ ASSERT_TRUE(db);
+ ASSERT_TRUE(justCreated);
+
uassertStatusOK(createCollection(
operationContext(), kTestNss.dbName(), BSON("create" << kTestNss.coll())));
+ uassertStatusOK(createCollection(
+ operationContext(), kUnshardedNss.dbName(), BSON("create" << kUnshardedNss.coll())));
}
/**
@@ -91,6 +111,9 @@ protected:
Timestamp(100, 0)),
ShardId("this"));
}
+
+ const DatabaseVersion dbVersion0 = DatabaseVersion::makeFixed();
+ const DatabaseVersion dbVersion1 = dbVersion0.makeUpdated();
};
TEST_F(DocumentKeyStateTest, MakeDocumentKeyStateUnsharded) {
@@ -200,6 +223,61 @@ TEST_F(DocumentKeyStateTest, MakeDocumentKeyStateShardedWithIdHashInShardKey) {
ASSERT_FALSE(OpObserverShardingImpl::isMigrating(operationContext(), kTestNss, doc));
}
+TEST_F(DocumentKeyStateTest, CheckDBVersion) {
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(
+ std::make_unique<OpObserverShardingImpl>(std::make_unique<OplogWriterImpl>()));
+
+ OperationContext* opCtx = operationContext();
+ AutoGetCollection autoColl(opCtx, kUnshardedNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx);
+ const bool fromMigrate = false;
+ auto shardVersion = ShardVersion::UNSHARDED();
+
+ // Insert parameters
+ std::vector<InsertStatement> toInsert;
+ toInsert.emplace_back(kUninitializedStmtId, BSON("_id" << 1));
+
+ // Update parameters
+ const auto criteria = BSON("_id" << 1);
+ const auto preImageDoc = criteria;
+ CollectionUpdateArgs updateArgs{preImageDoc};
+ updateArgs.criteria = criteria;
+ updateArgs.stmtIds = {kUninitializedStmtId};
+ updateArgs.updatedDoc = BSON("_id" << 1 << "data"
+ << "y");
+ updateArgs.update = BSON("$set" << BSON("data"
+ << "y"));
+ OplogUpdateEntryArgs update(&updateArgs, *autoColl);
+
+ // OpObserver calls
+ auto onInsert = [&]() {
+ opObserver.onInserts(opCtx, *autoColl, toInsert.begin(), toInsert.end(), fromMigrate);
+ };
+ auto onUpdate = [&]() { opObserver.onUpdate(opCtx, update); };
+ auto onDelete = [&]() {
+ opObserver.aboutToDelete(opCtx, *autoColl, BSON("_id" << 0));
+ opObserver.onDelete(opCtx, *autoColl, kUninitializedStmtId, {});
+ };
+
+ // Using the latest dbVersion works
+ {
+ ScopedSetShardRole scopedSetShardRole{
+ operationContext(), kTestNss, shardVersion, dbVersion1};
+ onInsert();
+ onUpdate();
+ onDelete();
+ }
+
+ // Using the old dbVersion fails
+ {
+ ScopedSetShardRole scopedSetShardRole{
+ operationContext(), kTestNss, shardVersion, dbVersion0};
+ ASSERT_THROWS_CODE(onInsert(), AssertionException, ErrorCodes::StaleDbVersion);
+ ASSERT_THROWS_CODE(onUpdate(), AssertionException, ErrorCodes::StaleDbVersion);
+ ASSERT_THROWS_CODE(onDelete(), AssertionException, ErrorCodes::StaleDbVersion);
+ }
+}
} // namespace
} // namespace mongo