Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/collection_bulk_loader_impl.cpp | 55
-rw-r--r--  src/mongo/db/repl/collection_bulk_loader_impl.h   |  3
-rw-r--r--  src/mongo/db/repl/replication_recovery.cpp        |  4
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                  | 39
4 files changed, 52 insertions, 49 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index 23fce736413..13dd6dae785 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -59,9 +59,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(ServiceContext::UniqueClient&
const BSONObj& idIndexSpec)
: _client{std::move(client)},
_opCtx{std::move(opCtx)},
- _autoColl{std::move(autoColl)},
- _collection{_autoColl->getWritableCollection()},
- _nss{_autoColl->getCollection()->ns()},
+ _collection{std::move(autoColl)},
+ _nss{_collection->getCollection()->ns()},
_idIndexBlock(std::make_unique<MultiIndexBlock>()),
_secondaryIndexesBlock(std::make_unique<MultiIndexBlock>()),
_idIndexSpec(idIndexSpec.getOwned()) {
@@ -75,20 +74,20 @@ CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() {
}
Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndexSpecs) {
- return _runTaskReleaseResourcesOnFailure([coll = _autoColl->getCollection(),
- &secondaryIndexSpecs,
- this]() -> Status {
+ return _runTaskReleaseResourcesOnFailure([&secondaryIndexSpecs, this]() -> Status {
+ WriteUnitOfWork wuow(_opCtx.get());
// All writes in CollectionBulkLoaderImpl should be unreplicated.
// The opCtx is accessed indirectly through _secondaryIndexesBlock.
UnreplicatedWritesBlock uwb(_opCtx.get());
// This enforces the buildIndexes setting in the replica set configuration.
- auto indexCatalog = coll->getIndexCatalog();
+ CollectionWriter collWriter(*_collection);
+ auto indexCatalog = collWriter.getWritableCollection()->getIndexCatalog();
auto specs = indexCatalog->removeExistingIndexesNoChecks(_opCtx.get(), secondaryIndexSpecs);
if (specs.size()) {
_secondaryIndexesBlock->ignoreUniqueConstraint();
auto status =
_secondaryIndexesBlock
- ->init(_opCtx.get(), _collection, specs, MultiIndexBlock::kNoopOnInitFn)
+ ->init(_opCtx.get(), collWriter, specs, MultiIndexBlock::kNoopOnInitFn)
.getStatus();
if (!status.isOK()) {
return status;
@@ -99,7 +98,7 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex
if (!_idIndexSpec.isEmpty()) {
auto status =
_idIndexBlock
- ->init(_opCtx.get(), _collection, _idIndexSpec, MultiIndexBlock::kNoopOnInitFn)
+ ->init(_opCtx.get(), collWriter, _idIndexSpec, MultiIndexBlock::kNoopOnInitFn)
.getStatus();
if (!status.isOK()) {
return status;
@@ -108,6 +107,7 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex
_idIndexBlock.reset();
}
+ wuow.commit();
return Status::OK();
});
}
@@ -134,8 +134,9 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection(
const auto& doc = *insertIter++;
bytesInBlock += doc.objsize();
// This version of insert will not update any indexes.
- const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader(
- _opCtx.get(), doc, onRecordInserted);
+ const auto status =
+ (*_collection)
+ ->insertDocumentForBulkLoader(_opCtx.get(), doc, onRecordInserted);
if (!status.isOK()) {
return status;
}
@@ -181,8 +182,8 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection(
WriteUnitOfWork wunit(_opCtx.get());
// For capped collections, we use regular insertDocument, which
// will update pre-existing indexes.
- const auto status = _autoColl->getCollection()->insertDocument(
- _opCtx.get(), InsertStatement(doc), nullptr);
+ const auto status =
+ (*_collection)->insertDocument(_opCtx.get(), InsertStatement(doc), nullptr);
if (!status.isOK()) {
return status;
}
@@ -235,7 +236,7 @@ Status CollectionBulkLoaderImpl::commit() {
WriteUnitOfWork wunit(_opCtx.get());
auto status =
_secondaryIndexesBlock->commit(_opCtx.get(),
- _collection,
+ _collection->getWritableCollection(),
MultiIndexBlock::kNoopOnCreateEachFn,
MultiIndexBlock::kNoopOnCommitFn);
if (!status.isOK()) {
@@ -262,12 +263,13 @@ Status CollectionBulkLoaderImpl::commit() {
// before committing the index build, the index removal code uses
// 'dupsAllowed', which forces the storage engine to only unindex
// records that match the same key and RecordId.
- _autoColl->getCollection()->deleteDocument(_opCtx.get(),
- kUninitializedStmtId,
- rid,
- nullptr /** OpDebug **/,
- false /* fromMigrate */,
- true /* noWarn */);
+ (*_collection)
+ ->deleteDocument(_opCtx.get(),
+ kUninitializedStmtId,
+ rid,
+ nullptr /** OpDebug **/,
+ false /* fromMigrate */,
+ true /* noWarn */);
wunit.commit();
return Status::OK();
});
@@ -296,7 +298,7 @@ Status CollectionBulkLoaderImpl::commit() {
_opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] {
WriteUnitOfWork wunit(_opCtx.get());
auto status = _idIndexBlock->commit(_opCtx.get(),
- _collection,
+ _collection->getWritableCollection(),
MultiIndexBlock::kNoopOnCreateEachFn,
MultiIndexBlock::kNoopOnCommitFn);
if (!status.isOK()) {
@@ -322,7 +324,7 @@ Status CollectionBulkLoaderImpl::commit() {
// _releaseResources.
_idIndexBlock.reset();
_secondaryIndexesBlock.reset();
- _autoColl.reset();
+ _collection.reset();
return Status::OK();
});
}
@@ -330,19 +332,20 @@ Status CollectionBulkLoaderImpl::commit() {
void CollectionBulkLoaderImpl::_releaseResources() {
invariant(&cc() == _opCtx->getClient());
if (_secondaryIndexesBlock) {
+ CollectionWriter collWriter(*_collection);
_secondaryIndexesBlock->abortIndexBuild(
- _opCtx.get(), _collection, MultiIndexBlock::kNoopOnCleanUpFn);
+ _opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn);
_secondaryIndexesBlock.reset();
}
if (_idIndexBlock) {
- _idIndexBlock->abortIndexBuild(
- _opCtx.get(), _collection, MultiIndexBlock::kNoopOnCleanUpFn);
+ CollectionWriter collWriter(*_collection);
+ _idIndexBlock->abortIndexBuild(_opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn);
_idIndexBlock.reset();
}
// release locks.
- _autoColl.reset();
+ _collection.reset();
}
template <typename F>
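Taken together, the collection_bulk_loader_impl.cpp hunks drop the cached writable Collection* and keep only the RAII AutoGetCollection (now named _collection); a CollectionWriter is constructed at each point that actually mutates the catalog, and init() now brackets that work in a single WriteUnitOfWork. A minimal sketch of the resulting pattern, using only calls that appear in the hunks above; the helper name and the idea that the caller supplies the AutoGetCollection are illustrative assumptions, not part of this change:

// Sketch only: mirrors the new init() flow; buildSecondaryIndexes is a hypothetical helper.
Status buildSecondaryIndexes(OperationContext* opCtx,
                             AutoGetCollection& autoColl,
                             MultiIndexBlock* indexBlock,
                             const std::vector<BSONObj>& secondaryIndexSpecs) {
    // Bulk-loader writes are unreplicated, as in the original code.
    UnreplicatedWritesBlock uwb(opCtx);
    // One unit of work around both the index-spec filtering and the MultiIndexBlock setup.
    WriteUnitOfWork wuow(opCtx);
    // CollectionWriter hands out the writable Collection only while inside the unit of work.
    CollectionWriter collWriter(autoColl);
    auto indexCatalog = collWriter.getWritableCollection()->getIndexCatalog();
    auto specs = indexCatalog->removeExistingIndexesNoChecks(opCtx, secondaryIndexSpecs);
    if (specs.size()) {
        auto status =
            indexBlock->init(opCtx, collWriter, specs, MultiIndexBlock::kNoopOnInitFn).getStatus();
        if (!status.isOK()) {
            return status;  // the WriteUnitOfWork rolls back on destruction
        }
    }
    wuow.commit();
    return Status::OK();
}

Note that the WriteUnitOfWork now sits at the top of the lambda in init(), so the catalog access through the writable collection and the index-build initialization commit or roll back as one unit.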
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h
index afb6df03bc2..fab8ed9c922 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.h
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.h
@@ -104,8 +104,7 @@ private:
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
- std::unique_ptr<AutoGetCollection> _autoColl;
- Collection* _collection;
+ std::unique_ptr<AutoGetCollection> _collection;
NamespaceString _nss;
std::unique_ptr<MultiIndexBlock> _idIndexBlock;
std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock;
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index c0c242421f9..ecaf6fb104e 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -650,8 +650,8 @@ void ReplicationRecoveryImpl::_truncateOplogTo(OperationContext* opCtx,
const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace);
AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX);
Lock::CollectionLock oplogCollectionLoc(opCtx, oplogNss, MODE_X);
- Collection* oplogCollection =
- CollectionCatalog::get(opCtx).lookupCollectionByNamespaceForMetadataWrite(opCtx, oplogNss);
+ auto oplogCollection =
+ CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, oplogNss);
if (!oplogCollection) {
fassertFailedWithStatusNoTrace(
34418,
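The replication_recovery.cpp hunk applies the same idea to _truncateOplogTo: the oplog handle now comes from the plain catalog lookup rather than the ForMetadataWrite variant. A small contrast sketch, with the "before" line taken from the removed code and the "after" line from the added code; treating the non-writable lookup's result as read-only is an assumption, since the new code only binds it to auto:

// Before (removed): request an exclusively writable Collection for metadata changes.
//   Collection* oplogCollection =
//       CollectionCatalog::get(opCtx).lookupCollectionByNamespaceForMetadataWrite(opCtx, oplogNss);
// After (added): a plain lookup, presumably read-only from the caller's point of view.
auto oplogCollection =
    CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, oplogNss);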
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 6c3480157e9..fa6b07c6696 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -870,7 +870,7 @@ void dropIndex(OperationContext* opCtx,
"namespace"_attr = nss.toString());
return;
}
- WriteUnitOfWork wunit(opCtx);
+
auto entry = indexCatalog->getEntry(indexDescriptor);
if (entry->isReady(opCtx)) {
auto status = indexCatalog->dropIndex(opCtx, indexDescriptor);
@@ -894,7 +894,6 @@ void dropIndex(OperationContext* opCtx,
"error"_attr = redact(status));
}
}
- wunit.commit();
}
/**
@@ -907,8 +906,7 @@ void rollbackCreateIndexes(OperationContext* opCtx, UUID uuid, std::set<std::str
CollectionCatalog::get(opCtx).lookupNSSByUUID(opCtx, uuid);
invariant(nss);
Lock::DBLock dbLock(opCtx, nss->db(), MODE_X);
- Collection* collection =
- CollectionCatalog::get(opCtx).lookupCollectionByUUIDForMetadataWrite(opCtx, uuid);
+ CollectionWriter collection(opCtx, uuid);
// If we cannot find the collection, we skip over dropping the index.
if (!collection) {
@@ -946,7 +944,9 @@ void rollbackCreateIndexes(OperationContext* opCtx, UUID uuid, std::set<std::str
"uuid"_attr = uuid,
"indexName"_attr = indexName);
- dropIndex(opCtx, indexCatalog, indexName, *nss);
+ WriteUnitOfWork wuow(opCtx);
+ dropIndex(opCtx, collection.getWritableCollection()->getIndexCatalog(), indexName, *nss);
+ wuow.commit();
LOGV2_DEBUG(21673,
1,
@@ -1574,8 +1574,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
auto db = databaseHolder->openDb(opCtx, nss->db().toString());
invariant(db);
- Collection* collection =
- CollectionCatalog::get(opCtx).lookupCollectionByUUIDForMetadataWrite(opCtx, uuid);
+ CollectionWriter collection(opCtx, uuid);
invariant(collection);
auto infoResult = rollbackSource.getCollectionInfoByUUID(nss->db().toString(), uuid);
@@ -1627,7 +1626,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
// Set any document validation options. We update the validator fields without
// parsing/validation, since we fetched the options object directly from the sync
// source, and we should set our validation options to match it exactly.
- auto validatorStatus = collection->updateValidator(
+ auto validatorStatus = collection.getWritableCollection()->updateValidator(
opCtx, options.validator, options.validationLevel, options.validationAction);
if (!validatorStatus.isOK()) {
throw RSFatalException(str::stream()
@@ -1729,8 +1728,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
const NamespaceString docNss(doc.ns);
Lock::DBLock docDbLock(opCtx, docNss.db(), MODE_X);
OldClientContext ctx(opCtx, doc.ns.toString());
- Collection* collection =
- catalog.lookupCollectionByUUIDForMetadataWrite(opCtx, uuid);
+ CollectionWriter collection(opCtx, uuid);
// Adds the doc to our rollback file if the collection was not dropped while
// rolling back createCollection operations. Does not log an error when
@@ -1740,7 +1738,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
if (collection && removeSaver) {
BSONObj obj;
- bool found = Helpers::findOne(opCtx, collection, pattern, obj, false);
+ bool found = Helpers::findOne(opCtx, collection.get(), pattern, obj, false);
if (found) {
auto status = removeSaver->goingToDelete(obj);
if (!status.isOK()) {
@@ -1791,7 +1789,8 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
const auto clock = opCtx->getServiceContext()->getFastClockSource();
const auto findOneStart = clock->now();
- RecordId loc = Helpers::findOne(opCtx, collection, pattern, false);
+ RecordId loc =
+ Helpers::findOne(opCtx, collection.get(), pattern, false);
if (clock->now() - findOneStart > Milliseconds(200))
LOGV2_WARNING(
21726,
@@ -1807,8 +1806,9 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
collection->ns().ns(),
[&] {
WriteUnitOfWork wunit(opCtx);
- collection->cappedTruncateAfter(
- opCtx, loc, true);
+ collection.getWritableCollection()
+ ->cappedTruncateAfter(
+ opCtx, loc, true);
wunit.commit();
});
} catch (const DBException& e) {
@@ -1817,7 +1817,9 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
writeConflictRetry(
opCtx, "truncate", collection->ns().ns(), [&] {
WriteUnitOfWork wunit(opCtx);
- uassertStatusOK(collection->truncate(opCtx));
+ uassertStatusOK(
+ collection.getWritableCollection()
+ ->truncate(opCtx));
wunit.commit();
});
} else {
@@ -1843,7 +1845,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
}
} else {
deleteObjects(opCtx,
- collection,
+ collection.get(),
*nss,
pattern,
true, // justOne
@@ -1947,9 +1949,8 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
Lock::DBLock oplogDbLock(opCtx, oplogNss.db(), MODE_IX);
Lock::CollectionLock oplogCollectionLoc(opCtx, oplogNss, MODE_X);
OldClientContext ctx(opCtx, oplogNss.ns());
- Collection* oplogCollection =
- CollectionCatalog::get(opCtx).lookupCollectionByNamespaceForMetadataWrite(opCtx,
- oplogNss);
+ auto oplogCollection =
+ CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, oplogNss);
if (!oplogCollection) {
fassertFailedWithStatusNoTrace(
40495,
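The rs_rollback.cpp hunks swap the by-UUID metadata-write lookups for CollectionWriter and hoist the WriteUnitOfWork out of dropIndex() into rollbackCreateIndexes(), where each drop is wrapped in its own unit of work. A condensed sketch of the new drop path, built only from calls visible in the hunks above; locking, logging, and the dropped-collection bookkeeping are omitted, and the helper name is hypothetical:

// Sketch only: mirrors the rollbackCreateIndexes()/dropIndex() split after this change.
void rollbackOneIndex(OperationContext* opCtx, const UUID& uuid, const std::string& indexName) {
    // Catalog lookup by UUID; the writable Collection is materialized only when asked for.
    CollectionWriter collection(opCtx, uuid);
    if (!collection) {
        return;  // collection already gone; the real code logs and skips the drop
    }
    // The unit of work now lives in the caller rather than inside dropIndex().
    WriteUnitOfWork wuow(opCtx);
    auto indexCatalog = collection.getWritableCollection()->getIndexCatalog();
    // dropIndex() (first rs_rollback hunk) resolves the descriptor by name and drops it via
    // the IndexCatalog; it no longer opens its own WriteUnitOfWork.
    dropIndex(opCtx, indexCatalog, indexName, collection->ns());
    wuow.commit();
}

As elsewhere in this diff, read-only uses keep going through collection.get() (Helpers::findOne, deleteObjects), while writes such as updateValidator, cappedTruncateAfter, and truncate go through collection.getWritableCollection() inside a WriteUnitOfWork.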