diff options
Diffstat (limited to 'src/mongo/db/catalog/database_impl.cpp')
-rw-r--r-- | src/mongo/db/catalog/database_impl.cpp | 73 |
1 files changed, 36 insertions, 37 deletions
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index 430b540b034..efb175b079b 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -49,6 +49,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/drop_indexes.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/catalog/uncommitted_collections.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands/feature_compatibility_version_parser.h" #include "mongo/db/concurrency/d_concurrency.h" @@ -83,6 +84,7 @@ namespace mongo { namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeLoggingCreateCollection); MONGO_FAIL_POINT_DEFINE(hangAndFailAfterCreateCollectionReservesOpTime); +MONGO_FAIL_POINT_DEFINE(openCreateCollectionWindowFp); Status validateDBNameForWindows(StringData dbname) { const std::vector<std::string> windowsReservedNames = { @@ -142,7 +144,7 @@ void DatabaseImpl::init(OperationContext* const opCtx) const { auto& catalog = CollectionCatalog::get(opCtx); for (const auto& uuid : catalog.getAllCollectionUUIDsFromDb(_name)) { - auto collection = catalog.lookupCollectionByUUID(uuid); + auto collection = catalog.lookupCollectionByUUID(opCtx, uuid); invariant(collection); // If this is called from the repair path, the collection is already initialized. if (!collection->isInitialized()) @@ -307,7 +309,10 @@ Status DatabaseImpl::dropView(OperationContext* opCtx, NamespaceString viewName) Status DatabaseImpl::dropCollection(OperationContext* opCtx, NamespaceString nss, repl::OpTime dropOpTime) const { - if (!CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss)) { + // Cannot drop uncommitted collections. + invariant(!UncommittedCollections::getForTxn(opCtx, nss)); + + if (!CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss)) { // Collection doesn't exist so don't bother validating if it can be dropped. return Status::OK(); } @@ -344,7 +349,7 @@ Status DatabaseImpl::dropCollectionEvenIfSystem(OperationContext* opCtx, "dropCollection() cannot accept a valid drop optime when writes are replicated."); } - Collection* collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss); + Collection* collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); if (!collection) { return Status::OK(); // Post condition already met. @@ -484,13 +489,14 @@ Status DatabaseImpl::renameCollection(OperationContext* opCtx, invariant(fromNss.db() == _name); invariant(toNss.db() == _name); - if (CollectionCatalog::get(opCtx).lookupCollectionByNamespace(toNss)) { + if (CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, toNss)) { return Status(ErrorCodes::NamespaceExists, str::stream() << "Cannot rename '" << fromNss << "' to '" << toNss << "' because the destination namespace already exists"); } - Collection* collToRename = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(fromNss); + Collection* collToRename = + CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, fromNss); if (!collToRename) { return Status(ErrorCodes::NamespaceNotFound, "collection not found to rename"); } @@ -499,7 +505,7 @@ Status DatabaseImpl::renameCollection(OperationContext* opCtx, "collection " << fromNss); - Collection* toColl = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(toNss); + Collection* toColl = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, toNss); if (toColl) { invariant( !toColl->getIndexCatalog()->haveAnyIndexesInProgress(), @@ -517,9 +523,8 @@ Status DatabaseImpl::renameCollection(OperationContext* opCtx, opCtx, collToRename->getCatalogId(), toNss, stayTemp); // Set the namespace of 'collToRename' from within the CollectionCatalog. This is necessary - // because - // the CollectionCatalog mutex synchronizes concurrent access to the collection's namespace for - // callers that may not hold a collection lock. + // because the CollectionCatalog mutex synchronizes concurrent access to the collection's + // namespace for callers that may not hold a collection lock. CollectionCatalog::get(opCtx).setCollectionNamespace(opCtx, collToRename, fromNss, toNss); opCtx->recoveryUnit()->onCommit([collToRename](auto commitTime) { @@ -535,9 +540,15 @@ Status DatabaseImpl::renameCollection(OperationContext* opCtx, void DatabaseImpl::_checkCanCreateCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) const { - massert(17399, - str::stream() << "Cannot create collection " << nss << " - collection already exists.", - CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss) == nullptr); + if (CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss) != nullptr) { + if (options.isView()) { + uasserted(17399, + str::stream() + << "Cannot create collection " << nss << " - collection already exists."); + } else { + throw WriteConflictException(); + } + } uassert(14037, "can't create user databases on a --configsvr instance", @@ -589,7 +600,7 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx, const BSONObj& idIndex) const { invariant(!options.isView()); - invariant(opCtx->lockState()->isDbLockedForMode(name(), MODE_IX)); + invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX)); uassert(CannotImplicitlyCreateCollectionInfo(nss), "request doesn't allow collection to be created implicitly", @@ -624,8 +635,12 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx, // transaction, we reserve an opTime before the collection creation, then pass it to the // opObserver. Reserving the optime automatically sets the storage timestamp. OplogSlot createOplogSlot; + Timestamp createTime; if (canAcceptWrites && supportsDocLocking() && !coordinator->isOplogDisabledFor(opCtx, nss)) { createOplogSlot = repl::getNextOpTime(opCtx); + createTime = createOplogSlot.getTimestamp(); + } else { + createTime = opCtx->recoveryUnit()->getCommitTimestamp(); } if (MONGO_unlikely(hangAndFailAfterCreateCollectionReservesOpTime.shouldFail())) { @@ -653,17 +668,12 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx, std::move(catalogIdRecordStorePair.second)); auto collection = ownedCollection.get(); ownedCollection->init(opCtx); - - opCtx->recoveryUnit()->onCommit([collection](auto commitTime) { - // Ban reading from this collection on committed reads on snapshots before now. - if (commitTime) - collection->setMinimumVisibleSnapshot(commitTime.get()); - }); - - auto& catalog = CollectionCatalog::get(opCtx); - auto uuid = ownedCollection->uuid(); - catalog.registerCollection(uuid, std::move(ownedCollection)); - opCtx->recoveryUnit()->onRollback([uuid, &catalog] { catalog.deregisterCollection(uuid); }); + UncommittedCollections::addToTxn(opCtx, std::move(ownedCollection), createTime); + openCreateCollectionWindowFp.executeIf([&](const BSONObj& data) { sleepsecs(3); }, + [&](const BSONObj& data) { + const auto collElem = data["collectionNS"]; + return !collElem || nss.toString() == collElem.str(); + }); BSONObj fullIdIndexSpec; @@ -740,7 +750,7 @@ StatusWith<NamespaceString> DatabaseImpl::makeUniqueCollectionNamespace( replacePercentSign); NamespaceString nss(_name, collectionName); - if (!CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss)) { + if (!CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss)) { return nss; } } @@ -772,7 +782,7 @@ void DatabaseImpl::checkForIdIndexesAndDropPendingCollections(OperationContext* if (nss.isSystem()) continue; - Collection* coll = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss); + Collection* coll = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); if (!coll) continue; @@ -793,20 +803,9 @@ Status DatabaseImpl::userCreateNS(OperationContext* opCtx, bool createDefaultIndexes, const BSONObj& idIndex) const { LOG(1) << "create collection " << nss << ' ' << collectionOptions.toBSON(); - if (!NamespaceString::validCollectionComponent(nss.ns())) return Status(ErrorCodes::InvalidNamespace, str::stream() << "invalid ns: " << nss); - Collection* collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss); - - if (collection) - return Status(ErrorCodes::NamespaceExists, - str::stream() << "a collection '" << nss << "' already exists"); - - if (ViewCatalog::get(this)->lookup(opCtx, nss.ns())) - return Status(ErrorCodes::NamespaceExists, - str::stream() << "a view '" << nss << "' already exists"); - // Validate the collation, if there is one. std::unique_ptr<CollatorInterface> collator; if (!collectionOptions.collation.isEmpty()) { |