author     Lingzhi Deng <lingzhi.deng@mongodb.com>    2020-12-18 12:41:00 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-12-18 20:10:54 +0000
commit     5490cd2c22b132370f5f91443f11e00980ad0b55
tree       83e5640a0e41b0d4a7605b35409089f36dd2f3a7 /src/mongo/db/repl/tenant_collection_cloner.cpp
parent     36d6ac0eb408bf71d97db41c81e6575336088654
download   mongo-5490cd2c22b132370f5f91443f11e00980ad0b55.tar.gz
SERVER-52717: Handle resuming after failover in TenantCollectionCloner
Diffstat (limited to 'src/mongo/db/repl/tenant_collection_cloner.cpp')
-rw-r--r--  src/mongo/db/repl/tenant_collection_cloner.cpp  65
1 file changed, 57 insertions(+), 8 deletions(-)
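
In brief, the change below lets TenantCollectionCloner::createCollectionStage() treat an existing collection as a resume point after a recipient failover (it still uasserts if the cloner is not resuming): it looks up the last cloned _id with a local DBDirectClient; if the collection already has documents it skips index creation, credits the existing document count to the cloner stats, and later restarts the donor query from that _id; if the collection is still empty it builds only the donor index specs that do not already exist locally. A minimal sketch of that index reconciliation, shown in the hunks below, using a hypothetical free function (removeAlreadyBuiltIndexes) rather than the cloner's actual members:

// Hypothetical sketch, not part of the patch: keep only the donor index specs
// whose names have not already been built locally on the recipient.
#include <algorithm>
#include <list>
#include <vector>

#include "mongo/bson/bsonobj.h"
#include "mongo/util/string_map.h"

namespace mongo {

std::vector<BSONObj> removeAlreadyBuiltIndexes(const std::list<BSONObj>& existingSpecs,
                                               std::vector<BSONObj> donorReadySpecs) {
    StringMap<bool> existingNames;
    for (const auto& spec : existingSpecs) {
        existingNames[spec.getStringField("name")] = true;
    }
    // Indexes that exist locally but not on the donor are intentionally left in
    // place; the donor's dropIndexes oplog entries remove them during oplog
    // application.
    donorReadySpecs.erase(std::remove_if(donorReadySpecs.begin(),
                                         donorReadySpecs.end(),
                                         [&](const BSONObj& spec) {
                                             return existingNames[spec.getStringField("name")];
                                         }),
                          donorReadySpecs.end());
    return donorReadySpecs;
}

}  // namespace mongo
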
diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp
index 92e7ef7cca3..611bbebe1b5 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands/list_collections_filter.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/repl/cloner_utils.h"
#include "mongo/db/repl/database_cloner_gen.h"
@@ -106,6 +107,7 @@ TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss,
_dbWorkTaskRunner(dbPool),
_tenantId(tenantId) {
invariant(sourceNss.isValid());
+ invariant(ClonerUtils::isNamespaceForTenant(sourceNss, tenantId));
invariant(collectionOptions.uuid);
_sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid);
_stats.ns = _sourceNss.ns();
@@ -251,21 +253,65 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::listIndexesStage() {
BaseCloner::AfterStageBehavior TenantCollectionCloner::createCollectionStage() {
auto opCtx = cc().makeOperationContext();
+ bool skipCreateIndexes = false;
+
+ // TODO SERVER-53425: Handle cases when the collection has been renamed on the donor.
auto status =
getStorageInterface()->createCollection(opCtx.get(), _sourceNss, _collectionOptions);
if (status == ErrorCodes::NamespaceExists) {
- uassert(4884501,
- "Collection exists but does not belong to tenant",
- ClonerUtils::isNamespaceForTenant(_sourceNss, _tenantId));
+ uassert(ErrorCodes::NamespaceExists,
+ str::stream() << "Tenant '" << _tenantId << "': collection '" << _sourceNss
+ << "' already exists prior to data sync",
+ getSharedData()->isResuming());
+
+ // We are resuming and the collection already exists.
+ DBDirectClient client(opCtx.get());
+
+ auto fieldsToReturn = BSON("_id" << 1);
+ _lastDocId =
+ client.findOne(_sourceNss.ns(), Query().sort(BSON("_id" << -1)), &fieldsToReturn);
+ if (!_lastDocId.isEmpty()) {
+ // The collection is not empty. Skip creating indexes and resume cloning from the last
+ // document.
+ skipCreateIndexes = true;
+ _readyIndexSpecs.clear();
+ auto count = client.count(_sourceDbAndUuid);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.documentsCopied += count;
+ _progressMeter.hit(count);
+ }
+ } else {
+ // The collection is still empty. Create indexes that we haven't created. For the
+ // indexes that exist locally but not on the donor, we don't need to drop them because
+ // oplog application will eventually apply those dropIndex oplog entries.
+ const bool includeBuildUUIDs = false;
+ const int options = 0;
+ auto existingIndexSpecs =
+ client.getIndexSpecs(_sourceDbAndUuid, includeBuildUUIDs, options);
+ StringMap<bool> existingIndexNames;
+ for (const auto& spec : existingIndexSpecs) {
+ existingIndexNames[spec.getStringField("name")] = true;
+ }
+ for (auto it = _readyIndexSpecs.begin(); it != _readyIndexSpecs.end();) {
+ if (existingIndexNames[it->getStringField("name")]) {
+ it = _readyIndexSpecs.erase(it);
+ } else {
+ it++;
+ }
+ }
+ }
} else {
uassertStatusOKWithContext(status, "Tenant collection cloner: create collection");
}
-    // This will start building the indexes whose specs we saved last stage.
-    status = getStorageInterface()->createIndexesOnEmptyCollection(
-        opCtx.get(), _sourceNss, _readyIndexSpecs);
-    uassertStatusOKWithContext(status, "Tenant collection cloner: create indexes");
+    if (!skipCreateIndexes) {
+        // This will start building the indexes whose specs we saved last stage.
+        status = getStorageInterface()->createIndexesOnEmptyCollection(
+            opCtx.get(), _sourceNss, _readyIndexSpecs);
+        uassertStatusOKWithContext(status, "Tenant collection cloner: create indexes");
+    }
return kContinueNormally;
}
@@ -296,7 +342,10 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() {
}
void TenantCollectionCloner::runQuery() {
- auto query = QUERY("query" << BSONObj());
+ auto query = _lastDocId.isEmpty()
+ ? QUERY("query" << BSONObj())
+ // Use $expr and the aggregation version of $gt to avoid type bracketing.
+ : QUERY("$expr" << BSON("$gt" << BSON_ARRAY("$_id" << _lastDocId["_id"])));
query.hint(BSON("_id" << 1));
getClient()->query([this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
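
On the resume query above: a plain find predicate such as {_id: {$gt: <last _id>}} is type-bracketed, meaning it only matches _id values of the same BSON type as the resume point, while the aggregation form of $gt inside $expr compares across types in the canonical BSON sort order. A minimal sketch of the two filters, assuming only the BSON builder macros already used in this file (the helper names are illustrative, not part of the patch):

// Hypothetical illustration of the type-bracketing difference.
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"

namespace mongo {

// 'lastId' is the _id of the last document copied before the failover.
BSONObj typeBracketedResumeFilter(const BSONElement& lastId) {
    // Matches only documents whose _id has the same BSON type as 'lastId'.
    return BSON("_id" << BSON("$gt" << lastId));
}

BSONObj crossTypeResumeFilter(const BSONElement& lastId) {
    // Aggregation $gt compares across BSON types in canonical sort order, so
    // every document ordered after 'lastId' by _id is matched.
    return BSON("$expr" << BSON("$gt" << BSON_ARRAY("$_id" << lastId)));
}

}  // namespace mongo

With a numeric resume point and remaining string _ids, the first filter would return nothing and the cloner would silently skip those documents; the second filter matches them.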