diff options
author | David Storch <david.storch@10gen.com> | 2018-10-04 18:27:48 -0400 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2018-10-23 17:18:11 -0400 |
commit | 2cdc2a96e1c8779658fe0eab459dcc38cf01c54d (patch) | |
tree | 875f55acaaba283f02895938ee4b6c764eac349a /src/mongo/db/catalog/rename_collection_test.cpp | |
parent | e7bed9bdcb376d5a06dce6228047309e8481f9cf (diff) | |
download | mongo-2cdc2a96e1c8779658fe0eab459dcc38cf01c54d.tar.gz |
SERVER-37443 Make catalog objects survive collection rename.
This change only applies to collection renames within the
same database. Rename across databases requires copying the
data, and the resulting collection will have a new UUID.
Diffstat (limited to 'src/mongo/db/catalog/rename_collection_test.cpp')
-rw-r--r-- | src/mongo/db/catalog/rename_collection_test.cpp | 142 |
1 files changed, 138 insertions, 4 deletions
diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp index 7ed933b58d5..ddd0bdc85c4 100644 --- a/src/mongo/db/catalog/rename_collection_test.cpp +++ b/src/mongo/db/catalog/rename_collection_test.cpp @@ -36,6 +36,7 @@ #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/collection_options.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/multi_index_block.h" #include "mongo/db/catalog/rename_collection.h" #include "mongo/db/catalog/uuid_catalog.h" @@ -95,7 +96,8 @@ public: repl::OpTime onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName, - OptionalCollectionUUID uuid) override; + OptionalCollectionUUID uuid, + CollectionDropType dropType) override; void onRenameCollection(OperationContext* opCtx, const NamespaceString& fromCollection, @@ -127,6 +129,8 @@ public: OptionalCollectionUUID onRenameCollectionDropTarget; repl::OpTime renameOpTime = {Timestamp(Seconds(100), 1U), 1LL}; + repl::OpTime dropOpTime = {Timestamp(Seconds(100), 1U), 1LL}; + private: /** * Pushes 'operationName' into 'oplogEntries' if we can write to the oplog for this namespace. @@ -176,10 +180,16 @@ void OpObserverMock::onCreateCollection(OperationContext* opCtx, repl::OpTime OpObserverMock::onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName, - OptionalCollectionUUID uuid) { + OptionalCollectionUUID uuid, + const CollectionDropType dropType) { _logOp(opCtx, collectionName, "drop"); - OpObserver::Times::get(opCtx).reservedOpTimes.push_back( - OpObserverNoop::onDropCollection(opCtx, collectionName, uuid)); + // If the oplog is not disabled for this namespace, then we need to reserve an op time for the + // drop. + if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, collectionName)) { + OpObserver::Times::get(opCtx).reservedOpTimes.push_back(dropOpTime); + } + auto noopOptime = OpObserverNoop::onDropCollection(opCtx, collectionName, uuid, dropType); + invariant(noopOptime.isNull()); return {}; } @@ -434,6 +444,19 @@ void _insertDocument(OperationContext* opCtx, const NamespaceString& nss, const }); } +/** + * Retrieves the pointer to a collection associated with the given namespace string from the + * catalog. The caller must hold the appropriate locks from the lock manager. + */ +Collection* _getCollection_inlock(OperationContext* opCtx, const NamespaceString& nss) { + invariant(opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IS)); + auto* db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.db()); + if (!db) { + return nullptr; + } + return db->getCollection(opCtx, nss.ns()); +} + TEST_F(RenameCollectionTest, RenameCollectionReturnsNamespaceNotFoundIfDatabaseDoesNotExist) { ASSERT_FALSE(AutoGetDb(_opCtx.get(), _sourceNss.db(), MODE_X).getDb()); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, @@ -1098,4 +1121,115 @@ TEST_F(RenameCollectionTest, ASSERT_TRUE(_opObserver->onInsertsIsGlobalWriteLockExclusive); } +TEST_F(RenameCollectionTest, CollectionPointerRemainsValidThroughRename) { + _createCollection(_opCtx.get(), _sourceNss); + Lock::GlobalWrite globalWrite(_opCtx.get()); + + // Get a pointer to the source collection, and ensure that it reports the expected namespace + // string. + Collection* sourceColl = _getCollection_inlock(_opCtx.get(), _sourceNss); + ASSERT(sourceColl); + + ASSERT_OK(renameCollection(_opCtx.get(), _sourceNss, _targetNss, {})); + + // Retrieve the pointer associated with the target namespace, and ensure that its the same + // pointer (i.e. the renamed collection has the very same Collection instance). + Collection* targetColl = _getCollection_inlock(_opCtx.get(), _targetNss); + ASSERT(targetColl); + ASSERT_EQ(targetColl, sourceColl); + + // Verify that the Collection reports that its namespace is now the target namespace. + ASSERT_EQ(targetColl->ns(), _targetNss); +} + +TEST_F(RenameCollectionTest, CollectionCatalogEntryPointerRemainsValidThroughRename) { + _createCollection(_opCtx.get(), _sourceNss); + Lock::GlobalWrite globalWrite(_opCtx.get()); + + // Get a pointer to the source collection, and ensure that it reports the expected namespace + // string. + Collection* sourceColl = _getCollection_inlock(_opCtx.get(), _sourceNss); + ASSERT(sourceColl); + auto* sourceCatalogEntry = sourceColl->getCatalogEntry(); + ASSERT(sourceCatalogEntry); + ASSERT_EQ(sourceCatalogEntry->ns(), _sourceNss); + + ASSERT_OK(renameCollection(_opCtx.get(), _sourceNss, _targetNss, {})); + + // Verify that the CollectionCatalogEntry reports that its namespace is now the target + // namespace. + ASSERT_EQ(sourceCatalogEntry->ns(), _targetNss); +} + +TEST_F(RenameCollectionTest, CatalogPointersRenameValidThroughRenameAfterDroppingTarget) { + _createCollection(_opCtx.get(), _sourceNss); + _createCollection(_opCtx.get(), _targetNss); + Lock::GlobalWrite globalWrite(_opCtx.get()); + + Collection* sourceColl = _getCollection_inlock(_opCtx.get(), _sourceNss); + ASSERT(sourceColl); + auto* sourceCatalogEntry = sourceColl->getCatalogEntry(); + ASSERT(sourceCatalogEntry); + + RenameCollectionOptions options; + options.dropTarget = true; + ASSERT_OK(renameCollection(_opCtx.get(), _sourceNss, _targetNss, options)); + + // The same catalog pointers should now report that they are associated with the target + // namespace. + ASSERT_EQ(sourceColl->ns(), _targetNss); + ASSERT_EQ(sourceCatalogEntry->ns(), _targetNss); +} + +TEST_F(RenameCollectionTest, CatalogPointersRenameValidThroughRenameForApplyOps) { + _createCollection(_opCtx.get(), _sourceNss); + Collection* sourceColl = AutoGetCollectionForRead(_opCtx.get(), _sourceNss).getCollection(); + ASSERT(sourceColl); + + auto uuidDoc = BSON("ui" << UUID::gen()); + auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << _targetNss.ns()); + ASSERT_OK(renameCollectionForApplyOps( + _opCtx.get(), _sourceNss.db().toString(), uuidDoc["ui"], cmd, {})); + ASSERT_FALSE(_collectionExists(_opCtx.get(), _sourceNss)); + + Collection* targetColl = AutoGetCollectionForRead(_opCtx.get(), _targetNss).getCollection(); + ASSERT(targetColl); + ASSERT_EQ(targetColl, sourceColl); + ASSERT_EQ(targetColl->ns(), _targetNss); +} + +TEST_F(RenameCollectionTest, RenameAcrossDatabasesDoesNotPreserveCatalogPointers) { + _createCollection(_opCtx.get(), _sourceNss); + Lock::GlobalWrite globalWrite(_opCtx.get()); + + // Get a pointer to the source collection, and ensure that it reports the expected namespace + // string. + Collection* sourceColl = _getCollection_inlock(_opCtx.get(), _sourceNss); + ASSERT(sourceColl); + auto* sourceCatalogEntry = sourceColl->getCatalogEntry(); + ASSERT(sourceCatalogEntry); + + ASSERT_OK(renameCollection(_opCtx.get(), _sourceNss, _targetNssDifferentDb, {})); + + // Verify that the CollectionCatalogEntry reports that its namespace is now the target + // namespace. + Collection* targetColl = _getCollection_inlock(_opCtx.get(), _targetNssDifferentDb); + ASSERT(targetColl); + ASSERT_NE(targetColl, sourceColl); + auto* targetCatalogEntry = targetColl->getCatalogEntry(); + ASSERT(targetCatalogEntry); + ASSERT_NE(targetCatalogEntry, sourceCatalogEntry); +} + +TEST_F(RenameCollectionTest, UUIDCatalogMappingRemainsIntactThroughRename) { + _createCollection(_opCtx.get(), _sourceNss); + Lock::GlobalWrite globalWrite(_opCtx.get()); + auto& uuidCatalog = UUIDCatalog::get(_opCtx.get()); + Collection* sourceColl = _getCollection_inlock(_opCtx.get(), _sourceNss); + ASSERT(sourceColl); + ASSERT_EQ(sourceColl, uuidCatalog.lookupCollectionByUUID(*sourceColl->uuid())); + ASSERT_OK(renameCollection(_opCtx.get(), _sourceNss, _targetNss, {})); + ASSERT_EQ(sourceColl, uuidCatalog.lookupCollectionByUUID(*sourceColl->uuid())); +} + } // namespace |