summaryrefslogtreecommitdiff
path: root/src/mongo/db/catalog/database_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/catalog/database_impl.cpp')
-rw-r--r--src/mongo/db/catalog/database_impl.cpp73
1 files changed, 36 insertions, 37 deletions
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index 430b540b034..efb175b079b 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/drop_indexes.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/catalog/uncommitted_collections.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands/feature_compatibility_version_parser.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -83,6 +84,7 @@ namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(hangBeforeLoggingCreateCollection);
MONGO_FAIL_POINT_DEFINE(hangAndFailAfterCreateCollectionReservesOpTime);
+MONGO_FAIL_POINT_DEFINE(openCreateCollectionWindowFp);
Status validateDBNameForWindows(StringData dbname) {
const std::vector<std::string> windowsReservedNames = {
@@ -142,7 +144,7 @@ void DatabaseImpl::init(OperationContext* const opCtx) const {
auto& catalog = CollectionCatalog::get(opCtx);
for (const auto& uuid : catalog.getAllCollectionUUIDsFromDb(_name)) {
- auto collection = catalog.lookupCollectionByUUID(uuid);
+ auto collection = catalog.lookupCollectionByUUID(opCtx, uuid);
invariant(collection);
// If this is called from the repair path, the collection is already initialized.
if (!collection->isInitialized())
@@ -307,7 +309,10 @@ Status DatabaseImpl::dropView(OperationContext* opCtx, NamespaceString viewName)
Status DatabaseImpl::dropCollection(OperationContext* opCtx,
NamespaceString nss,
repl::OpTime dropOpTime) const {
- if (!CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss)) {
+ // Cannot drop uncommitted collections.
+ invariant(!UncommittedCollections::getForTxn(opCtx, nss));
+
+ if (!CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss)) {
// Collection doesn't exist so don't bother validating if it can be dropped.
return Status::OK();
}
@@ -344,7 +349,7 @@ Status DatabaseImpl::dropCollectionEvenIfSystem(OperationContext* opCtx,
"dropCollection() cannot accept a valid drop optime when writes are replicated.");
}
- Collection* collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss);
+ Collection* collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
if (!collection) {
return Status::OK(); // Post condition already met.
@@ -484,13 +489,14 @@ Status DatabaseImpl::renameCollection(OperationContext* opCtx,
invariant(fromNss.db() == _name);
invariant(toNss.db() == _name);
- if (CollectionCatalog::get(opCtx).lookupCollectionByNamespace(toNss)) {
+ if (CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, toNss)) {
return Status(ErrorCodes::NamespaceExists,
str::stream() << "Cannot rename '" << fromNss << "' to '" << toNss
<< "' because the destination namespace already exists");
}
- Collection* collToRename = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(fromNss);
+ Collection* collToRename =
+ CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, fromNss);
if (!collToRename) {
return Status(ErrorCodes::NamespaceNotFound, "collection not found to rename");
}
@@ -499,7 +505,7 @@ Status DatabaseImpl::renameCollection(OperationContext* opCtx,
"collection "
<< fromNss);
- Collection* toColl = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(toNss);
+ Collection* toColl = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, toNss);
if (toColl) {
invariant(
!toColl->getIndexCatalog()->haveAnyIndexesInProgress(),
@@ -517,9 +523,8 @@ Status DatabaseImpl::renameCollection(OperationContext* opCtx,
opCtx, collToRename->getCatalogId(), toNss, stayTemp);
// Set the namespace of 'collToRename' from within the CollectionCatalog. This is necessary
- // because
- // the CollectionCatalog mutex synchronizes concurrent access to the collection's namespace for
- // callers that may not hold a collection lock.
+ // because the CollectionCatalog mutex synchronizes concurrent access to the collection's
+ // namespace for callers that may not hold a collection lock.
CollectionCatalog::get(opCtx).setCollectionNamespace(opCtx, collToRename, fromNss, toNss);
opCtx->recoveryUnit()->onCommit([collToRename](auto commitTime) {
@@ -535,9 +540,15 @@ Status DatabaseImpl::renameCollection(OperationContext* opCtx,
void DatabaseImpl::_checkCanCreateCollection(OperationContext* opCtx,
const NamespaceString& nss,
const CollectionOptions& options) const {
- massert(17399,
- str::stream() << "Cannot create collection " << nss << " - collection already exists.",
- CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss) == nullptr);
+ if (CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss) != nullptr) {
+ if (options.isView()) {
+ uasserted(17399,
+ str::stream()
+ << "Cannot create collection " << nss << " - collection already exists.");
+ } else {
+ throw WriteConflictException();
+ }
+ }
uassert(14037,
"can't create user databases on a --configsvr instance",
@@ -589,7 +600,7 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx,
const BSONObj& idIndex) const {
invariant(!options.isView());
- invariant(opCtx->lockState()->isDbLockedForMode(name(), MODE_IX));
+ invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));
uassert(CannotImplicitlyCreateCollectionInfo(nss),
"request doesn't allow collection to be created implicitly",
@@ -624,8 +635,12 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx,
// transaction, we reserve an opTime before the collection creation, then pass it to the
// opObserver. Reserving the optime automatically sets the storage timestamp.
OplogSlot createOplogSlot;
+ Timestamp createTime;
if (canAcceptWrites && supportsDocLocking() && !coordinator->isOplogDisabledFor(opCtx, nss)) {
createOplogSlot = repl::getNextOpTime(opCtx);
+ createTime = createOplogSlot.getTimestamp();
+ } else {
+ createTime = opCtx->recoveryUnit()->getCommitTimestamp();
}
if (MONGO_unlikely(hangAndFailAfterCreateCollectionReservesOpTime.shouldFail())) {
@@ -653,17 +668,12 @@ Collection* DatabaseImpl::createCollection(OperationContext* opCtx,
std::move(catalogIdRecordStorePair.second));
auto collection = ownedCollection.get();
ownedCollection->init(opCtx);
-
- opCtx->recoveryUnit()->onCommit([collection](auto commitTime) {
- // Ban reading from this collection on committed reads on snapshots before now.
- if (commitTime)
- collection->setMinimumVisibleSnapshot(commitTime.get());
- });
-
- auto& catalog = CollectionCatalog::get(opCtx);
- auto uuid = ownedCollection->uuid();
- catalog.registerCollection(uuid, std::move(ownedCollection));
- opCtx->recoveryUnit()->onRollback([uuid, &catalog] { catalog.deregisterCollection(uuid); });
+ UncommittedCollections::addToTxn(opCtx, std::move(ownedCollection), createTime);
+ openCreateCollectionWindowFp.executeIf([&](const BSONObj& data) { sleepsecs(3); },
+ [&](const BSONObj& data) {
+ const auto collElem = data["collectionNS"];
+ return !collElem || nss.toString() == collElem.str();
+ });
BSONObj fullIdIndexSpec;
@@ -740,7 +750,7 @@ StatusWith<NamespaceString> DatabaseImpl::makeUniqueCollectionNamespace(
replacePercentSign);
NamespaceString nss(_name, collectionName);
- if (!CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss)) {
+ if (!CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss)) {
return nss;
}
}
@@ -772,7 +782,7 @@ void DatabaseImpl::checkForIdIndexesAndDropPendingCollections(OperationContext*
if (nss.isSystem())
continue;
- Collection* coll = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss);
+ Collection* coll = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
if (!coll)
continue;
@@ -793,20 +803,9 @@ Status DatabaseImpl::userCreateNS(OperationContext* opCtx,
bool createDefaultIndexes,
const BSONObj& idIndex) const {
LOG(1) << "create collection " << nss << ' ' << collectionOptions.toBSON();
-
if (!NamespaceString::validCollectionComponent(nss.ns()))
return Status(ErrorCodes::InvalidNamespace, str::stream() << "invalid ns: " << nss);
- Collection* collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(nss);
-
- if (collection)
- return Status(ErrorCodes::NamespaceExists,
- str::stream() << "a collection '" << nss << "' already exists");
-
- if (ViewCatalog::get(this)->lookup(opCtx, nss.ns()))
- return Status(ErrorCodes::NamespaceExists,
- str::stream() << "a view '" << nss << "' already exists");
-
// Validate the collation, if there is one.
std::unique_ptr<CollatorInterface> collator;
if (!collectionOptions.collation.isEmpty()) {