From 42bfda31eae322ac190e0c8cd831ca73f6e78f18 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Wed, 30 Aug 2017 22:52:03 -0400 Subject: SERVER-30371 renaming collection across databases logs individual oplog entries --- jstests/noPassthrough/atomic_rename_collection.js | 22 +- src/mongo/db/catalog/rename_collection.cpp | 134 +++++------- src/mongo/db/catalog/rename_collection_test.cpp | 246 ++++++++++++++++++++++ 3 files changed, 321 insertions(+), 81 deletions(-) diff --git a/jstests/noPassthrough/atomic_rename_collection.js b/jstests/noPassthrough/atomic_rename_collection.js index 8a1b5526a91..d63e08ff6ce 100644 --- a/jstests/noPassthrough/atomic_rename_collection.js +++ b/jstests/noPassthrough/atomic_rename_collection.js @@ -12,7 +12,19 @@ let local = prim.getDB("local"); // Test both for rename within a database as across databases. - [{source: first.x, target: first.y}, {source: first.x, target: second.x}].forEach((test) => { + const tests = [ + { + source: first.x, + target: first.y, + expectedOplogEntries: 1, + }, + { + source: first.x, + target: second.x, + expectedOplogEntries: 4, + } + ]; + tests.forEach((test) => { test.source.drop(); assert.writeOK(test.source.insert({})); assert.writeOK(test.target.insert({})); @@ -25,9 +37,9 @@ }; assert.commandWorked(local.adminCommand(cmd), tojson(cmd)); ops = local.oplog.rs.find({ts: {$gt: ts}}).sort({$natural: 1}).toArray(); - assert.eq( - ops.length, - 1, - "renameCollection was supposed to only generate a single oplog entry: " + tojson(ops)); + assert.eq(ops.length, + test.expectedOplogEntries, + "renameCollection was supposed to only generate " + test.expectedOplogEntries + + " oplog entries: " + tojson(ops)); }); })(); diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index 30a1ac28d9c..f751accff49 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -57,13 +57,6 @@ namespace mongo { namespace { -void dropCollection(OperationContext* opCtx, Database* db, StringData collName) { - WriteUnitOfWork wunit(opCtx); - if (db->dropCollection(opCtx, collName).isOK()) { - // ignoring failure case - wunit.commit(); - } -} NamespaceString getNamespaceFromUUID(OperationContext* opCtx, const BSONElement& ui) { if (ui.eoo()) @@ -98,7 +91,8 @@ Status renameCollectionCommon(OperationContext* opCtx, globalWriteLock.emplace(opCtx); // We stay in source context the whole time. This is mostly to set the CurOp namespace. - OldClientContext ctx(opCtx, source.ns()); + boost::optional ctx; + ctx.emplace(opCtx, source.ns()); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, source); @@ -253,14 +247,11 @@ Status renameCollectionCommon(OperationContext* opCtx, Collection* tmpColl = nullptr; OptionalCollectionUUID newUUID; - bool isSourceCollectionTemporary = false; { auto collectionOptions = sourceColl->getCatalogEntry()->getCollectionOptions(opCtx); - isSourceCollectionTemporary = collectionOptions.temp; // Renaming across databases will result in a new UUID, as otherwise we'd require // two collections with the same uuid (temporarily). - collectionOptions.temp = true; if (targetUUID) newUUID = targetUUID; else if (collectionOptions.uuid && enableCollectionUUIDs) @@ -270,32 +261,43 @@ Status renameCollectionCommon(OperationContext* opCtx, writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] { WriteUnitOfWork wunit(opCtx); - - // No logOp necessary because the entire renameCollection command is one logOp. - repl::UnreplicatedWritesBlock uwb(opCtx); - tmpColl = targetDB->createCollection(opCtx, - tmpName.ns(), - collectionOptions, - false); // _id index build with others later. - + tmpColl = targetDB->createCollection(opCtx, tmpName.ns(), collectionOptions); wunit.commit(); }); } // Dismissed on success - ScopeGuard tmpCollectionDropper = MakeGuard(dropCollection, opCtx, targetDB, tmpName.ns()); - - MultiIndexBlock indexer(opCtx, tmpColl); - indexer.allowInterruption(); - std::vector indexers{&indexer}; + auto tmpCollectionDropper = MakeGuard([&] { + BSONObjBuilder unusedResult; + auto status = + dropCollection(opCtx, + tmpName, + unusedResult, + renameOpTimeFromApplyOps, + DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); + if (!status.isOK()) { + // Ignoring failure case when dropping the temporary collection during cleanup because + // the rename operation has already failed for another reason. + log() << "Unable to drop temporary collection " << tmpName << " while renaming from " + << source << " to " << target << ": " << status; + } + }); // Copy the index descriptions from the source collection, adjusting the ns field. { + MultiIndexBlock indexer(opCtx, tmpColl); + indexer.allowInterruption(); + std::vector indexesToCopy; IndexCatalog::IndexIterator sourceIndIt = sourceColl->getIndexCatalog()->getIndexIterator(opCtx, true); while (sourceIndIt.more()) { - const BSONObj currIndex = sourceIndIt.next()->infoObj(); + auto descriptor = sourceIndIt.next(); + if (descriptor->isIdIndex()) { + continue; + } + + const BSONObj currIndex = descriptor->infoObj(); // Process the source index, adding fields in the same order as they were originally. BSONObjBuilder newIndex; @@ -308,14 +310,30 @@ Status renameCollectionCommon(OperationContext* opCtx, } indexesToCopy.push_back(newIndex.obj()); } + status = indexer.init(indexesToCopy).getStatus(); if (!status.isOK()) { return status; } + + status = indexer.doneInserting(); + if (!status.isOK()) { + return status; + } + + writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] { + WriteUnitOfWork wunit(opCtx); + indexer.commit(); + for (auto&& infoObj : indexesToCopy) { + getGlobalServiceContext()->getOpObserver()->onCreateIndex( + opCtx, tmpName, newUUID, infoObj, false); + } + wunit.commit(); + }); } { - // Copy over all the data from source collection to target collection. + // Copy over all the data from source collection to temporary collection. auto cursor = sourceColl->getCursor(opCtx); while (auto record = cursor->next()) { opCtx->checkForInterrupt(); @@ -324,9 +342,9 @@ Status renameCollectionCommon(OperationContext* opCtx, status = writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] { WriteUnitOfWork wunit(opCtx); - // No logOp necessary because the entire renameCollection command is one logOp. - repl::UnreplicatedWritesBlock uwb(opCtx); - Status status = tmpColl->insertDocument(opCtx, obj, indexers, true); + const InsertStatement stmt(obj); + OpDebug* const opDebug = nullptr; + auto status = tmpColl->insertDocument(opCtx, stmt, opDebug, true); if (!status.isOK()) return status; wunit.commit(); @@ -339,60 +357,24 @@ Status renameCollectionCommon(OperationContext* opCtx, } } - status = indexer.doneInserting(); - if (!status.isOK()) { - return status; - } - // Getting here means we successfully built the target copy. We now do the final // in-place rename and remove the source collection. - status = writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] { - WriteUnitOfWork wunit(opCtx); - indexer.commit(); - OptionalCollectionUUID dropTargetUUID; - { - repl::UnreplicatedWritesBlock uwb(opCtx); - Status status = Status::OK(); - if (targetColl) { - dropTargetUUID = targetColl->uuid(); - status = targetDB->dropCollection(opCtx, target.ns()); - } - if (status.isOK()) { - // When renaming the temporary collection in the target database, we have to take - // into account the CollectionOptions.temp value of the source collection and the - // 'stayTemp' option requested by the caller. - // If the source collection is not temporary, the resulting target collection must - // not be temporary. - auto stayTemp = isSourceCollectionTemporary && options.stayTemp; - status = targetDB->renameCollection(opCtx, tmpName.ns(), target.ns(), stayTemp); - } - if (status.isOK()) - status = sourceDB->dropCollection(opCtx, source.ns()); - - if (!status.isOK()) - return status; - } - - getGlobalServiceContext()->getOpObserver()->onRenameCollection(opCtx, - source, - target, - newUUID, - options.dropTarget, - dropTargetUUID, - sourceUUID, - options.stayTemp); - - wunit.commit(); - return Status::OK(); - }); - + invariant(tmpName.db() == target.db()); + status = renameCollectionCommon( + opCtx, tmpName, target, targetUUID, renameOpTimeFromApplyOps, options); if (!status.isOK()) { return status; } - tmpCollectionDropper.Dismiss(); - return Status::OK(); + + BSONObjBuilder unusedResult; + return dropCollection(opCtx, + source, + unusedResult, + renameOpTimeFromApplyOps, + DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); } + } // namespace Status renameCollection(OperationContext* opCtx, diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp index fa32ebaac77..9e8f5ba2ac6 100644 --- a/src/mongo/db/catalog/rename_collection_test.cpp +++ b/src/mongo/db/catalog/rename_collection_test.cpp @@ -29,6 +29,8 @@ #include "mongo/platform/basic.h" #include +#include +#include #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/collection_options.h" @@ -54,6 +56,7 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/stringutils.h" namespace { @@ -66,6 +69,29 @@ using namespace mongo; */ class OpObserverMock : public OpObserverNoop { public: + void onCreateIndex(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + BSONObj indexDoc, + bool fromMigrate) override; + + void onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector::const_iterator begin, + std::vector::const_iterator end, + bool fromMigrate) override; + + void onCreateCollection(OperationContext* opCtx, + Collection* coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex) override; + + repl::OpTime onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) override; + repl::OpTime onRenameCollection(OperationContext* opCtx, const NamespaceString& fromCollection, const NamespaceString& toCollection, @@ -75,10 +101,63 @@ public: OptionalCollectionUUID dropSourceUUID, bool stayTemp) override; + // Operations written to the oplog. These are operations for which + // ReplicationCoordinator::isOplogDisabled() returns false. + std::vector oplogEntries; + + bool onInsertsThrows = false; + bool onRenameCollectionCalled = false; repl::OpTime renameOpTime = {Timestamp(Seconds(100), 1U), 1LL}; + +private: + /** + * Pushes 'operationName' into 'oplogEntries' if we can write to the oplog for this namespace. + */ + void _logOp(OperationContext* opCtx, + const NamespaceString& nss, + const std::string& operationName); }; +void OpObserverMock::onCreateIndex(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + BSONObj indexDoc, + bool fromMigrate) { + _logOp(opCtx, nss, "index"); + OpObserverNoop::onCreateIndex(opCtx, nss, uuid, indexDoc, fromMigrate); +} + +void OpObserverMock::onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector::const_iterator begin, + std::vector::const_iterator end, + bool fromMigrate) { + if (onInsertsThrows) { + uasserted(ErrorCodes::OperationFailed, "insert failed"); + } + + _logOp(opCtx, nss, "inserts"); + OpObserverNoop::onInserts(opCtx, nss, uuid, begin, end, fromMigrate); +} + +void OpObserverMock::onCreateCollection(OperationContext* opCtx, + Collection* coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex) { + _logOp(opCtx, collectionName, "create"); + OpObserverNoop::onCreateCollection(opCtx, coll, collectionName, options, idIndex); +} + +repl::OpTime OpObserverMock::onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) { + _logOp(opCtx, collectionName, "drop"); + return OpObserverNoop::onDropCollection(opCtx, collectionName, uuid); +} + repl::OpTime OpObserverMock::onRenameCollection(OperationContext* opCtx, const NamespaceString& fromCollection, const NamespaceString& toCollection, @@ -87,10 +166,28 @@ repl::OpTime OpObserverMock::onRenameCollection(OperationContext* opCtx, OptionalCollectionUUID dropTargetUUID, OptionalCollectionUUID dropSourceUUID, bool stayTemp) { + _logOp(opCtx, fromCollection, "rename"); + OpObserverNoop::onRenameCollection(opCtx, + fromCollection, + toCollection, + uuid, + dropTarget, + dropTargetUUID, + dropSourceUUID, + stayTemp); onRenameCollectionCalled = true; return renameOpTime; } +void OpObserverMock::_logOp(OperationContext* opCtx, + const NamespaceString& nss, + const std::string& operationName) { + if (repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, nss)) { + return; + } + oplogEntries.push_back(operationName); +} + class RenameCollectionTest : public ServiceContextMongoDTest { public: static ServiceContext::UniqueOperationContext makeOpCtx(); @@ -258,6 +355,24 @@ void _createIndex(OperationContext* opCtx, ASSERT_TRUE(AutoGetCollectionForRead(opCtx, nss).getCollection()); } +/** + * Inserts a single document into a collection. + */ +void _insertDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) { + writeConflictRetry(opCtx, "_insertDocument", nss.ns(), [=] { + AutoGetCollection autoColl(opCtx, nss, MODE_X); + auto collection = autoColl.getCollection(); + ASSERT_TRUE(collection) << "Cannot insert document " << doc << " into collection " << nss + << " because collection " << nss.ns() << " does not exist."; + + WriteUnitOfWork wuow(opCtx); + OpDebug* const opDebug = nullptr; + bool enforceQuota = true; + ASSERT_OK(collection->insertDocument(opCtx, InsertStatement(doc), opDebug, enforceQuota)); + wuow.commit(); + }); +} + TEST_F(RenameCollectionTest, RenameCollectionReturnsNamespaceNotFoundIfDatabaseDoesNotExist) { ASSERT_FALSE(AutoGetDb(_opCtx.get(), _sourceNss.db(), MODE_X).getDb()); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, @@ -522,4 +637,135 @@ TEST_F(RenameCollectionTest, RenameDifferentDatabaseStayTempTrueSourceNotTempora _testRenameCollectionStayTemp(_opCtx.get(), _sourceNss, _targetNssDifferentDb, true, false); } +/** + * Checks oplog entries written by the OpObserver to the oplog. + */ +void _checkOplogEntries(const std::vector& actualOplogEntries, + const std::vector& expectedOplogEntries) { + std::string actualOplogEntriesStr; + joinStringDelim(actualOplogEntries, &actualOplogEntriesStr, ','); + std::string expectedOplogEntriesStr; + joinStringDelim(expectedOplogEntries, &expectedOplogEntriesStr, ','); + ASSERT_EQUALS(expectedOplogEntries.size(), actualOplogEntries.size()) + << str::stream() + << "Incorrect number of oplog entries written to oplog. Actual: " << actualOplogEntriesStr + << ". Expected: " << expectedOplogEntriesStr; + std::vector::size_type i = 0; + for (const auto& actualOplogEntry : actualOplogEntries) { + const auto& expectedOplogEntry = expectedOplogEntries[i++]; + ASSERT_EQUALS(expectedOplogEntry, actualOplogEntry) + << str::stream() << "Mismatch in oplog entry at index " << i + << ". Actual: " << actualOplogEntriesStr << ". Expected: " << expectedOplogEntriesStr; + } +} + +/** + * Runs a rename across database operation and checks oplog entries writtent to the oplog. + */ +void _testRenameCollectionAcrossDatabaseOplogEntries( + OperationContext* opCtx, + const NamespaceString& sourceNss, + const NamespaceString& targetNss, + std::vector* oplogEntries, + bool forApplyOps, + const std::vector& expectedOplogEntries) { + ASSERT_NOT_EQUALS(sourceNss.db(), targetNss.db()); + _createCollection(opCtx, sourceNss); + _createIndex(opCtx, sourceNss, "a_1"); + _insertDocument(opCtx, sourceNss, BSON("_id" << 0)); + oplogEntries->clear(); + if (forApplyOps) { + auto cmd = BSON( + "renameCollection" << sourceNss.ns() << "to" << targetNss.ns() << "dropTarget" << true); + ASSERT_OK(renameCollectionForApplyOps(opCtx, sourceNss.db().toString(), {}, cmd, {})); + } else { + RenameCollectionOptions options; + options.dropTarget = true; + ASSERT_OK(renameCollection(opCtx, sourceNss, targetNss, options)); + } + _checkOplogEntries(*oplogEntries, expectedOplogEntries); +} + +TEST_F(RenameCollectionTest, RenameCollectionAcrossDatabaseOplogEntries) { + bool forApplyOps = false; + _testRenameCollectionAcrossDatabaseOplogEntries( + _opCtx.get(), + _sourceNss, + _targetNssDifferentDb, + &_opObserver->oplogEntries, + forApplyOps, + {"create", "index", "inserts", "rename", "drop"}); +} + +TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsAcrossDatabaseOplogEntries) { + bool forApplyOps = true; + _testRenameCollectionAcrossDatabaseOplogEntries( + _opCtx.get(), + _sourceNss, + _targetNssDifferentDb, + &_opObserver->oplogEntries, + forApplyOps, + {"create", "index", "inserts", "rename", "drop"}); +} + +TEST_F(RenameCollectionTest, RenameCollectionAcrossDatabaseOplogEntriesDropTarget) { + _createCollection(_opCtx.get(), _targetNssDifferentDb); + bool forApplyOps = false; + _testRenameCollectionAcrossDatabaseOplogEntries( + _opCtx.get(), + _sourceNss, + _targetNssDifferentDb, + &_opObserver->oplogEntries, + forApplyOps, + {"create", "index", "inserts", "rename", "drop"}); +} + +TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsAcrossDatabaseOplogEntriesDropTarget) { + _createCollection(_opCtx.get(), _targetNssDifferentDb); + bool forApplyOps = true; + _testRenameCollectionAcrossDatabaseOplogEntries( + _opCtx.get(), + _sourceNss, + _targetNssDifferentDb, + &_opObserver->oplogEntries, + forApplyOps, + {"create", "index", "inserts", "rename", "drop"}); +} + +TEST_F(RenameCollectionTest, RenameCollectionAcrossDatabaseOplogEntriesWritesNotReplicated) { + repl::UnreplicatedWritesBlock uwb(_opCtx.get()); + bool forApplyOps = false; + _testRenameCollectionAcrossDatabaseOplogEntries(_opCtx.get(), + _sourceNss, + _targetNssDifferentDb, + &_opObserver->oplogEntries, + forApplyOps, + {}); +} + +TEST_F(RenameCollectionTest, + RenameCollectionForApplyOpsAcrossDatabaseOplogEntriesWritesNotReplicated) { + repl::UnreplicatedWritesBlock uwb(_opCtx.get()); + bool forApplyOps = true; + _testRenameCollectionAcrossDatabaseOplogEntries(_opCtx.get(), + _sourceNss, + _targetNssDifferentDb, + &_opObserver->oplogEntries, + forApplyOps, + {}); +} + +TEST_F(RenameCollectionTest, RenameCollectionAcrossDatabaseDropsTemporaryCollectionOnException) { + _createCollection(_opCtx.get(), _sourceNss); + _createIndex(_opCtx.get(), _sourceNss, "a_1"); + _insertDocument(_opCtx.get(), _sourceNss, BSON("_id" << 0)); + _opObserver->onInsertsThrows = true; + _opObserver->oplogEntries.clear(); + ASSERT_THROWS_CODE( + renameCollection(_opCtx.get(), _sourceNss, _targetNssDifferentDb, {}).ignore(), + AssertionException, + ErrorCodes::OperationFailed); + _checkOplogEntries(_opObserver->oplogEntries, {"create", "index", "drop"}); +} + } // namespace -- cgit v1.2.1