summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/replsets/tenant_migration_retryable_write_retry.js53
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp47
-rw-r--r--src/mongo/db/namespace_string.cpp3
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp24
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp43
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp14
-rw-r--r--src/mongo/db/repl/tenant_migration_util.cpp6
8 files changed, 124 insertions, 69 deletions
diff --git a/jstests/replsets/tenant_migration_retryable_write_retry.js b/jstests/replsets/tenant_migration_retryable_write_retry.js
index 2e1aae90924..1bb104c08be 100644
--- a/jstests/replsets/tenant_migration_retryable_write_retry.js
+++ b/jstests/replsets/tenant_migration_retryable_write_retry.js
@@ -224,16 +224,15 @@ assert.commandWorked(donorPrimary.getDB(kDbName).runCommand({
// The aggregation pipeline will return an array of oplog entries (pre-image/post-image oplog
// entries included) for retryable writes that committed before 'startFetchingTimestamp' sorted
-// in ascending order of "ts". The pipeline doesn't currently support retryable write oplog chains
-// that exceed 100 MB since pipeline stages have a memory limit of 100 MB.
+// in ascending order of "ts".
const aggRes = donorPrimary.getDB("config").runCommand({
aggregate: "transactions",
pipeline: [
// Fetch the config.transactions entries.
{$match: {"lastWriteOpTime.ts": {$lt: startFetchingTimestamp}}},
- // Fetch latest oplog entry for each config.transactions entry.
+ // Fetch latest oplog entry for each config.transactions entry from the oplog view.
{$lookup: {
- from: {db: "local", coll: "oplog.rs"},
+ from: {db: "local", coll: "system.tenantMigration.oplogView"},
localField: "lastWriteOpTime.ts",
foreignField: "ts",
// This array is expected to contain exactly one element.
@@ -242,39 +241,65 @@ const aggRes = donorPrimary.getDB("config").runCommand({
// Replace the single-element 'lastOps' array field with a single 'lastOp' field.
{$addFields: {lastOp: {$first: "$lastOps"}}},
{$unset: "lastOps"},
- // Fetch preImage oplog entry for findAndModify.
+ // Fetch preImage oplog entry for findAndModify from the oplog view.
{$lookup: {
- from: {db: "local", coll: "oplog.rs"},
+ from: {db: "local", coll: "system.tenantMigration.oplogView"},
localField: "lastOp.preImageOpTime.ts",
foreignField: "ts",
// This array is expected to contain exactly one element if the 'preImageOpTime'
// field is not null.
as: "preImageOps"
}},
- // Fetch postImage oplog entry for findAndModify.
+ // Fetch postImage oplog entry for findAndModify from the oplog view.
{$lookup: {
- from: {db: "local", coll: "oplog.rs"},
+ from: {db: "local", coll: "system.tenantMigration.oplogView"},
localField: "lastOp.postImageOpTime.ts",
foreignField: "ts",
// This array is expected to contain exactly one element if the 'postImageOpTime'
// field is not null.
as: "postImageOps"
}},
- // Fetch oplog entries in each chain for insert, update, or delete.
+ // Fetch oplog entries in each chain for insert, update, or delete from the oplog view.
{$graphLookup: {
- from: {db: "local", coll: "oplog.rs"},
+ from: {db: "local", coll: "system.tenantMigration.oplogView"},
startWith: "$lastOp.ts",
connectFromField: "prevOpTime.ts",
connectToField: "ts",
as: "history",
+ depthField: "depthForTenantMigration",
+ }},
+ // Sort the oplog entries in each oplog chain.
+ {$set: {
+ history: {$reverseArray: {$reduce: {
+ input: "$history",
+ initialValue: {$range: [0, {$size: "$history"}]},
+ in: {$concatArrays: [
+ {$slice: ["$$value", "$$this.depthForTenantMigration"]},
+ ["$$this"],
+ {$slice: [
+ "$$value",
+ {$subtract: [
+ {$add: ["$$this.depthForTenantMigration", 1]},
+ {$size: "$history"},
+ ]},
+ ]},
+ ]},
+ }}},
}},
// Combine the oplog entries.
{$set: {history: {$concatArrays: ["$preImageOps", "$history", "$postImageOps"]}}},
+ // Fetch the complete oplog entries for each chain from 'local.oplog.rs'. The stages below
+ // unwind them to the top-level array.
+ {$lookup: {
+ from: {db: "local", coll: "oplog.rs"},
+ localField: "history.ts",
+ foreignField: "ts",
+ // This array is expected to contain exactly one element per entry in 'history'.
+ as: "completeOplogEntry"
+ }},
// Unwind oplog entries in each chain to the top-level array.
- {$unwind: "$history"},
- {$replaceRoot: {newRoot: "$history"}},
- // Sort the oplog entries.
- {$sort: {ts: 1}},
+ {$unwind: "$completeOplogEntry"},
+ {$replaceRoot: {newRoot: "$completeOplogEntry"}},
],
readConcern: {level: "majority", afterClusterTime: startFetchingTimestamp},
cursor: {},
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index dd796ef3a2c..e33a2c7db68 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -292,8 +292,9 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
}
// If 'ns' refers to a view namespace, then we resolve its definition.
- auto resolveViewDefinition = [&](const NamespaceString& ns) -> Status {
- auto resolvedView = viewCatalog->resolveView(opCtx, ns);
+ auto resolveViewDefinition = [&](const NamespaceString& ns,
+ std::shared_ptr<const ViewCatalog> vcp) -> Status {
+ auto resolvedView = vcp->resolveView(opCtx, ns);
if (!resolvedView.isOK()) {
return resolvedView.getStatus().withContext(
str::stream() << "Failed to resolve view '" << involvedNs.ns());
@@ -316,22 +317,40 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
};
// If the involved namespace is not in the same database as the aggregation, it must be
- // from an $out or a $merge to a collection in a different database.
+ // from a $lookup/$graphLookup into a tenant migration donor's oplog view or from an
+ // $out/$merge to a collection in a different database.
if (involvedNs.db() != request.getNamespace().db()) {
- // SERVER-51886: It is not correct to assume that we are reading from a collection
- // because the collection targeted by $out/$merge on a given database can have the same
- // name as a view on the source database. As such, we determine whether the collection
- // name references a view on the aggregation request's database. Note that the inverse
- // scenario (mistaking a view for a collection) is not an issue because $merge/$out
- // cannot target a view.
- auto nssToCheck = NamespaceString(request.getNamespace().db(), involvedNs.coll());
- if (viewCatalog && viewCatalog->lookup(opCtx, nssToCheck.ns())) {
- auto status = resolveViewDefinition(nssToCheck);
+ if (involvedNs == NamespaceString::kTenantMigrationOplogView) {
+ // For tenant migrations, we perform an aggregation on 'config.transactions' but
+ // require a lookup stage involving a view on the 'local' database.
+ // If the involved namespace is 'local.system.tenantMigration.oplogView', resolve
+ // its view definition.
+ auto involvedDbViewCatalog =
+ DatabaseHolder::get(opCtx)->getViewCatalog(opCtx, involvedNs.db());
+
+ // It is safe to assume that the ViewCatalog for the `local` database always
+ // exists because replica sets forbid dropping the oplog and the `local` database.
+ invariant(involvedDbViewCatalog);
+ auto status = resolveViewDefinition(involvedNs, involvedDbViewCatalog);
if (!status.isOK()) {
return status;
}
} else {
- resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}};
+ // SERVER-51886: It is not correct to assume that we are reading from a collection
+ // because the collection targeted by $out/$merge on a given database can have the
+ // same name as a view on the source database. As such, we determine whether the
+ // collection name references a view on the aggregation request's database. Note
+ // that the inverse scenario (mistaking a view for a collection) is not an issue
+ // because $merge/$out cannot target a view.
+ auto nssToCheck = NamespaceString(request.getNamespace().db(), involvedNs.coll());
+ if (viewCatalog && viewCatalog->lookup(opCtx, nssToCheck.ns())) {
+ auto status = resolveViewDefinition(nssToCheck, viewCatalog);
+ if (!status.isOK()) {
+ return status;
+ }
+ } else {
+ resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}};
+ }
}
} else if (!viewCatalog ||
CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, involvedNs)) {
@@ -342,7 +361,7 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
// snapshot of the view catalog.
resolvedNamespaces[involvedNs.coll()] = {involvedNs, std::vector<BSONObj>{}};
} else if (viewCatalog->lookup(opCtx, involvedNs.ns())) {
- auto status = resolveViewDefinition(involvedNs);
+ auto status = resolveViewDefinition(involvedNs, viewCatalog);
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 11445d700a5..4d07147c24b 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -75,6 +75,9 @@ const NamespaceString NamespaceString::kTenantMigrationDonorsNamespace(Namespace
const NamespaceString NamespaceString::kTenantMigrationRecipientsNamespace(
NamespaceString::kConfigDb, "tenantMigrationRecipients");
+const NamespaceString NamespaceString::kTenantMigrationOplogView(
+ NamespaceString::kLocalDb, "system.tenantMigration.oplogView");
+
const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb,
"cache.collections");
const NamespaceString NamespaceString::kShardConfigDatabasesNamespace(NamespaceString::kConfigDb,
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index a60adb43d51..8a5959ba986 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -122,6 +122,9 @@ public:
// Namespace for storing the persisted state of tenant migration recipient service instances.
static const NamespaceString kTenantMigrationRecipientsNamespace;
+ // Namespace for view on local.oplog.rs for tenant migrations.
+ static const NamespaceString kTenantMigrationOplogView;
+
// Namespace for replica set configuration settings.
static const NamespaceString kSystemReplSetNamespace;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 395fd372b9b..b9b10cf15a4 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -53,12 +53,14 @@ bool foreignShardedLookupAllowed() {
return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
}
-// Parses $graphLookup 'from' field. The 'from' field must be a string or
-// {from: {db: "local", coll: "oplog.rs"}, ...}.
+// Parses $graphLookup 'from' field. The 'from' field must be a string with the exception of
+// 'local.system.tenantMigration.oplogView'.
+//
+// {from: {db: "local", coll: "system.tenantMigration.oplogView"}, ...}.
NamespaceString parseGraphLookupFromAndResolveNamespace(const BSONElement& elem,
StringData defaultDb) {
- // The object syntax only works for local.oplog.rs which is not a user namespace so object type
- // is omitted from the error message below.
+ // The object syntax only works for 'local.system.tenantMigration.oplogView' which is not a user
+ // namespace so object type is omitted from the error message below.
uassert(ErrorCodes::FailedToParse,
str::stream() << "$graphLookup 'from' field must be a string, but found "
<< typeName(elem.type()),
@@ -79,7 +81,7 @@ NamespaceString parseGraphLookupFromAndResolveNamespace(const BSONElement& elem,
str::stream()
<< "$graphLookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: "
<< nss.db() << " and coll: " << nss.coll(),
- nss == NamespaceString::kRsOplogNamespace);
+ nss == NamespaceString::kTenantMigrationOplogView);
return nss;
}
@@ -245,13 +247,11 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
_variables.copyToExpCtx(_variablesParseState, _fromExpCtx.get());
auto pipeline = Pipeline::makePipeline(_fromPipeline, _fromExpCtx, pipelineOpts);
while (auto next = pipeline->getNext()) {
- // Make an exception for the oplog, since its docs are de-duplicated by the 'ts'
- // field instead.
uassert(40271,
str::stream()
<< "Documents in the '" << _from.ns()
<< "' namespace must contain an _id for de-duplication in $graphLookup",
- (_from == NamespaceString::kRsOplogNamespace) || !(*next)["_id"].missing());
+ !(*next)["_id"].missing());
shouldPerformAnotherQuery =
addToVisitedAndFrontier(*next, depth) || shouldPerformAnotherQuery;
@@ -269,9 +269,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
}
bool DocumentSourceGraphLookUp::addToVisitedAndFrontier(Document result, long long depth) {
- // The oplog does not have _id so visited oplog docs are cached by 'ts' instead.
- auto id = _from == NamespaceString::kRsOplogNamespace ? result.getField("ts")
- : result.getField("_id");
+ auto id = result.getField("_id");
+
if (_visited.find(id) != _visited.end()) {
// We've already seen this object, don't repeat any work.
return false;
@@ -603,7 +602,8 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson(
if (argName == "from" || argName == "as" || argName == "connectFromField" ||
argName == "depthField" || argName == "connectToField") {
// All remaining arguments to $graphLookup are expected to be strings or
- // {db: "local", coll: "oplog.rs"}. local.oplog.rs is not a user namespace so object
+ // {db: "local", coll: "system.tenantMigration.oplogView"}.
+ // 'local.system.tenantMigration.oplogView' is not a user namespace so object
// type is omitted from the error message below.
uassert(40103,
str::stream() << "expected string as argument for " << argName
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 4b438d38386..d22ee4b60bc 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -77,25 +77,26 @@ private:
};
// Tests that $graphLookup with special 'from' syntax from: {db: local, coll:
-// oplog.rs} can be round tripped.
+// system.tenantMigration.oplogView} can be round tripped.
TEST_F(DocumentSourceGraphLookUpTest, LookupReParseSerializedStageWithFromDBAndColl) {
auto expCtx = getExpCtx();
- NamespaceString fromNs("local", "oplog.rs");
+ NamespaceString fromNs("local", "system.tenantMigration.oplogView");
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
- auto originalBSON = BSON("$graphLookup" << BSON("from" << BSON("db"
- << "local"
- << "coll"
- << "oplog.rs")
- << "startWith"
- << "$x"
- << "connectFromField"
- << "id"
- << "connectToField"
- << "id"
- << "as"
- << "connections"));
+ auto originalBSON =
+ BSON("$graphLookup" << BSON("from" << BSON("db"
+ << "local"
+ << "coll"
+ << "system.tenantMigration.oplogView")
+ << "startWith"
+ << "$x"
+ << "connectFromField"
+ << "id"
+ << "connectToField"
+ << "id"
+ << "as"
+ << "connections"));
auto graphLookupStage =
DocumentSourceGraphLookUp::createFromBson(originalBSON.firstElement(), expCtx);
@@ -119,8 +120,8 @@ TEST_F(DocumentSourceGraphLookUpTest, LookupReParseSerializedStageWithFromDBAndC
}
// $graphLookup : {from : {db: <>, coll: <>}} syntax doesn't work for a namespace that isn't
-// local.oplog.rs.
-TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotLocalDBOrRsOplogColl) {
+// local.system.tenantMigration.oplogView.
+TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotLocalDBOrRsOplogView) {
auto expCtx = getExpCtx();
NamespaceString fromNs("test", "coll");
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
@@ -137,8 +138,8 @@ TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotLocalDBOrRs
}
// $graphLookup : {from : {db: <>, coll: <>}} syntax fails when "db" is local but "coll" is
-// not "oplog.rs".
-TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotRsOplogColl) {
+// not "system.tenantMigration.oplogView".
+TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotRsOplogView) {
auto expCtx = getExpCtx();
NamespaceString fromNs("local", "coll");
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
@@ -155,17 +156,17 @@ TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotRsOplogColl
}
// $lookup : {from : {db: <>, coll: <>}} syntax doesn't work for a namespace when "coll" is
-// "oplog.rs" but "db" is not "local".
+// "system.tenantMigration.oplogView" but "db" is not "local".
TEST_F(DocumentSourceGraphLookUpTest, RejectsPipelineFromDBAndCollNotLocalDB) {
auto expCtx = getExpCtx();
- NamespaceString fromNs("test", "oplog.rs");
+ NamespaceString fromNs("test", "system.tenantMigration.oplogView");
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
ASSERT_THROWS_CODE(
DocumentSourceGraphLookUp::createFromBson(
fromjson("{$graphLookup: {from: {db: 'test', coll: "
- "'oplog.rs'}, startWith: '$x', "
+ "'system.tenantMigration.oplogView'}, startWith: '$x', "
"connectFromField: 'id', connectionToField: 'id', as: 'connections'}}")
.firstElement(),
expCtx),
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 4912cae8c60..1961941287c 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -89,12 +89,15 @@ bool foreignShardedLookupAllowed() {
return getTestCommandsEnabled() && internalQueryAllowShardedLookup.load();
}
-// Parses $lookup 'from' field. The 'from' field must be a string or
+// Parses $lookup 'from' field. The 'from' field must be a string or one of the following
+// exceptions:
// {from: {db: "config", coll: "cache.chunks.*"}, ...} or
-// {from: {db: "local", coll: "oplog.rs"}, ...} .
+// {from: {db: "local", coll: "oplog.rs"}, ...} or
+// {from: {db: "local", coll: "system.tenantMigration.oplogView"}, ...} .
NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, StringData defaultDb) {
- // The object syntax only works for cache.chunks.* and local.oplog.rs which are not user
- // namespaces so object type is omitted from the error message below.
+ // The object syntax only works for 'cache.chunks.*', 'local.oplog.rs', and
+ // 'local.system.tenantMigration.oplogView' which are not user namespaces so object type is
+ // omitted from the error message below.
uassert(ErrorCodes::FailedToParse,
str::stream() << "$lookup 'from' field must be a string, but found "
<< typeName(elem.type()),
@@ -111,7 +114,8 @@ NamespaceString parseLookupFromAndResolveNamespace(const BSONElement& elem, Stri
ErrorCodes::FailedToParse,
str::stream() << "$lookup with syntax {from: {db:<>, coll:<>},..} is not supported for db: "
<< nss.db() << " and coll: " << nss.coll(),
- nss.isConfigDotCacheDotChunks() || nss == NamespaceString::kRsOplogNamespace);
+ nss.isConfigDotCacheDotChunks() || nss == NamespaceString::kRsOplogNamespace ||
+ nss == NamespaceString::kTenantMigrationOplogView);
return nss;
}
diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp
index 3c6e4a5878d..a66f88b318f 100644
--- a/src/mongo/db/repl/tenant_migration_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_util.cpp
@@ -94,7 +94,7 @@ void storeExternalClusterTimeKeyDocsAndRefreshCache(
void createRetryableWritesView(OperationContext* opCtx, Database* db) {
writeConflictRetry(
- opCtx, "createDonorOplogView", "local.system.tenantMigration.oplogView", [&] {
+ opCtx, "createDonorOplogView", NamespaceString::kTenantMigrationOplogView.ns(), [&] {
{
// Create 'system.views' in a separate WUOW if it does not exist.
WriteUnitOfWork wuow(opCtx);
@@ -121,8 +121,8 @@ void createRetryableWritesView(OperationContext* opCtx, Database* db) {
<< "preImageOpTime" << 1 << "postImageOpTime" << 1)));
WriteUnitOfWork wuow(opCtx);
- uassertStatusOK(db->createView(
- opCtx, NamespaceString("local.system.tenantMigration.oplogView"), options));
+ uassertStatusOK(
+ db->createView(opCtx, NamespaceString::kTenantMigrationOplogView, options));
wuow.commit();
});
}