diff options
Diffstat (limited to 'src/mongo/db/repl/storage_interface_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 196 |
1 files changed, 102 insertions, 94 deletions
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index ce03ab1629c..ff5aa7d4260 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -102,67 +102,67 @@ NamespaceString StorageInterfaceImpl::getMinValidNss() const { return _minValidNss; } -BSONObj StorageInterfaceImpl::getMinValidDocument(OperationContext* txn) const { +BSONObj StorageInterfaceImpl::getMinValidDocument(OperationContext* opCtx) const { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IS); - Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_IS); - Lock::CollectionLock lk(txn->lockState(), _minValidNss.ns(), MODE_IS); + ScopedTransaction transaction(opCtx, MODE_IS); + Lock::DBLock dblk(opCtx->lockState(), _minValidNss.db(), MODE_IS); + Lock::CollectionLock lk(opCtx->lockState(), _minValidNss.ns(), MODE_IS); BSONObj doc; - bool found = Helpers::getSingleton(txn, _minValidNss.ns().c_str(), doc); + bool found = Helpers::getSingleton(opCtx, _minValidNss.ns().c_str(), doc); invariant(found || doc.isEmpty()); return doc; } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::getMinValidDocument", _minValidNss.ns()); + opCtx, "StorageInterfaceImpl::getMinValidDocument", _minValidNss.ns()); MONGO_UNREACHABLE; } -void StorageInterfaceImpl::updateMinValidDocument(OperationContext* txn, +void StorageInterfaceImpl::updateMinValidDocument(OperationContext* opCtx, const BSONObj& updateSpec) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); + ScopedTransaction transaction(opCtx, MODE_IX); // For now this needs to be MODE_X because it sometimes creates the collection. - Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X); - Helpers::putSingleton(txn, _minValidNss.ns().c_str(), updateSpec); + Lock::DBLock dblk(opCtx->lockState(), _minValidNss.db(), MODE_X); + Helpers::putSingleton(opCtx, _minValidNss.ns().c_str(), updateSpec); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::updateMinValidDocument", _minValidNss.ns()); + opCtx, "StorageInterfaceImpl::updateMinValidDocument", _minValidNss.ns()); } -bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* txn) const { - const BSONObj doc = getMinValidDocument(txn); +bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* opCtx) const { + const BSONObj doc = getMinValidDocument(opCtx); const auto flag = doc[kInitialSyncFlagFieldName].trueValue(); LOG(3) << "returning initial sync flag value of " << flag; return flag; } -void StorageInterfaceImpl::setInitialSyncFlag(OperationContext* txn) { +void StorageInterfaceImpl::setInitialSyncFlag(OperationContext* opCtx) { LOG(3) << "setting initial sync flag"; - updateMinValidDocument(txn, BSON("$set" << kInitialSyncFlag)); - txn->recoveryUnit()->waitUntilDurable(); + updateMinValidDocument(opCtx, BSON("$set" << kInitialSyncFlag)); + opCtx->recoveryUnit()->waitUntilDurable(); } -void StorageInterfaceImpl::clearInitialSyncFlag(OperationContext* txn) { +void StorageInterfaceImpl::clearInitialSyncFlag(OperationContext* opCtx) { LOG(3) << "clearing initial sync flag"; - auto replCoord = repl::ReplicationCoordinator::get(txn); + auto replCoord = repl::ReplicationCoordinator::get(opCtx); OpTime time = replCoord->getMyLastAppliedOpTime(); updateMinValidDocument( - txn, + opCtx, BSON("$unset" << kInitialSyncFlag << "$set" << BSON("ts" << time.getTimestamp() << "t" << time.getTerm() << kBeginFieldName << time.toBSON()))); if (getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()) { - txn->recoveryUnit()->waitUntilDurable(); + opCtx->recoveryUnit()->waitUntilDurable(); replCoord->setMyLastDurableOpTime(time); } } -OpTime StorageInterfaceImpl::getMinValid(OperationContext* txn) const { - const BSONObj doc = getMinValidDocument(txn); +OpTime StorageInterfaceImpl::getMinValid(OperationContext* opCtx) const { + const BSONObj doc = getMinValidDocument(opCtx); const auto opTimeStatus = OpTime::parseFromOplogEntry(doc); // If any of the keys (fields) are missing from the minvalid document, we return // a null OpTime. @@ -182,28 +182,29 @@ OpTime StorageInterfaceImpl::getMinValid(OperationContext* txn) const { return minValid; } -void StorageInterfaceImpl::setMinValid(OperationContext* txn, const OpTime& minValid) { +void StorageInterfaceImpl::setMinValid(OperationContext* opCtx, const OpTime& minValid) { LOG(3) << "setting minvalid to exactly: " << minValid.toString() << "(" << minValid.toBSON() << ")"; updateMinValidDocument( - txn, BSON("$set" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm()))); + opCtx, BSON("$set" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm()))); } -void StorageInterfaceImpl::setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) { +void StorageInterfaceImpl::setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) { LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON() << ")"; updateMinValidDocument( - txn, BSON("$max" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm()))); + opCtx, BSON("$max" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm()))); } -void StorageInterfaceImpl::setOplogDeleteFromPoint(OperationContext* txn, +void StorageInterfaceImpl::setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) { LOG(3) << "setting oplog delete from point to: " << timestamp.toStringPretty(); - updateMinValidDocument(txn, BSON("$set" << BSON(kOplogDeleteFromPointFieldName << timestamp))); + updateMinValidDocument(opCtx, + BSON("$set" << BSON(kOplogDeleteFromPointFieldName << timestamp))); } -Timestamp StorageInterfaceImpl::getOplogDeleteFromPoint(OperationContext* txn) { - const BSONObj doc = getMinValidDocument(txn); +Timestamp StorageInterfaceImpl::getOplogDeleteFromPoint(OperationContext* opCtx) { + const BSONObj doc = getMinValidDocument(opCtx); Timestamp out = {}; if (auto field = doc[kOplogDeleteFromPointFieldName]) { out = field.timestamp(); @@ -213,17 +214,17 @@ Timestamp StorageInterfaceImpl::getOplogDeleteFromPoint(OperationContext* txn) { return out; } -void StorageInterfaceImpl::setAppliedThrough(OperationContext* txn, const OpTime& optime) { +void StorageInterfaceImpl::setAppliedThrough(OperationContext* opCtx, const OpTime& optime) { LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")"; if (optime.isNull()) { - updateMinValidDocument(txn, BSON("$unset" << BSON(kBeginFieldName << 1))); + updateMinValidDocument(opCtx, BSON("$unset" << BSON(kBeginFieldName << 1))); } else { - updateMinValidDocument(txn, BSON("$set" << BSON(kBeginFieldName << optime.toBSON()))); + updateMinValidDocument(opCtx, BSON("$set" << BSON(kBeginFieldName << optime.toBSON()))); } } -OpTime StorageInterfaceImpl::getAppliedThrough(OperationContext* txn) { - const BSONObj doc = getMinValidDocument(txn); +OpTime StorageInterfaceImpl::getAppliedThrough(OperationContext* opCtx) { + const BSONObj doc = getMinValidDocument(opCtx); const auto opTimeStatus = OpTime::parseFromOplogEntry(doc.getObjectField(kBeginFieldName)); if (!opTimeStatus.isOK()) { // Return null OpTime on any parse failure, including if "begin" is missing. @@ -253,18 +254,18 @@ StorageInterfaceImpl::createCollectionForBulkLoading( std::unique_ptr<CollectionBulkLoader> loaderToReturn; Collection* collection; - auto status = runner->runSynchronousTask([&](OperationContext* txn) -> Status { + auto status = runner->runSynchronousTask([&](OperationContext* opCtx) -> Status { // We are not replicating nor validating writes under this OperationContext*. // The OperationContext* is used for all writes to the (newly) cloned collection. - txn->setReplicatedWrites(false); - documentValidationDisabled(txn) = true; + opCtx->setReplicatedWrites(false); + documentValidationDisabled(opCtx) = true; // Retry if WCE. MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { // Get locks and create the collection. - ScopedTransaction transaction(txn, MODE_IX); - auto db = stdx::make_unique<AutoGetOrCreateDb>(txn, nss.db(), MODE_IX); - auto coll = stdx::make_unique<AutoGetCollection>(txn, nss, MODE_X); + ScopedTransaction transaction(opCtx, MODE_IX); + auto db = stdx::make_unique<AutoGetOrCreateDb>(opCtx, nss.db(), MODE_IX); + auto coll = stdx::make_unique<AutoGetCollection>(opCtx, nss, MODE_X); collection = coll->getCollection(); if (collection) { @@ -272,14 +273,14 @@ StorageInterfaceImpl::createCollectionForBulkLoading( } // Create the collection. - WriteUnitOfWork wunit(txn); - collection = db->getDb()->createCollection(txn, nss.ns(), options, false); + WriteUnitOfWork wunit(opCtx); + collection = db->getDb()->createCollection(opCtx, nss.ns(), options, false); invariant(collection); wunit.commit(); - coll = stdx::make_unique<AutoGetCollection>(txn, nss, MODE_IX); + coll = stdx::make_unique<AutoGetCollection>(opCtx, nss, MODE_IX); // Move locks into loader, so it now controls their lifetime. - auto loader = stdx::make_unique<CollectionBulkLoaderImpl>(txn, + auto loader = stdx::make_unique<CollectionBulkLoaderImpl>(opCtx, collection, idIndexSpec, std::move(threadPool), @@ -291,7 +292,7 @@ StorageInterfaceImpl::createCollectionForBulkLoading( loaderToReturn = std::move(loader); return Status::OK(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "beginCollectionClone", nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "beginCollectionClone", nss.ns()); MONGO_UNREACHABLE; }); @@ -308,20 +309,20 @@ StorageInterfaceImpl::createCollectionForBulkLoading( } -Status StorageInterfaceImpl::insertDocument(OperationContext* txn, +Status StorageInterfaceImpl::insertDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) { - return insertDocuments(txn, nss, {doc}); + return insertDocuments(opCtx, nss, {doc}); } namespace { -Status insertDocumentsSingleBatch(OperationContext* txn, +Status insertDocumentsSingleBatch(OperationContext* opCtx, const NamespaceString& nss, std::vector<BSONObj>::const_iterator begin, std::vector<BSONObj>::const_iterator end) { - ScopedTransaction transaction(txn, MODE_IX); - AutoGetCollection autoColl(txn, nss, MODE_IX); + ScopedTransaction transaction(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, nss, MODE_IX); auto collection = autoColl.getCollection(); if (!collection) { return {ErrorCodes::NamespaceNotFound, @@ -329,9 +330,9 @@ Status insertDocumentsSingleBatch(OperationContext* txn, << nss.ns()}; } - WriteUnitOfWork wunit(txn); + WriteUnitOfWork wunit(opCtx); OpDebug* const nullOpDebug = nullptr; - auto status = collection->insertDocuments(txn, begin, end, nullOpDebug, false); + auto status = collection->insertDocuments(opCtx, begin, end, nullOpDebug, false); if (!status.isOK()) { return status; } @@ -342,12 +343,12 @@ Status insertDocumentsSingleBatch(OperationContext* txn, } // namespace -Status StorageInterfaceImpl::insertDocuments(OperationContext* txn, +Status StorageInterfaceImpl::insertDocuments(OperationContext* opCtx, const NamespaceString& nss, const std::vector<BSONObj>& docs) { if (docs.size() > 1U) { try { - if (insertDocumentsSingleBatch(txn, nss, docs.cbegin(), docs.cend()).isOK()) { + if (insertDocumentsSingleBatch(opCtx, nss, docs.cbegin(), docs.cend()).isOK()) { return Status::OK(); } } catch (...) { @@ -359,83 +360,84 @@ Status StorageInterfaceImpl::insertDocuments(OperationContext* txn, // Try to insert the batch one-at-a-time because the batch failed all-at-once inserting. for (auto it = docs.cbegin(); it != docs.cend(); ++it) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - auto status = insertDocumentsSingleBatch(txn, nss, it, it + 1); + auto status = insertDocumentsSingleBatch(opCtx, nss, it, it + 1); if (!status.isOK()) { return status; } } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::insertDocuments", nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + opCtx, "StorageInterfaceImpl::insertDocuments", nss.ns()); } return Status::OK(); } -Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* txn) { - dropAllDatabasesExceptLocal(txn); +Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* opCtx) { + dropAllDatabasesExceptLocal(opCtx); return Status::OK(); } -Status StorageInterfaceImpl::createOplog(OperationContext* txn, const NamespaceString& nss) { - mongo::repl::createOplog(txn, nss.ns(), true); +Status StorageInterfaceImpl::createOplog(OperationContext* opCtx, const NamespaceString& nss) { + mongo::repl::createOplog(opCtx, nss.ns(), true); return Status::OK(); } -StatusWith<size_t> StorageInterfaceImpl::getOplogMaxSize(OperationContext* txn, +StatusWith<size_t> StorageInterfaceImpl::getOplogMaxSize(OperationContext* opCtx, const NamespaceString& nss) { - AutoGetCollectionForRead collection(txn, nss); + AutoGetCollectionForRead collection(opCtx, nss); if (!collection.getCollection()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "Your oplog doesn't exist: " << nss.ns()}; } - const auto options = collection.getCollection()->getCatalogEntry()->getCollectionOptions(txn); + const auto options = collection.getCollection()->getCatalogEntry()->getCollectionOptions(opCtx); if (!options.capped) return {ErrorCodes::BadValue, str::stream() << nss.ns() << " isn't capped"}; return options.cappedSize; } -Status StorageInterfaceImpl::createCollection(OperationContext* txn, +Status StorageInterfaceImpl::createCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - AutoGetOrCreateDb databaseWriteGuard(txn, nss.db(), MODE_X); + ScopedTransaction transaction(opCtx, MODE_IX); + AutoGetOrCreateDb databaseWriteGuard(opCtx, nss.db(), MODE_X); auto db = databaseWriteGuard.getDb(); invariant(db); if (db->getCollection(nss)) { return {ErrorCodes::NamespaceExists, str::stream() << "Collection " << nss.ns() << " already exists."}; } - WriteUnitOfWork wuow(txn); + WriteUnitOfWork wuow(opCtx); try { - auto coll = db->createCollection(txn, nss.ns(), options); + auto coll = db->createCollection(opCtx, nss.ns(), options); invariant(coll); } catch (const UserException& ex) { return ex.toStatus(); } wuow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::createCollection", nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "StorageInterfaceImpl::createCollection", nss.ns()); return Status::OK(); } -Status StorageInterfaceImpl::dropCollection(OperationContext* txn, const NamespaceString& nss) { +Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const NamespaceString& nss) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - AutoGetDb autoDB(txn, nss.db(), MODE_X); + ScopedTransaction transaction(opCtx, MODE_IX); + AutoGetDb autoDB(opCtx, nss.db(), MODE_X); if (!autoDB.getDb()) { // Database does not exist - nothing to do. return Status::OK(); } - WriteUnitOfWork wunit(txn); - const auto status = autoDB.getDb()->dropCollection(txn, nss.ns()); + WriteUnitOfWork wunit(opCtx); + const auto status = autoDB.getDb()->dropCollection(opCtx, nss.ns()); if (status.isOK()) { wunit.commit(); } return status; } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::dropCollection", nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "StorageInterfaceImpl::dropCollection", nss.ns()); } namespace { @@ -455,7 +457,7 @@ DeleteStageParams makeDeleteStageParamsForDeleteDocuments() { */ enum class FindDeleteMode { kFind, kDelete }; StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( - OperationContext* txn, + OperationContext* opCtx, const NamespaceString& nss, boost::optional<StringData> indexName, StorageInterface::ScanDirection scanDirection, @@ -468,8 +470,8 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { auto collectionAccessMode = isFind ? MODE_IS : MODE_IX; - ScopedTransaction transaction(txn, collectionAccessMode); - AutoGetCollection collectionGuard(txn, nss, collectionAccessMode); + ScopedTransaction transaction(opCtx, collectionAccessMode); + AutoGetCollection collectionGuard(opCtx, nss, collectionAccessMode); auto collection = collectionGuard.getCollection(); if (!collection) { return {ErrorCodes::NamespaceNotFound, @@ -493,9 +495,9 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( // Use collection scan. planExecutor = isFind ? InternalPlanner::collectionScan( - txn, nss.ns(), collection, PlanExecutor::YIELD_MANUAL, direction) + opCtx, nss.ns(), collection, PlanExecutor::YIELD_MANUAL, direction) : InternalPlanner::deleteWithCollectionScan( - txn, + opCtx, collection, makeDeleteStageParamsForDeleteDocuments(), PlanExecutor::YIELD_MANUAL, @@ -506,7 +508,7 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( invariant(indexCatalog); bool includeUnfinishedIndexes = false; IndexDescriptor* indexDescriptor = - indexCatalog->findIndexByName(txn, *indexName, includeUnfinishedIndexes); + indexCatalog->findIndexByName(opCtx, *indexName, includeUnfinishedIndexes); if (!indexDescriptor) { return {ErrorCodes::IndexNotFound, str::stream() << "Index not found, ns:" << nss.ns() << ", index: " @@ -529,7 +531,7 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( bounds.first = startKey; } planExecutor = isFind - ? InternalPlanner::indexScan(txn, + ? InternalPlanner::indexScan(opCtx, collection, indexDescriptor, bounds.first, @@ -538,7 +540,7 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( PlanExecutor::YIELD_MANUAL, direction, InternalPlanner::IXSCAN_FETCH) - : InternalPlanner::deleteWithIndexScan(txn, + : InternalPlanner::deleteWithIndexScan(opCtx, collection, makeDeleteStageParamsForDeleteDocuments(), indexDescriptor, @@ -562,33 +564,39 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( } return docs; } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, opStr, nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, opStr, nss.ns()); MONGO_UNREACHABLE; } } // namespace StatusWith<std::vector<BSONObj>> StorageInterfaceImpl::findDocuments( - OperationContext* txn, + OperationContext* opCtx, const NamespaceString& nss, boost::optional<StringData> indexName, ScanDirection scanDirection, const BSONObj& startKey, BoundInclusion boundInclusion, std::size_t limit) { - return _findOrDeleteDocuments( - txn, nss, indexName, scanDirection, startKey, boundInclusion, limit, FindDeleteMode::kFind); + return _findOrDeleteDocuments(opCtx, + nss, + indexName, + scanDirection, + startKey, + boundInclusion, + limit, + FindDeleteMode::kFind); } StatusWith<std::vector<BSONObj>> StorageInterfaceImpl::deleteDocuments( - OperationContext* txn, + OperationContext* opCtx, const NamespaceString& nss, boost::optional<StringData> indexName, ScanDirection scanDirection, const BSONObj& startKey, BoundInclusion boundInclusion, std::size_t limit) { - return _findOrDeleteDocuments(txn, + return _findOrDeleteDocuments(opCtx, nss, indexName, scanDirection, @@ -598,10 +606,10 @@ StatusWith<std::vector<BSONObj>> StorageInterfaceImpl::deleteDocuments( FindDeleteMode::kDelete); } -Status StorageInterfaceImpl::isAdminDbValid(OperationContext* txn) { - ScopedTransaction transaction(txn, MODE_IX); - AutoGetDb autoDB(txn, "admin", MODE_X); - return checkAdminDatabase(txn, autoDB.getDb()); +Status StorageInterfaceImpl::isAdminDbValid(OperationContext* opCtx) { + ScopedTransaction transaction(opCtx, MODE_IX); + AutoGetDb autoDB(opCtx, "admin", MODE_X); + return checkAdminDatabase(opCtx, autoDB.getDb()); } } // namespace repl |