diff options
-rw-r--r-- | jstests/replsets/initial_sync_applier_error.js | 1 | ||||
-rw-r--r-- | jstests/replsets/initial_sync_rename_collection.js | 3 | ||||
-rw-r--r-- | jstests/replsets/initial_sync_rename_collection_unsafe.js | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 75 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 213 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 14 |
7 files changed, 247 insertions, 67 deletions
diff --git a/jstests/replsets/initial_sync_applier_error.js b/jstests/replsets/initial_sync_applier_error.js index 9816e92fd74..7a280242024 100644 --- a/jstests/replsets/initial_sync_applier_error.js +++ b/jstests/replsets/initial_sync_applier_error.js @@ -44,7 +44,6 @@ assert.commandWorked(secondary.getDB('admin').runCommand( {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'})); - checkLog.contains(secondary, 'Applying renameCollection not supported'); checkLog.contains(secondary, 'initial sync done'); replSet.awaitReplication(); diff --git a/jstests/replsets/initial_sync_rename_collection.js b/jstests/replsets/initial_sync_rename_collection.js index b8736b5274d..fdb77ea6da4 100644 --- a/jstests/replsets/initial_sync_rename_collection.js +++ b/jstests/replsets/initial_sync_rename_collection.js @@ -26,8 +26,7 @@ rst.awaitReplication(); jsTestLog('Bring up a new node'); - // TODO(SERVER-4941): Only a single initial sync attempt should be necessary. - const secondary = rst.add({setParameter: 'numInitialSyncAttempts=3'}); + const secondary = rst.add({setParameter: 'numInitialSyncAttempts=1'}); rst.reInitiate(); assert.eq(primary, rst.getPrimary(), 'Primary changed after reconfig'); diff --git a/jstests/replsets/initial_sync_rename_collection_unsafe.js b/jstests/replsets/initial_sync_rename_collection_unsafe.js index a08143ba735..13e58b1503a 100644 --- a/jstests/replsets/initial_sync_rename_collection_unsafe.js +++ b/jstests/replsets/initial_sync_rename_collection_unsafe.js @@ -17,6 +17,7 @@ const dbName = 'd'; const primary = rst.getPrimary(); const primaryDB = primary.getDB(dbName); + assert.commandWorked(primary.adminCommand({setFeatureCompatibilityVersion:"3.4"})); assert.writeOK(primaryDB['foo'].save({})); diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 385dd2d3dfa..66edd0958e6 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -78,6 +78,17 @@ MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone); // 'AsyncResultsMerger' for a specific collection. MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse); +BSONObj makeCommandWithUUIDorCollectionName(StringData command, + OptionalCollectionUUID uuid, + const NamespaceString& nss) { + BSONObjBuilder builder; + if (uuid) + uuid->appendToBuilder(&builder, command); + else + builder.append(command, nss.coll()); + return builder.obj(); +} + CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, OldThreadPool* dbWorkThreadPool, const HostAndPort& source, @@ -96,33 +107,35 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _onCompletion(onCompletion), _storageInterface(storageInterface), _countScheduler(_executor, - RemoteCommandRequest(_source, - _sourceNss.db().toString(), - BSON("count" << _sourceNss.coll()), - ReadPreferenceSetting::secondaryPreferredMetadata(), - nullptr, - RemoteCommandRequest::kNoTimeout), + RemoteCommandRequest( + _source, + _sourceNss.db().toString(), + makeCommandWithUUIDorCollectionName("count", _options.uuid, sourceNss), + ReadPreferenceSetting::secondaryPreferredMetadata(), + nullptr, + RemoteCommandRequest::kNoTimeout), stdx::bind(&CollectionCloner::_countCallback, this, stdx::placeholders::_1), RemoteCommandRetryScheduler::makeRetryPolicy( numInitialSyncCollectionCountAttempts.load(), executor::RemoteCommandRequest::kNoTimeout, RemoteCommandRetryScheduler::kAllRetriableErrors)), - _listIndexesFetcher(_executor, - _source, - _sourceNss.db().toString(), - BSON("listIndexes" << _sourceNss.coll()), - stdx::bind(&CollectionCloner::_listIndexesCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3), - ReadPreferenceSetting::secondaryPreferredMetadata(), - RemoteCommandRequest::kNoTimeout /* find network timeout */, - RemoteCommandRequest::kNoTimeout /* getMore network timeout */, - RemoteCommandRetryScheduler::makeRetryPolicy( - numInitialSyncListIndexesAttempts.load(), - executor::RemoteCommandRequest::kNoTimeout, - RemoteCommandRetryScheduler::kAllRetriableErrors)), + _listIndexesFetcher( + _executor, + _source, + _sourceNss.db().toString(), + makeCommandWithUUIDorCollectionName("listIndexes", _options.uuid, sourceNss), + stdx::bind(&CollectionCloner::_listIndexesCallback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3), + ReadPreferenceSetting::secondaryPreferredMetadata(), + RemoteCommandRequest::kNoTimeout /* find network timeout */, + RemoteCommandRequest::kNoTimeout /* getMore network timeout */, + RemoteCommandRetryScheduler::makeRetryPolicy( + numInitialSyncListIndexesAttempts.load(), + executor::RemoteCommandRequest::kNoTimeout, + RemoteCommandRetryScheduler::kAllRetriableErrors)), _indexSpecs(), _documentsToInsert(), _dbWorkTaskRunner(_dbWorkThreadPool), @@ -375,14 +388,22 @@ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& } UniqueLock lk(_mutex); + // When listing indexes by UUID, the sync source may use a different name for the collection + // as result of renaming or two-phase drop. As the index spec also includes a 'ns' field, this + // must be rewritten. + BSONObjBuilder nsFieldReplacementBuilder; + nsFieldReplacementBuilder.append("ns", _sourceNss.ns()); + BSONElement nsFieldReplacementElem = nsFieldReplacementBuilder.done().firstElement(); + // We may be called with multiple batches leading to a need to grow _indexSpecs. _indexSpecs.reserve(_indexSpecs.size() + documents.size()); for (auto&& doc : documents) { + // The addField replaces the 'ns' field with the correct name, see above. if (StringData("_id_") == doc["name"].str()) { - _idIndexSpec = doc; + _idIndexSpec = doc.addField(nsFieldReplacementElem); continue; } - _indexSpecs.push_back(doc); + _indexSpecs.push_back(doc.addField(nsFieldReplacementElem)); } lk.unlock(); @@ -435,14 +456,16 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca // the correctness of the collection cloning process until 'parallelCollectionScan' // can be tested more extensively in context of initial sync. if (_maxNumClonerCursors == 1) { - cmdObj.append("find", _sourceNss.coll()); + cmdObj.appendElements( + makeCommandWithUUIDorCollectionName("find", _options.uuid, _sourceNss)); cmdObj.append("noCursorTimeout", true); // Set batchSize to be 0 to establish the cursor without fetching any documents, // similar to the response format of 'parallelCollectionScan'. cmdObj.append("batchSize", 0); cursorCommand = Find; } else { - cmdObj.append("parallelCollectionScan", _sourceNss.coll()); + cmdObj.appendElements(makeCommandWithUUIDorCollectionName( + "parallelCollectionScan", _options.uuid, _sourceNss)); cmdObj.append("numCursors", _maxNumClonerCursors); cursorCommand = ParallelCollScan; } diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 8812045e858..dfe09f3ee2f 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -65,6 +65,7 @@ public: protected: void setUp() override; void tearDown() override; + std::vector<BSONObj> makeSecondaryIndexSpecs(const NamespaceString& nss); // A simple arbitrary value to use as the default batch size. const int defaultBatchSize = 1024; @@ -98,9 +99,9 @@ void CollectionClonerTest::setUp() { [this](const NamespaceString& nss, const CollectionOptions& options, const BSONObj idIndexSpec, - const std::vector<BSONObj>& secondaryIndexSpecs) { + const std::vector<BSONObj>& nonIdIndexSpecs) { (_loader = new CollectionBulkLoaderMock(&collectionStats)) - ->init(secondaryIndexSpecs) + ->init(nonIdIndexSpecs) .transitional_ignore(); return StatusWith<std::unique_ptr<CollectionBulkLoader>>( @@ -108,6 +109,18 @@ void CollectionClonerTest::setUp() { }; } +// Return index specs to use for secondary indexes. +std::vector<BSONObj> CollectionClonerTest::makeSecondaryIndexSpecs(const NamespaceString& nss) { + return {BSON("v" << 1 << "key" << BSON("a" << 1) << "name" + << "a_1" + << "ns" + << nss.ns()), + BSON("v" << 1 << "key" << BSON("b" << 1) << "name" + << "b_1" + << "ns" + << nss.ns())}; +} + void CollectionClonerTest::tearDown() { BaseClonerTest::tearDown(); // Executor may still invoke collection cloner's callback before shutting down. @@ -640,14 +653,7 @@ TEST_F(CollectionClonerTest, BeginCollection) { // Split listIndexes response into 2 batches: first batch contains idIndexSpec and // second batch contains specs - std::vector<BSONObj> specs{BSON("v" << 1 << "key" << BSON("a" << 1) << "name" - << "a_1" - << "ns" - << nss.ns()), - BSON("v" << 1 << "key" << BSON("b" << 1) << "name" - << "b_1" - << "ns" - << nss.ns())}; + auto nonIdIndexSpecs = makeSecondaryIndexSpecs(nss); // First batch contains the _id_ index spec. { @@ -663,8 +669,8 @@ TEST_F(CollectionClonerTest, BeginCollection) { // Second batch contains the other index specs. { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse( - createListIndexesResponse(0, BSON_ARRAY(specs[0] << specs[1]), "nextBatch")); + processNetworkResponse(createListIndexesResponse( + 0, BSON_ARRAY(nonIdIndexSpecs[0] << nonIdIndexSpecs[1]), "nextBatch")); } collectionCloner->waitForDbWorker(); @@ -674,9 +680,9 @@ TEST_F(CollectionClonerTest, BeginCollection) { ASSERT_EQUALS(nss.ns(), collNss.ns()); ASSERT_BSONOBJ_EQ(options.toBSON(), collOptions.toBSON()); - ASSERT_EQUALS(specs.size(), collIndexSpecs.size()); - for (std::vector<BSONObj>::size_type i = 0; i < specs.size(); ++i) { - ASSERT_BSONOBJ_EQ(specs[i], collIndexSpecs[i]); + ASSERT_EQUALS(nonIdIndexSpecs.size(), collIndexSpecs.size()); + for (std::vector<BSONObj>::size_type i = 0; i < nonIdIndexSpecs.size(); ++i) { + ASSERT_BSONOBJ_EQ(nonIdIndexSpecs[i], collIndexSpecs[i]); } // Cloner is still active because it has to read the documents from the source collection. @@ -1395,6 +1401,171 @@ TEST_F(CollectionClonerTest, ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); } +class CollectionClonerUUIDTest : public CollectionClonerTest { +protected: + // The UUID tests should deal gracefully with renamed collections, so start the cloner with + // an alternate name. + const NamespaceString alternateNss{"db", "alternateCollName"}; + void startupWithUUID(int maxNumCloningCursors = 1) { + collectionCloner.reset(); + options.uuid = UUID::gen(); + collectionCloner = stdx::make_unique<CollectionCloner>( + &getExecutor(), + dbWorkThreadPool.get(), + target, + alternateNss, + options, + stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), + storageInterface.get(), + defaultBatchSize, + maxNumCloningCursors); + + ASSERT_OK(collectionCloner->startup()); + } + + void testWithMaxNumCloningCursors(int maxNumCloningCursors, StringData cmdName) { + startupWithUUID(maxNumCloningCursors); + + CollectionOptions actualOptions; + CollectionMockStats stats; + CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats); + bool collectionCreated = false; + storageInterface->createCollectionForBulkFn = [&](const NamespaceString& theNss, + const CollectionOptions& theOptions, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& theIndexSpecs) + -> StatusWith<std::unique_ptr<CollectionBulkLoader>> { + collectionCreated = true; + actualOptions = theOptions; + return std::unique_ptr<CollectionBulkLoader>(loader); + }; + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); + } + + collectionCloner->waitForDbWorker(); + ASSERT_TRUE(collectionCreated); + + // Fetcher should be scheduled after cloner creates collection. + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_TRUE(net->hasReadyRequests()); + NetworkOperationIterator noi = net->getNextReadyRequest(); + ASSERT_FALSE(net->hasReadyRequests()); + auto&& noiRequest = noi->getRequest(); + ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); + ASSERT_BSONOBJ_EQ(actualOptions.toBSON(), options.toBSON()); + + ASSERT_EQUALS(cmdName, std::string(noiRequest.cmdObj.firstElementFieldName())); + ASSERT_EQUALS(cmdName == "find", noiRequest.cmdObj.getField("noCursorTimeout").trueValue()); + auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement())); + ASSERT_EQUALS(options.uuid.get(), requestUUID); + } +}; + +TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) { + startupWithUUID(); + + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + ASSERT_TRUE(net->hasReadyRequests()); + NetworkOperationIterator noi = net->getNextReadyRequest(); + auto&& noiRequest = noi->getRequest(); + ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); + ASSERT_EQUALS("count", std::string(noiRequest.cmdObj.firstElementFieldName())); + auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement())); + ASSERT_EQUALS(options.uuid.get(), requestUUID); + + ASSERT_FALSE(net->hasReadyRequests()); + ASSERT_TRUE(collectionCloner->isActive()); +} + +TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) { + startupWithUUID(); + + CollectionMockStats stats; + CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats); + NamespaceString collNss; + CollectionOptions collOptions; + BSONObj collIdIndexSpec; + std::vector<BSONObj> collSecondaryIndexSpecs; + storageInterface->createCollectionForBulkFn = [&](const NamespaceString& theNss, + const CollectionOptions& theOptions, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& nonIdIndexSpecs) + -> StatusWith<std::unique_ptr<CollectionBulkLoader>> { + collNss = theNss; + collOptions = theOptions; + collIdIndexSpec = idIndexSpec; + collSecondaryIndexSpecs = nonIdIndexSpecs; + return std::unique_ptr<CollectionBulkLoader>(loader); + }; + + // Split listIndexes response into 2 batches: first batch contains idIndexSpec and + // second batch contains specs. We expect the collection cloner to fix up the collection names + // (here from 'nss' to 'alternateNss') in the index specs, as the collection with the given UUID + // may be known with a different name by the sync source due to a rename or two-phase drop. + auto nonIdIndexSpecsToReturnBySyncSource = makeSecondaryIndexSpecs(nss); + + // First batch contains the _id_ index spec. + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(idIndexSpec))); + } + + // 'status' should not be modified because cloning is not finished. + ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); + ASSERT_TRUE(collectionCloner->isActive()); + + // Second batch contains the other index specs. + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse( + createListIndexesResponse(0, + BSON_ARRAY(nonIdIndexSpecsToReturnBySyncSource[0] + << nonIdIndexSpecsToReturnBySyncSource[1]), + "nextBatch")); + } + + collectionCloner->waitForDbWorker(); + + // 'status' will be set if listIndexes fails. + ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); + + ASSERT_EQUALS(collNss.ns(), alternateNss.ns()); + ASSERT_BSONOBJ_EQ(options.toBSON(), collOptions.toBSON()); + + BSONObj expectedIdIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_" + << "ns" + << alternateNss.ns()); + ASSERT_BSONOBJ_EQ(collIdIndexSpec, expectedIdIndexSpec); + + auto expectedNonIdIndexSpecs = makeSecondaryIndexSpecs(alternateNss); + ASSERT_EQUALS(collSecondaryIndexSpecs.size(), expectedNonIdIndexSpecs.size()); + + for (std::vector<BSONObj>::size_type i = 0; i < expectedNonIdIndexSpecs.size(); ++i) { + ASSERT_BSONOBJ_EQ(collSecondaryIndexSpecs[i], expectedNonIdIndexSpecs[i]); + } + + // Cloner is still active because it has to read the documents from the source collection. + ASSERT_TRUE(collectionCloner->isActive()); +} + +TEST_F(CollectionClonerUUIDTest, SingleCloningCursorWithUUIDUsesFindCommand) { + // With a single cloning cursor, expect a find command. + testWithMaxNumCloningCursors(1, "find"); +} + +TEST_F(CollectionClonerUUIDTest, ThreeCloningCursorsWithUUIDUsesParallelCollectionScanCommand) { + // With three cloning cursors, expect a parallelCollectionScan command. + testWithMaxNumCloningCursors(3, "parallelCollectionScan"); +} + class ParallelCollectionClonerTest : public BaseClonerTest { public: BaseCloner* getCloner() const override; @@ -1436,9 +1607,9 @@ void ParallelCollectionClonerTest::setUp() { [this](const NamespaceString& nss, const CollectionOptions& options, const BSONObj idIndexSpec, - const std::vector<BSONObj>& secondaryIndexSpecs) { + const std::vector<BSONObj>& nonIdIndexSpecs) { _loader = new CollectionBulkLoaderMock(&collectionStats); - Status initCollectionBulkLoader = _loader->init(secondaryIndexSpecs); + Status initCollectionBulkLoader = _loader->init(nonIdIndexSpecs); ASSERT_OK(initCollectionBulkLoader); return StatusWith<std::unique_ptr<CollectionBulkLoader>>( @@ -1942,8 +2113,8 @@ TEST_F(ParallelCollectionClonerTest, // At this point, the CollectionCloner has sent the find request to establish the cursor. // We want to return the first batch of documents for the collection from the network so that - // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for - // the next batch of documents. + // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request + // for the next batch of documents. // Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet. executor::TaskExecutor::CallbackFn insertDocumentsFn; @@ -1983,8 +2154,8 @@ TEST_F(ParallelCollectionClonerTest, ASSERT_TRUE(collectionCloner->isActive()); ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - // Run the insertDocuments task. The final status of the CollectionCloner should match the first - // error passed to the completion guard (ie. from the failed getMore request). + // Run the insertDocuments task. The final status of the CollectionCloner should match the + // first error passed to the completion guard (ie. from the failed getMore request). executor::TaskExecutor::CallbackArgs callbackArgs( &getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, "")); insertDocumentsFn(callbackArgs); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 6c4a84d3067..a88d8dc7589 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1236,9 +1236,10 @@ Status applyCommand_inlock(OperationContext* opCtx, } } - // Applying renameCollection during initial sync might lead to data corruption, so we restart - // the initial sync. - if (!inSteadyStateReplication && o.firstElementFieldName() == std::string("renameCollection")) { + // Applying renameCollection during initial sync to a collection without UUID might lead to + // data corruption, so we restart the initial sync. + if (fieldUI.eoo() && !inSteadyStateReplication && + o.firstElementFieldName() == std::string("renameCollection")) { if (!allowUnsafeRenamesDuringInitialSync.load()) { return Status(ErrorCodes::OplogOperationUnsupported, str::stream() diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index b34d74bec32..5022aa78722 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -1413,20 +1413,6 @@ TEST_F(IdempotencyTest, CollModIndexNotFound) { testOpsAreIdempotent(ops); } -TEST_F(IdempotencyTest, ResyncOnRenameCollection) { - ASSERT_OK( - ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_RECOVERING)); - - auto cmd = BSON("renameCollection" << nss.ns() << "to" - << "test.bar" - << "stayTemp" - << false - << "dropTarget" - << false); - auto op = makeCommandOplogEntry(nextOpTime(), nss, cmd); - ASSERT_EQUALS(runOp(op), ErrorCodes::OplogOperationUnsupported); -} - } // namespace } // namespace repl } // namespace mongo |