diff options
-rw-r--r-- | jstests/replsets/tenant_migration_retryable_write_retry.js | 53 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 47 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_graph_lookup_test.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_util.cpp | 6 |
8 files changed, 124 insertions, 69 deletions
diff --git a/jstests/replsets/tenant_migration_retryable_write_retry.js b/jstests/replsets/tenant_migration_retryable_write_retry.js index 2e1aae90924..1bb104c08be 100644 --- a/jstests/replsets/tenant_migration_retryable_write_retry.js +++ b/jstests/replsets/tenant_migration_retryable_write_retry.js @@ -224,16 +224,15 @@ assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({ // The aggregation pipeline will return an array of oplog entries (pre-image/post-image oplog // entries included) for retryable writes that committed before 'startFetchingTimestamp' sorted -// in ascending order of "ts". The pipeline doesn't currently support retryable write oplog chains -// that exceed 100 MB since pipeline stages have a memory limit of 100 MB. +// in ascending order of "ts". const aggRes = donorPrimary.getDB("config").runCommand({ aggregate: "transactions", pipeline: [ // Fetch the config.transactions entries. {$match: {"lastWriteOpTime.ts": {$lt: startFetchingTimestamp}}}, - // Fetch latest oplog entry for each config.transactions entry. + // Fetch latest oplog entry for each config.transactions entry from the oplog view. {$lookup: { - from: {db: "local", coll: "oplog.rs"}, + from: {db: "local", coll: "system.tenantMigration.oplogView"}, localField: "lastWriteOpTime.ts", foreignField: "ts", // This array is expected to contain exactly one element. @@ -242,39 +241,65 @@ const aggRes = donorPrimary.getDB("config").runCommand({ // Replace the single-element 'lastOps' array field with a single 'lastOp' field. {$addFields: {lastOp: {$first: "$lastOps"}}}, {$unset: "lastOps"}, - // Fetch preImage oplog entry for findAndModify. + // Fetch preImage oplog entry for findAndModify from the oplog view. {$lookup: { - from: {db: "local", coll: "oplog.rs"}, + from: {db: "local", coll: "system.tenantMigration.oplogView"}, localField: "lastOp.preImageOpTime.ts", foreignField: "ts", // This array is expected to contain exactly one element if the 'preImageOpTime' // field is not null. as: "preImageOps" }}, - // Fetch postImage oplog entry for findAndModify. + // Fetch postImage oplog entry for findAndModify from the oplog view. {$lookup: { - from: {db: "local", coll: "oplog.rs"}, + from: {db: "local", coll: "system.tenantMigration.oplogView"}, localField: "lastOp.postImageOpTime.ts", foreignField: "ts", // This array is expected to contain exactly one element if the 'postImageOpTime' // field is not null. as: "postImageOps" }}, - // Fetch oplog entries in each chain for insert, update, or delete. + // Fetch oplog entries in each chain for insert, update, or delete from the oplog view. {$graphLookup: { - from: {db: "local", coll: "oplog.rs"}, + from: {db: "local", coll: "system.tenantMigration.oplogView"}, startWith: "$lastOp.ts", connectFromField: "prevOpTime.ts", connectToField: "ts", as: "history", + depthField: "depthForTenantMigration", + }}, + // Sort the oplog entries in each oplog chain. + {$set: { + history: {$reverseArray: {$reduce: { + input: "$history", + initialValue: {$range: [0, {$size: "$history"}]}, + in: {$concatArrays: [ + {$slice: ["$$value", "$$this.depthForTenantMigration"]}, + ["$$this"], + {$slice: [ + "$$value", + {$subtract: [ + {$add: ["$$this.depthForTenantMigration", 1]}, + {$size: "$history"}, + ]}, + ]}, + ]}, + }}}, }}, // Combine the oplog entries. {$set: {history: {$concatArrays: ["$preImageOps", "$history", "$postImageOps"]}}}, + // Fetch the complete oplog entries and unwind oplog entries in each chain to the top-level + // array. + {$lookup: { + from: {db: "local", coll: "oplog.rs"}, + localField: "history.ts", + foreignField: "ts", + // This array is expected to contain exactly one element. + as: "completeOplogEntry" + }}, // Unwind oplog entries in each chain to the top-level array. - {$unwind: "$history"}, - {$replaceRoot: {newRoot: "$history"}}, - // Sort the oplog entries. - {$sort: {ts: 1}}, + {$unwind: "$completeOplogEntry"}, + {$replaceRoot: {newRoot: "$completeOplogEntry"}}, ], readConcern: {level: "majority", afterClusterTime: startFetchingTimestamp}, cursor: {}, diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index dd796ef3a2c..e33a2c7db68 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -292,8 +292,9 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames } // If 'ns' refers to a view namespace, then we resolve its definition. - auto resolveViewDefinition = [&](const NamespaceString& ns) -> Status { - auto resolvedView = viewCatalog->resolveView(opCtx, ns); + auto resolveViewDefinition = [&](const NamespaceString& ns, + std::shared_ptr<const ViewCatalog> vcp) -> Status { + auto resolvedView = vcp->resolveView(opCtx, ns); if (!resolvedView.isOK()) { return resolvedView.getStatus().withContext( str::stream() << "Failed to resolve view '" << involvedNs.ns()); @@ -316,22 +317,40 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames }; // If the involved namespace is not in the same database as the aggregation, it must be - // from an $out or a $merge to a collection in a different database. + // from a $lookup/$graphLookup into a tenant migration donor's oplog view or from an + // $out/$merge to a collection in a different database. if (involvedNs.db() != request.getNamespace().db()) { - // SERVER-51886: It is not correct to assume that we are reading from a collection - // because the collection targeted by $out/$merge on a given database can have the same - // name as a view on the source database. As such, we determine whether the collection - // name references a view on the aggregation request's database. Note that the inverse - // scenario (mistaking a view for a collection) is not an issue because $merge/$out - // cannot target a view. - auto nssToCheck = NamespaceString(request.getNamespace().db(), involvedNs.coll()); - if (viewCatalog && viewCatalog->lookup(opCtx, nssToCheck.ns())) { - auto status = resolveViewDefinition(nssToCheck); + if (involvedNs == NamespaceString::kTenantMigrationOplogView) { + // For tenant migrations, we perform an aggregation on 'config.transactions' but + // require a lookup stage involving a view on the 'local' database. + // If the involved namespace is 'local.system.tenantMigration.oplogView', resolve + // its view definition. + auto involvedDbViewCatalog = + DatabaseHolder::get(opCtx)->getViewCatalog(opCtx, involvedNs.db()); + + // It is safe to assume that the ViewCatalog for the `local` database always + // exists because replica sets forbid dropping the oplog and the `local` database. + invariant(involvedDbViewCatalog); + auto status = resolveViewDefinition(involvedNs, involvedDbViewCatalog); if (!status.isOK()) { return status; } } else { - resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}}; + // SERVER-51886: It is not correct to assume that we are reading from a collection + // because the collection targeted by $out/$merge on a given database can have the + // same name as a view on the source database. As such, we determine whether the + // collection name references a view on the aggregation request's database. Note + // that the inverse scenario (mistaking a view for a collection) is not an issue + // because $merge/$out cannot target a view. + auto nssToCheck = NamespaceString(request.getNamespace().db(), involvedNs.coll()); + if (viewCatalog && viewCatalog->lookup(opCtx, nssToCheck.ns())) { + auto status = resolveViewDefinition(nssToCheck, viewCatalog); + if (!status.isOK()) { + return status; + } + } else { + resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}}; + } } } else if (!viewCatalog || CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, involvedNs)) { @@ -342,7 +361,7 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames // snapshot of the view catalog. resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}}; } else if (viewCatalog->lookup(opCtx, involvedNs.ns())) { - auto status = resolveViewDefinition(involvedNs); + auto status = resolveViewDefinition(involvedNs, viewCatalog); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 11445d700a5..4d07147c24b 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -75,6 +75,9 @@ const NamespaceString NamespaceString::kTenantMigrationDonorsNamespace(Namespace const NamespaceString NamespaceString::kTenantMigrationRecipientsNamespace( NamespaceString::kConfigDb, "tenantMigrationRecipients"); +const NamespaceString NamespaceString::kTenantMigrationOplogView( + NamespaceString::kLocalDb, "system.tenantMigration.oplogView"); + const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb, "cache.collections"); const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceString::kConfigDb, diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index a60adb43d51..8a5959ba986 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -122,6 +122,9 @@ public: // Namespace for storing the persisted state of tenant migration recipient service instances. static const NamespaceString kTenantMigrationRecipientsNamespace; + // Namespace for view on local.oplog.rs for tenant migrations. + static const NamespaceString kTenantMigrationOplogView; + // Namespace for replica set configuration settings. static const NamespaceString kSystemReplSetNamespace; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 395fd372b9b..b9b10cf15a4 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -53,12 +53,14 @@ bool foreignShardedLookupAllowed() { return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); } -// Parses $graphLookup 'from' field. The 'from' field must be a string or -// {from: {db: "local", coll: "oplog.rs"}, ...}. +// Parses $graphLookup 'from' field. The 'from' field must be a string with the exception of +// 'local.system.tenantMigration.oplogView'. +// +// {from: {db: "local", coll: "system.tenantMigration.oplogView"}, ...}. NamespaceString parseGraphLookupFromAndResolveNamespace(const BSONElement& elem, StringData defaultDb) { - // The object syntax only works for local.oplog.rs which is not a user namespace so object type - // is omitted from the error message below. + // The object syntax only works for 'local.system.tenantMigration.oplogView' which is not a user + // namespace so object type is omitted from the error message below. uassert(ErrorCodes::FailedToParse, str::stream() << "$graphLookup 'from' field must be a string, but found " << typeName(elem.type()), @@ -79,7 +81,7 @@ NamespaceString parseGraphLookupFromAndResolveNamespace(const BSONElement& elem, str::stream() << "$graphLookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: " << nss.db() << " and coll: " << nss.coll(), - nss == NamespaceString::kRsOplogNamespace); + nss == NamespaceString::kTenantMigrationOplogView); return nss; } @@ -245,13 +247,11 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { _variables.copyToExpCtx(_variablesParseState, _fromExpCtx.get()); auto pipeline = Pipeline::makePipeline(_fromPipeline, _fromExpCtx, pipelineOpts); while (auto next = pipeline->getNext()) { - // Make an exception for the oplog, since its docs are de-duplicated by the 'ts' - // field instead. uassert(40271, str::stream() << "Documents in the '" << _from.ns() << "' namespace must contain an _id for de-duplication in $graphLookup", - (_from == NamespaceString::kRsOplogNamespace) || !(*next)["_id"].missing()); + !(*next)["_id"].missing()); shouldPerformAnotherQuery = addToVisitedAndFrontier(*next, depth) || shouldPerformAnotherQuery; @@ -269,9 +269,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { } bool DocumentSourceGraphLookUp::addToVisitedAndFrontier(Document result, long long depth) { - // The oplog does not have _id so visited oplog docs are cached by 'ts' instead. - auto id = _from == NamespaceString::kRsOplogNamespace ? result.getField("ts") - : result.getField("_id"); + auto id = result.getField("_id"); + if (_visited.find(id) != _visited.end()) { // We've already seen this object, don't repeat any work. return false; @@ -603,7 +602,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( if (argName == "from" || argName == "as" || argName == "connectFromField" || argName == "depthField" || argName == "connectToField") { // All remaining arguments to $graphLookup are expected to be strings or - // {db: "local", coll: "oplog.rs"}. local.oplog.rs is not a user namespace so object + // {db: "local", coll: "system.tenantMigration.oplogView"}. + // 'local.system.tenantMigration.oplogView' is not a user namespace so object // type is omitted from the error message below. uassert(40103, str::stream() << "expected string as argument for " << argName diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 4b438d38386..d22ee4b60bc 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -77,25 +77,26 @@ private: }; // Tests that $graphLookup with special 'from' syntax from: {db: local, coll: -// oplog.rs} can be round tripped. +// system.tenantMigration.oplogView} can be round tripped. TEST_F(DocumentSourceGraphLookUpTest, LookupReParseSerializedStageWithFromDBAndColl) { auto expCtx = getExpCtx(); - NamespaceString fromNs("local", "oplog.rs"); + NamespaceString fromNs("local", "system.tenantMigration.oplogView"); expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); - auto originalBSON = BSON("$graphLookup" << BSON("from" << BSON("db" - << "local" - << "coll" - << "oplog.rs") - << "startWith" - << "$x" - << "connectFromField" - << "id" - << "connectToField" - << "id" - << "as" - << "connections")); + auto originalBSON = + BSON("$graphLookup" << BSON("from" << BSON("db" + << "local" + << "coll" + << "system.tenantMigration.oplogView") + << "startWith" + << "$x" + << "connectFromField" + << "id" + << "connectToField" + << "id" + << "as" + << "connections")); auto graphLookupStage = DocumentSourceGraphLookUp::createFromBson(originalBSON.firstElement(), expCtx); @@ -119,8 +120,8 @@ TEST_F(DocumentSourceGraphLookUpTest, LookupReParseSerializedStageWithFromDBAndC } // $graphLookup : {from : {db: <>, coll: <>}} syntax doesn't work for a namespace that isn't -// local.oplog.rs. -TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotLocalDBOrRsOplogColl) { +// local.system.tenantMigration.oplogView. +TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotLocalDBOrRsOplogView) { auto expCtx = getExpCtx(); NamespaceString fromNs("test", "coll"); expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ @@ -137,8 +138,8 @@ TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotLocalDBOrRs } // $graphLookup : {from : {db: <>, coll: <>}} syntax fails when "db" is local but "coll" is -// not "oplog.rs". -TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotRsOplogColl) { +// not "system.tenantMigration.oplogView". +TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotRsOplogView) { auto expCtx = getExpCtx(); NamespaceString fromNs("local", "coll"); expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ @@ -155,17 +156,17 @@ TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotRsOplogColl } // $lookup : {from : {db: <>, coll: <>}} syntax doesn't work for a namespace when "coll" is -// "oplog.rs" but "db" is not "local". +// "system.tenantMigration.oplogView" but "db" is not "local". TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotLocalDB) { auto expCtx = getExpCtx(); - NamespaceString fromNs("test", "oplog.rs"); + NamespaceString fromNs("test", "system.tenantMigration.oplogView"); expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); ASSERT_THROWS_CODE( DocumentSourceGraphLookUp::createFromBson( fromjson("{$graphLookup: {from: {db: 'test', coll: " - "'oplog.rs'}, startWith: '$x', " + "'system.tenantMigration.oplogView'}, startWith: '$x', " "connectFromField: 'id', connectionToField: 'id', as: 'connections'}}") .firstElement(), expCtx), diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 4912cae8c60..1961941287c 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -89,12 +89,15 @@ bool foreignShardedLookupAllowed() { return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load(); } -// Parses $lookup 'from' field. The 'from' field must be a string or +// Parses $lookup 'from' field. The 'from' field must be a string or one of the following +// exceptions: // {from: {db: "config", coll: "cache.chunks.*"}, ...} or -// {from: {db: "local", coll: "oplog.rs"}, ...} . +// {from: {db: "local", coll: "oplog.rs"}, ...} or +// {from: {db: "local", coll: "tenantMigration.oplogView"}, ...} . NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, StringData defaultDb) { - // The object syntax only works for cache.chunks.* and local.oplog.rs which are not user - // namespaces so object type is omitted from the error message below. + // The object syntax only works for 'cache.chunks.*', 'local.oplog.rs', and + // 'local.tenantMigration.oplogViewwhich' which are not user namespaces so object type is + // omitted from the error message below. uassert(ErrorCodes::FailedToParse, str::stream() << "$lookup 'from' field must be a string, but found " << typeName(elem.type()), @@ -111,7 +114,8 @@ NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, Stri ErrorCodes::FailedToParse, str::stream() << "$lookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: " << nss.db() << " and coll: " << nss.coll(), - nss.isConfigDotCacheDotChunks() || nss == NamespaceString::kRsOplogNamespace); + nss.isConfigDotCacheDotChunks() || nss == NamespaceString::kRsOplogNamespace || + nss == NamespaceString::kTenantMigrationOplogView); return nss; } diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index 3c6e4a5878d..a66f88b318f 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -94,7 +94,7 @@ void storeExternalClusterTimeKeyDocsAndRefreshCache( void createRetryableWritesView(OperationContext* opCtx, Database* db) { writeConflictRetry( - opCtx, "createDonorOplogView", "local.system.tenantMigration.oplogView", [&] { + opCtx, "createDonorOplogView", NamespaceString::kTenantMigrationOplogView.ns(), [&] { { // Create 'system.views' in a separate WUOW if it does not exist. WriteUnitOfWork wuow(opCtx); @@ -121,8 +121,8 @@ void createRetryableWritesView(OperationContext* opCtx, Database* db) { << "preImageOpTime" << 1 << "postImageOpTime" << 1))); WriteUnitOfWork wuow(opCtx); - uassertStatusOK(db->createView( - opCtx, NamespaceString("local.system.tenantMigration.oplogView"), options)); + uassertStatusOK( + db->createView(opCtx, NamespaceString::kTenantMigrationOplogView, options)); wuow.commit(); }); } |