/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/sync_tail.h"
#include "third_party/murmurhash3/MurmurHash3.h"
#include <algorithm>
#include <memory>
#include "mongo/base/counter.h"
#include "mongo/bson/bsonelement_comparator.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/multi_key_path_tracker.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/prefetch.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/applier_helpers.h"
#include "mongo/db/repl/apply_ops.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/session_update_tracker.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
using std::endl;
namespace repl {
AtomicInt32 SyncTail::replBatchLimitOperations{50 * 1000};
namespace {
MONGO_FP_DECLARE(pauseBatchApplicationBeforeCompletion);
/**
* This variable determines the number of writer threads SyncTail will have. It can be overridden
* using the "replWriterThreadCount" server parameter.
*/
int replWriterThreadCount = 16;
class ExportedWriterThreadCountParameter
: public ExportedServerParameter<int, ServerParameterType::kStartupOnly> {
public:
ExportedWriterThreadCountParameter()
: ExportedServerParameter<int, ServerParameterType::kStartupOnly>(
ServerParameterSet::getGlobal(), "replWriterThreadCount", &replWriterThreadCount) {}
virtual Status validate(const int& potentialNewValue) {
if (potentialNewValue < 1 || potentialNewValue > 256) {
return Status(ErrorCodes::BadValue, "replWriterThreadCount must be between 1 and 256");
}
return Status::OK();
}
} exportedWriterThreadCountParam;
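// Illustrative usage (not part of this file): since the thread count is exposed as a server
// parameter, an operator would typically raise it at mongod startup, e.g.
//   mongod --setParameter replWriterThreadCount=32
// Values outside the [1, 256] range are rejected by validate() above.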
class ExportedBatchLimitOperationsParameter
: public ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime> {
public:
ExportedBatchLimitOperationsParameter()
: ExportedServerParameter<int, ServerParameterType::kStartupAndRuntime>(
ServerParameterSet::getGlobal(),
"replBatchLimitOperations",
&SyncTail::replBatchLimitOperations) {}
virtual Status validate(const int& potentialNewValue) {
if (potentialNewValue < 1 || potentialNewValue > (1000 * 1000)) {
return Status(ErrorCodes::BadValue,
"replBatchLimitOperations must be between 1 and 1 million, inclusive");
}
return Status::OK();
}
} exportedBatchLimitOperationsParam;
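// Illustrative usage (not part of this file): the batcher re-reads this limit for every batch,
// so it can also be adjusted on a running node, e.g. from the mongo shell:
//   db.adminCommand({setParameter: 1, replBatchLimitOperations: 25000})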
// The number of oplog entries applied.
Counter64 opsAppliedStats;
ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats);
// Number of times we tried to go live as a secondary.
Counter64 attemptsToBecomeSecondary;
ServerStatusMetricField<Counter64> displayAttemptsToBecomeSecondary(
"repl.apply.attemptsToBecomeSecondary", &attemptsToBecomeSecondary);
// Number and time of each ApplyOps worker pool round
TimerStats applyBatchStats;
ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats);
class ApplyBatchFinalizer {
public:
ApplyBatchFinalizer(ReplicationCoordinator* replCoord) : _replCoord(replCoord) {}
virtual ~ApplyBatchFinalizer(){};
virtual void record(const OpTime& newOpTime,
ReplicationCoordinator::DataConsistency consistency) {
_recordApplied(newOpTime, consistency);
};
protected:
void _recordApplied(const OpTime& newOpTime,
ReplicationCoordinator::DataConsistency consistency) {
// We have to use setMyLastAppliedOpTimeForward since this thread races with
// ReplicationExternalStateImpl::onTransitionToPrimary.
_replCoord->setMyLastAppliedOpTimeForward(newOpTime, consistency);
}
void _recordDurable(const OpTime& newOpTime) {
// We have to use setMyLastDurableOpTimeForward since this thread races with
// ReplicationExternalStateImpl::onTransitionToPrimary.
_replCoord->setMyLastDurableOpTimeForward(newOpTime);
}
private:
// Used to update the replication system's progress.
ReplicationCoordinator* _replCoord;
};
class ApplyBatchFinalizerForJournal : public ApplyBatchFinalizer {
public:
ApplyBatchFinalizerForJournal(ReplicationCoordinator* replCoord)
: ApplyBatchFinalizer(replCoord),
_waiterThread{&ApplyBatchFinalizerForJournal::_run, this} {};
~ApplyBatchFinalizerForJournal();
void record(const OpTime& newOpTime,
ReplicationCoordinator::DataConsistency consistency) override;
private:
/**
* Loops continuously, waiting for writes to be flushed to disk and then calls
* ReplicationCoordinator::setMyLastOptime with _latestOpTime.
* Terminates once _shutdownSignaled is set true.
*/
void _run();
// Protects _cond, _shutdownSignaled, and _latestOpTime.
stdx::mutex _mutex;
// Used to alert our thread of a new OpTime.
stdx::condition_variable _cond;
// The next OpTime to set as the ReplicationCoordinator's lastOpTime after flushing.
OpTime _latestOpTime;
// Once this is set to true the _run method will terminate.
bool _shutdownSignaled = false;
// Thread that will _run(). Must be initialized last as it depends on the other variables.
stdx::thread _waiterThread;
};
ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() {
stdx::unique_lock<stdx::mutex> lock(_mutex);
_shutdownSignaled = true;
_cond.notify_all();
lock.unlock();
_waiterThread.join();
}
void ApplyBatchFinalizerForJournal::record(const OpTime& newOpTime,
ReplicationCoordinator::DataConsistency consistency) {
_recordApplied(newOpTime, consistency);
stdx::unique_lock<stdx::mutex> lock(_mutex);
_latestOpTime = newOpTime;
_cond.notify_all();
}
void ApplyBatchFinalizerForJournal::_run() {
Client::initThread("ApplyBatchFinalizerForJournal");
while (true) {
OpTime latestOpTime;
{
stdx::unique_lock<stdx::mutex> lock(_mutex);
while (_latestOpTime.isNull() && !_shutdownSignaled) {
_cond.wait(lock);
}
if (_shutdownSignaled) {
return;
}
latestOpTime = _latestOpTime;
_latestOpTime = OpTime();
}
auto opCtx = cc().makeOperationContext();
opCtx->recoveryUnit()->waitUntilDurable();
_recordDurable(latestOpTime);
}
}
NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) {
auto optionalUuid = oplogEntry.getUuid();
if (!optionalUuid) {
return oplogEntry.getNamespace();
}
const auto& uuid = optionalUuid.get();
auto& catalog = UUIDCatalog::get(opCtx);
auto nss = catalog.lookupNSSByUUID(uuid);
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "No namespace with UUID " << uuid.toString(),
!nss.isEmpty());
return nss;
}
NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const BSONObj& op) {
if (auto ui = op["ui"]) {
return {nss.db().toString(), uassertStatusOK(UUID::parse(ui))};
}
return nss;
}
} // namespace
std::size_t SyncTail::calculateBatchLimitBytes(OperationContext* opCtx,
StorageInterface* storageInterface) {
auto oplogMaxSizeResult =
storageInterface->getOplogMaxSize(opCtx, NamespaceString::kRsOplogNamespace);
auto oplogMaxSize = fassert(40301, oplogMaxSizeResult);
return std::min(oplogMaxSize / 10, std::size_t(replBatchLimitBytes));
}
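// Worked example: a 512 MB oplog caps batches at roughly 51 MB (oplogMaxSize / 10), while a very
// large oplog is bounded by replBatchLimitBytes instead; whichever of the two values is smaller wins.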
std::unique_ptr<ThreadPool> SyncTail::makeWriterPool() {
return makeWriterPool(replWriterThreadCount);
}
std::unique_ptr<ThreadPool> SyncTail::makeWriterPool(int threadCount) {
ThreadPool::Options options;
options.threadNamePrefix = "repl writer worker ";
options.poolName = "repl writer worker Pool";
options.maxThreads = options.minThreads = static_cast<size_t>(threadCount);
options.onCreateThread = [](const std::string&) {
// Only do this once per thread
if (!Client::getCurrent()) {
Client::initThreadIfNotAlready();
AuthorizationSession::get(cc())->grantInternalAuthorization();
}
};
auto pool = stdx::make_unique<ThreadPool>(options);
pool->startup();
return pool;
}
// static
Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
OplogApplication::Mode oplogApplicationMode) {
// Count each log op application as a separate operation, for reporting purposes
CurOp individualOp(opCtx);
const NamespaceString nss(op.getStringField("ns"));
auto incrementOpsAppliedStats = [] { opsAppliedStats.increment(1); };
auto applyOp = [&](Database* db) {
// For non-initial-sync, we convert updates to upserts
// to suppress errors when replaying oplog entries.
UnreplicatedWritesBlock uwb(opCtx);
DisableDocumentValidation validationDisabler(opCtx);
// We convert updates to upserts when not in initial sync because after rollback and during
// startup we may replay an update after a delete and crash since we do not ignore
// errors. In initial sync we simply ignore these update errors so there is no reason to
// upsert.
//
// TODO (SERVER-21700): Never upsert during oplog application unless an external applyOps
// wants to. We should ignore these errors intelligently while in RECOVERING and STARTUP
// mode (similar to initial sync) instead so we do not accidentally ignore real errors.
bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync);
Status status = applyOperation_inlock(
opCtx, db, op, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats);
if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
throw WriteConflictException();
}
return status;
};
auto opType = OpType_parse(IDLParserErrorContext("syncApply"), op["op"].valuestrsafe());
if (opType == OpTypeEnum::kNoop) {
if (nss.db() == "") {
return Status::OK();
}
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
return applyOp(ctx.db());
} else if (opType == OpTypeEnum::kInsert && nss.isSystemDotIndexes()) {
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
return applyOp(ctx.db());
} else if (OplogEntry::isCrudOpType(opType)) {
return writeConflictRetry(opCtx, "syncApply_CRUD", nss.ns(), [&] {
// Need to throw instead of returning a status for it to be properly ignored.
try {
AutoGetCollection autoColl(opCtx, getNsOrUUID(nss, op), MODE_IX);
auto db = autoColl.getDb();
uassert(ErrorCodes::NamespaceNotFound,
str::stream() << "missing database (" << nss.db() << ")",
db);
OldClientContext ctx(opCtx, autoColl.getNss().ns(), db);
return applyOp(ctx.db());
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
// Delete operations on non-existent namespaces can be treated as successful for
// idempotency reasons.
// During RECOVERING mode, we ignore NamespaceNotFound for all CRUD ops since
// storage does not wait for drops to be checkpointed (SERVER-33161).
if (opType == OpTypeEnum::kDelete ||
oplogApplicationMode == OplogApplication::Mode::kRecovering) {
return Status::OK();
}
ex.addContext(str::stream() << "Failed to apply operation: " << redact(op));
throw;
}
});
} else if (opType == OpTypeEnum::kCommand) {
return writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
// a command may need a global write lock. so we will conservatively go
// ahead and grab one here. suboptimal. :-(
Lock::GlobalWrite globalWriteLock(opCtx);
// special case apply for commands to avoid implicit database creation
Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode);
incrementOpsAppliedStats();
return status;
});
}
MONGO_UNREACHABLE;
}
SyncTail::SyncTail(OplogApplier::Observer* observer,
ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
MultiSyncApplyFunc func,
ThreadPool* writerPool,
const OplogApplier::Options& options)
: _observer(observer),
_consistencyMarkers(consistencyMarkers),
_storageInterface(storageInterface),
_applyFunc(func),
_writerPool(writerPool),
_options(options) {}
SyncTail::SyncTail(OplogApplier::Observer* observer,
ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
MultiSyncApplyFunc func,
ThreadPool* writerPool)
: SyncTail(observer, consistencyMarkers, storageInterface, func, writerPool, {}) {}
SyncTail::~SyncTail() {}
const OplogApplier::Options& SyncTail::getOptions() const {
return _options;
}
namespace {
// The pool threads call this to prefetch each op
void prefetchOp(const OplogEntry& oplogEntry) {
const auto& nss = oplogEntry.getNamespace();
if (!nss.isEmpty()) {
try {
// one possible tweak here would be to stay in the read lock for this database
// for multiple prefetches if they are for the same database.
const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
OperationContext& opCtx = *opCtxPtr;
AutoGetCollectionForReadCommand ctx(&opCtx, nss);
Database* db = ctx.getDb();
if (db) {
prefetchPagesForReplicatedOp(&opCtx, db, oplogEntry);
}
} catch (const DBException& e) {
LOG(2) << "ignoring exception in prefetchOp(): " << redact(e) << endl;
} catch (const std::exception& e) {
log() << "Unhandled std::exception in prefetchOp(): " << redact(e.what()) << endl;
fassertFailed(16397);
}
}
}
// Doles out all the work to the reader pool threads and waits for them to complete
void prefetchOps(const MultiApplier::Operations& ops, ThreadPool* prefetcherPool) {
invariant(prefetcherPool);
for (auto&& op : ops) {
invariant(prefetcherPool->schedule([&] { prefetchOp(op); }));
}
prefetcherPool->waitForIdle();
}
// Doles out all the work to the writer pool threads.
// Does not modify writerVectors, but passes non-const pointers to inner vectors into func.
void applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors,
ThreadPool* writerPool,
const SyncTail::MultiSyncApplyFunc& func,
SyncTail* st,
std::vector<Status>* statusVector,
std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo) {
invariant(writerVectors.size() == statusVector->size());
for (size_t i = 0; i < writerVectors.size(); i++) {
if (!writerVectors[i].empty()) {
invariant(writerPool->schedule([
&func,
st,
&writer = writerVectors.at(i),
&status = statusVector->at(i),
&workerMultikeyPathInfo = workerMultikeyPathInfo->at(i)
] {
auto opCtx = cc().makeOperationContext();
status = func(opCtx.get(), &writer, st, &workerMultikeyPathInfo);
}));
}
}
}
// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that 'ops'
// stays valid until all scheduled work in the thread pool completes.
void scheduleWritesToOplog(OperationContext* opCtx,
StorageInterface* storageInterface,
ThreadPool* threadPool,
const MultiApplier::Operations& ops) {
auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) {
// The returned function will be run in a separate thread after this returns. Therefore all
// captures other than 'ops' must be by value since they will not be available. The caller
// guarantees that 'ops' will stay in scope until the spawned threads complete.
return [storageInterface, &ops, begin, end] {
auto opCtx = cc().makeOperationContext();
UnreplicatedWritesBlock uwb(opCtx.get());
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
opCtx->lockState());
std::vector<InsertStatement> docs;
docs.reserve(end - begin);
for (size_t i = begin; i < end; i++) {
// Add as unowned BSON to avoid unnecessary ref-count bumps.
// 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed.
docs.emplace_back(InsertStatement{
ops[i].raw, ops[i].getOpTime().getTimestamp(), ops[i].getOpTime().getTerm()});
}
fassert(40141,
storageInterface->insertDocuments(
opCtx.get(), NamespaceString::kRsOplogNamespace, docs));
};
};
// We want to be able to take advantage of bulk inserts so we don't use multiple threads if it
// would result in too little work per thread. This also ensures that we can amortize the
// setup/teardown overhead across many writes.
const size_t kMinOplogEntriesPerThread = 16;
const bool enoughToMultiThread =
ops.size() >= kMinOplogEntriesPerThread * threadPool->getStats().numThreads;
// Only doc-locking engines support parallel writes to the oplog because they are required to
// ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally,
// there would be no way to take advantage of multiple threads if a storage engine doesn't
// support document locking.
if (!enoughToMultiThread ||
!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
invariant(threadPool->schedule(makeOplogWriterForRange(0, ops.size())));
return;
}
const size_t numOplogThreads = threadPool->getStats().numThreads;
const size_t numOpsPerThread = ops.size() / numOplogThreads;
for (size_t thread = 0; thread < numOplogThreads; thread++) {
size_t begin = thread * numOpsPerThread;
size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread;
invariant(threadPool->schedule(makeOplogWriterForRange(begin, end)));
}
}
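// Worked example: with 4 oplog-writer threads and 70 ops (>= 16 * 4, so multi-threading kicks in
// on a doc-locking engine), numOpsPerThread is 17 and the scheduled ranges are [0, 17), [17, 34),
// [34, 51), with the last thread picking up the remainder as [51, 70).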
/**
* Caches per-collection properties which are relevant for oplog application, so that they don't
* have to be retrieved repeatedly for each op.
*/
class CachedCollectionProperties {
public:
struct CollectionProperties {
bool isCapped = false;
const CollatorInterface* collator = nullptr;
};
CollectionProperties getCollectionProperties(OperationContext* opCtx,
const StringMapTraits::HashedKey& ns) {
auto it = _cache.find(ns);
if (it != _cache.end()) {
return it->second;
}
auto collProperties = getCollectionPropertiesImpl(opCtx, ns.key());
_cache[ns] = collProperties;
return collProperties;
}
private:
CollectionProperties getCollectionPropertiesImpl(OperationContext* opCtx, StringData ns) {
CollectionProperties collProperties;
Lock::DBLock dbLock(opCtx, nsToDatabaseSubstring(ns), MODE_IS);
auto db = DatabaseHolder::getDatabaseHolder().get(opCtx, ns);
if (!db) {
return collProperties;
}
auto collection = db->getCollection(opCtx, ns);
if (!collection) {
return collProperties;
}
collProperties.isCapped = collection->isCapped();
collProperties.collator = collection->getDefaultCollator();
return collProperties;
}
StringMap<CollectionProperties> _cache;
};
/**
* ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops
* vector in any other way.
* writerVectors - Set of operations for each worker thread to apply.
* derivedOps - If provided, this function inserts a decomposition of applyOps operations
* and instructions for updating the transactions table.
* sessionUpdateTracker - if provided, keeps track of session info from ops.
*/
void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
SessionUpdateTracker* sessionUpdateTracker) {
const auto serviceContext = opCtx->getServiceContext();
const auto storageEngine = serviceContext->getStorageEngine();
const bool supportsDocLocking = storageEngine->supportsDocLocking();
const uint32_t numWriters = writerVectors->size();
CachedCollectionProperties collPropertiesCache;
for (auto&& op : *ops) {
StringMapTraits::HashedKey hashedNs(op.getNamespace().ns());
uint32_t hash = hashedNs.hash();
// We need to track all types of ops, including type 'n' (these are generated from chunk
// migrations).
if (sessionUpdateTracker) {
if (auto newOplogWrites = sessionUpdateTracker->updateOrFlush(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
}
if (op.isCrudOpType()) {
auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
// For doc locking engines, include the _id of the document in the hash so we get
// parallelism even if all writes are to a single collection.
//
// For capped collections, this is illegal, since capped collections must preserve
// insertion order.
if (supportsDocLocking && !collProperties.isCapped) {
BSONElement id = op.getIdElement();
BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore,
collProperties.collator);
const size_t idHash = elementHasher.hash(id);
MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
}
if (op.getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) {
// Mark capped collection ops before storing them to ensure we do not attempt to
// bulk insert them.
op.isForCappedCollection = true;
}
}
// Extract applyOps operations and fill writers with extracted operations using this
// function.
if (op.isCommand() && op.getCommandType() == OplogEntry::CommandType::kApplyOps) {
try {
derivedOps->emplace_back(ApplyOps::extractOperations(op));
fillWriterVectors(
opCtx, &derivedOps->back(), writerVectors, derivedOps, sessionUpdateTracker);
} catch (...) {
fassertFailedWithStatusNoTrace(
50711,
exceptionToStatus().withContext(str::stream()
<< "Unable to extract operations from applyOps "
<< redact(op.toBSON())));
}
continue;
}
auto& writer = (*writerVectors)[hash % numWriters];
if (writer.empty()) {
writer.reserve(8); // Skip a few growth rounds
}
writer.push_back(&op);
}
}
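// Note on the distribution above: without the _id mixed in, every op on a given namespace hashes to
// the same writer vector (hash % numWriters), so a collection's ops are applied in order by a single
// worker. With the _id mixed in (doc-locking engines, non-capped collections), ops on the same
// document still land on the same worker, so per-document ordering is preserved while writes to one
// collection can fan out across workers.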
void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps) {
SessionUpdateTracker sessionUpdateTracker;
fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
auto newOplogWrites = sessionUpdateTracker.flushAll();
if (!newOplogWrites.empty()) {
derivedOps->emplace_back(std::move(newOplogWrites));
fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
}
} // namespace
namespace {
void tryToGoLiveAsASecondary(OperationContext* opCtx,
ReplicationCoordinator* replCoord,
OpTime minValid) {
if (replCoord->isInPrimaryOrSecondaryState()) {
return;
}
// This needs to happen after the attempt so readers can be sure we've already tried.
ON_BLOCK_EXIT([] { attemptsToBecomeSecondary.increment(); });
// Need global X lock to transition to SECONDARY
Lock::GlobalWrite writeLock(opCtx);
// Maintenance mode will force us to remain in RECOVERING state, no matter what.
if (replCoord->getMaintenanceMode()) {
LOG(1) << "We cannot transition to SECONDARY state while in maintenance mode.";
return;
}
// We can only transition to SECONDARY from RECOVERING state.
MemberState state(replCoord->getMemberState());
if (!state.recovering()) {
LOG(2) << "We cannot transition to SECONDARY state since we are not currently in "
"RECOVERING state. Current state: "
<< state.toString();
return;
}
// We can't go to SECONDARY state until we reach 'minValid', since the database may be in an
// inconsistent state before this point. If our state is inconsistent, we need to disallow reads
// from clients, which is why we stay in RECOVERING state.
auto lastApplied = replCoord->getMyLastAppliedOpTime();
if (lastApplied < minValid) {
LOG(2) << "We cannot transition to SECONDARY state because our 'lastApplied' optime is "
"less than the 'minValid' optime. minValid optime: "
<< minValid << ", lastApplied optime: " << lastApplied;
return;
}
// Execute the transition to SECONDARY.
auto status = replCoord->setFollowerMode(MemberState::RS_SECONDARY);
if (!status.isOK()) {
warning() << "Failed to transition into " << MemberState(MemberState::RS_SECONDARY)
<< ". Current state: " << replCoord->getMemberState() << causedBy(status);
}
}
} // namespace
class SyncTail::OpQueueBatcher {
MONGO_DISALLOW_COPYING(OpQueueBatcher);
public:
OpQueueBatcher(SyncTail* syncTail, StorageInterface* storageInterface, OplogBuffer* oplogBuffer)
: _syncTail(syncTail),
_storageInterface(storageInterface),
_oplogBuffer(oplogBuffer),
_ops(0),
_thread([this] { run(); }) {}
~OpQueueBatcher() {
invariant(_isDead);
_thread.join();
}
OpQueue getNextBatch(Seconds maxWaitTime) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_ops.empty() && !_ops.mustShutdown()) {
// We intentionally don't care about whether this returns due to signaling or timeout
// since we do the same thing either way: return whatever is in _ops.
(void)_cv.wait_for(lk, maxWaitTime.toSystemDuration());
}
OpQueue ops = std::move(_ops);
_ops = OpQueue(0);
_cv.notify_all();
return ops;
}
private:
/**
* If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog
* entries that can be returned in a batch.
*/
boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp() {
auto service = cc().getServiceContext();
auto replCoord = ReplicationCoordinator::get(service);
auto slaveDelay = replCoord->getSlaveDelaySecs();
if (slaveDelay <= Seconds(0)) {
return {};
}
auto fastClockSource = service->getFastClockSource();
return fastClockSource->now() - slaveDelay;
}
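// For example, with slaveDelay set to 3600 seconds this returns "now minus one hour", and the
// batcher will defer any oplog entry whose timestamp is newer than that cutoff; a delay of zero
// or less disables the check entirely.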
void run() {
Client::initThread("ReplBatcher");
BatchLimits batchLimits;
batchLimits.bytes =
calculateBatchLimitBytes(cc().makeOperationContext().get(), _storageInterface);
while (true) {
batchLimits.slaveDelayLatestTimestamp = _calculateSlaveDelayLatestTimestamp();
// Check this once per batch since users can change it at runtime.
batchLimits.ops = replBatchLimitOperations.load();
OpQueue ops(batchLimits.ops);
// tryPopAndWaitForMore adds to ops and returns true when we need to end a batch early.
{
auto opCtx = cc().makeOperationContext();
while (!_syncTail->tryPopAndWaitForMore(
opCtx.get(), _oplogBuffer, &ops, batchLimits)) {
}
}
if (ops.empty() && !ops.mustShutdown()) {
continue; // Don't emit empty batches.
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
// Block until the previous batch has been taken.
_cv.wait(lk, [&] { return _ops.empty(); });
_ops = std::move(ops);
_cv.notify_all();
if (_ops.mustShutdown()) {
_isDead = true;
return;
}
}
}
SyncTail* const _syncTail;
StorageInterface* const _storageInterface;
OplogBuffer* const _oplogBuffer;
stdx::mutex _mutex; // Guards _ops.
stdx::condition_variable _cv;
OpQueue _ops;
// This only exists so the destructor invariants rather than deadlocking.
// TODO remove once we trust noexcept enough to mark oplogApplication() as noexcept.
bool _isDead = false;
stdx::thread _thread; // Must be last so all other members are initialized before starting.
};
void SyncTail::oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord) {
if (isMMAPV1()) {
// Overwrite prefetch index mode if ReplSettings has a mode set.
auto&& replSettings = replCoord->getSettings();
if (replSettings.isPrefetchIndexModeSet()) {
replCoord->setIndexPrefetchConfig(replSettings.getPrefetchIndexMode());
}
}
// We don't start data replication for arbiters at all, and reconfiguring the arbiterOnly
// field is not allowed for any member.
invariant(!replCoord->getMemberState().arbiter());
OpQueueBatcher batcher(this, _storageInterface, oplogBuffer);
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getStorageEngine()->isDurable()
? new ApplyBatchFinalizerForJournal(replCoord)
: new ApplyBatchFinalizer(replCoord)};
// Get replication consistency markers.
OpTime minValid;
while (true) { // Exits on message from OpQueueBatcher.
// Use a new operation context each iteration, as otherwise we may appear to use a single
// collection name to refer to collections with different UUIDs.
const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
OperationContext& opCtx = *opCtxPtr;
// For pausing replication in tests.
if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
log() << "sync tail - rsSyncApplyStop fail point enabled. Blocking until fail point is "
"disabled.";
while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
// Tests should not trigger clean shutdown while that failpoint is active. If we
// think we need this, we need to think hard about what the behavior should be.
if (inShutdown()) {
severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
fassertFailedNoTrace(40304);
}
sleepmillis(10);
}
}
// Get the current value of 'minValid'.
minValid = _consistencyMarkers->getMinValid(&opCtx);
// Transition to SECONDARY state, if possible.
tryToGoLiveAsASecondary(&opCtx, replCoord, minValid);
long long termWhenBufferIsEmpty = replCoord->getTerm();
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
OpQueue ops = batcher.getNextBatch(Seconds(1));
if (ops.empty()) {
if (ops.mustShutdown()) {
// Shut down and exit oplog application loop.
return;
}
if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
continue;
}
// Signal drain complete if we're in Draining state and the buffer is empty.
replCoord->signalDrainComplete(&opCtx, termWhenBufferIsEmpty);
continue; // Try again.
}
// Extract some info from ops that we'll need after releasing the batch below.
const auto firstOpTimeInBatch = ops.front().getOpTime();
const auto lastOpTimeInBatch = ops.back().getOpTime();
const auto lastAppliedOpTimeAtStartOfBatch = replCoord->getMyLastAppliedOpTime();
// Make sure the oplog doesn't go back in time or repeat an entry.
if (firstOpTimeInBatch <= lastAppliedOpTimeAtStartOfBatch) {
fassert(34361,
Status(ErrorCodes::OplogOutOfOrder,
str::stream() << "Attempted to apply an oplog entry ("
<< firstOpTimeInBatch.toString()
<< ") which is not greater than our last applied OpTime ("
<< lastAppliedOpTimeAtStartOfBatch.toString()
<< ")."));
}
// Don't allow the fsync+lock thread to see intermediate states of batch application.
stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
// Apply the operations in this batch. 'multiApply' returns the optime of the last op that
// was applied, which should be the last optime in the batch.
auto lastOpTimeAppliedInBatch =
fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch()));
invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
// In order to provide resilience in the event of a crash in the middle of batch
// application, 'multiApply' will update 'minValid' so that it is at least as great as the
// last optime that it applied in this batch. If 'minValid' was moved forward, we make sure
// to update our view of it here.
if (lastOpTimeInBatch > minValid) {
minValid = lastOpTimeInBatch;
}
// Update various things that care about our last applied optime. Tests rely on 2 happening
// before 3 even though it isn't strictly necessary. The order of 1 doesn't matter.
// 1. Update the global timestamp.
setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp());
// 2. Persist our "applied through" optime to disk.
_consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch);
// 3. Ensure that the last applied op time hasn't changed since the start of this batch.
const auto lastAppliedOpTimeAtEndOfBatch = replCoord->getMyLastAppliedOpTime();
invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch,
str::stream() << "the last known applied OpTime has changed from "
<< lastAppliedOpTimeAtStartOfBatch.toString()
<< " to "
<< lastAppliedOpTimeAtEndOfBatch.toString()
<< " in the middle of batch application");
// 4. Update oplog visibility by notifying the storage engine of the new oplog entries.
const bool orderedCommit = true;
_storageInterface->oplogDiskLocRegister(
&opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit);
// 5. Finalize this batch. We are at a consistent optime if our current optime is >= the
// current 'minValid' optime.
auto consistency = (lastOpTimeInBatch >= minValid)
? ReplicationCoordinator::DataConsistency::Consistent
: ReplicationCoordinator::DataConsistency::Inconsistent;
finalizer->record(lastOpTimeInBatch, consistency);
}
}
// Copies ops out of the bgsync queue into the deque passed in as a parameter.
// Returns true if the batch should be ended early.
// Batch should end early if we encounter a command, or if
// there are no further ops in the bgsync queue to read.
// This function also blocks 1 second waiting for new ops to appear in the bgsync
// queue. We don't block forever so that we can periodically check for things like shutdown or
// reconfigs.
bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
OplogBuffer* oplogBuffer,
SyncTail::OpQueue* ops,
const BatchLimits& limits) {
{
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
bool peek_success = oplogBuffer->peek(opCtx, &op);
if (!peek_success) {
// If we don't have anything in the queue, wait a bit for something to appear.
if (ops->empty()) {
if (inShutdown()) {
ops->setMustShutdownFlag();
} else {
// Block up to 1 second. We still return true in this case because we want this
// op to be the first in a new batch with a new start time.
oplogBuffer->waitForData(Seconds(1));
}
}
return true;
}
// If this op would put us over the byte limit don't include it unless the batch is empty.
// We allow single-op batches to exceed the byte limit so that large ops are able to be
// processed.
if (!ops->empty() && (ops->getBytes() + size_t(op.objsize())) > limits.bytes) {
return true; // Return before wasting time parsing the op.
}
// Don't consume the op if we are told to stop.
if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
sleepmillis(10);
return true;
}
ops->emplace_back(std::move(op)); // Parses the op in-place.
}
auto& entry = ops->back();
// check for oplog version change
int curVersion = entry.getVersion();
if (curVersion != OplogEntry::kOplogVersion) {
severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
<< curVersion << " in oplog entry: " << redact(entry.toBSON());
fassertFailedNoTrace(18820);
}
auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs()));
if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) {
ops->pop_back(); // Don't do this op yet.
if (ops->empty()) {
// Sleep if we've got nothing to do. Only sleep for 1 second at a time to allow
// reconfigs and shutdown to occur.
sleepsecs(1);
}
return true;
}
// Commands must be processed one at a time. The only exception to this is applyOps because
// applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is safe
// to batch applyOps commands with CRUD operations when reading from the oplog buffer.
if (entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
_consume(opCtx, oplogBuffer);
} else {
// This op must be processed alone, but we already had ops in the queue so we can't
// include it in this batch. Since we didn't call consume(), we'll see this again next
// time and process it alone.
ops->pop_back();
}
// Apply what we have so far.
return true;
}
// We are going to apply this Op.
_consume(opCtx, oplogBuffer);
// Go back for more ops, unless we've hit the limit.
return ops->getCount() >= limits.ops;
}
void SyncTail::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) {
// This is just to get the op off the queue; it's been peeked at and queued for application
// already.
BSONObj op;
if (oplogBuffer->tryPop(opCtx, &op)) {
_observer->onOperationConsumed(op);
} else {
invariant(inShutdown());
// This means that shutdown() was called between the consumer's calls to peek() and
// consume(). shutdown() cleared the buffer so there is nothing for us to consume here.
// Since our postcondition is already met, it is safe to return successfully.
}
}
void SyncTail::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inShutdown = true;
}
bool SyncTail::inShutdown() const {
stdx::lock_guard<stdx::mutex> lock(_mutex);
return _inShutdown;
}
void SyncTail::setHostname(const std::string& hostname) {
_hostname = hostname;
}
BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) {
OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query?
if (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) {
log() << "initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled. "
"Blocking until fail point is disabled.";
while (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) {
mongo::sleepsecs(1);
}
}
const int retryMax = 3;
for (int retryCount = 1; retryCount <= retryMax; ++retryCount) {
if (retryCount != 1) {
// if we are retrying, sleep a bit to let the network possibly recover
sleepsecs(retryCount * retryCount);
}
try {
bool ok = missingObjReader.connect(HostAndPort(_hostname));
if (!ok) {
warning() << "network problem detected while connecting to the "
<< "sync source, attempt " << retryCount << " of " << retryMax << endl;
continue; // try again
}
} catch (const NetworkException&) {
warning() << "network problem detected while connecting to the "
<< "sync source, attempt " << retryCount << " of " << retryMax << endl;
continue; // try again
}
// get _id from oplog entry to create query to fetch document.
const auto idElem = oplogEntry.getIdElement();
if (idElem.eoo()) {
severe() << "cannot fetch missing document without _id field: "
<< redact(oplogEntry.toBSON());
fassertFailedNoTrace(28742);
}
BSONObj query = BSONObjBuilder().append(idElem).obj();
BSONObj missingObj;
auto nss = oplogEntry.getNamespace();
try {
auto uuid = oplogEntry.getUuid();
if (!uuid) {
missingObj = missingObjReader.findOne(nss.ns().c_str(), query);
} else {
auto dbname = nss.db();
// If a UUID exists for the command object, find the document by UUID.
missingObj = missingObjReader.findOneByUUID(dbname.toString(), *uuid, query);
}
} catch (const NetworkException&) {
warning() << "network problem detected while fetching a missing document from the "
<< "sync source, attempt " << retryCount << " of " << retryMax << endl;
continue; // try again
} catch (DBException& e) {
error() << "assertion fetching missing object: " << redact(e) << endl;
throw;
}
// success!
return missingObj;
}
// retry count exceeded
msgasserted(15916,
str::stream() << "Can no longer connect to initial sync source: " << _hostname);
}
bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx,
const OplogEntry& oplogEntry) {
// Note that using the local UUID/NamespaceString mapping is sufficient for checking
// whether the collection is capped on the remote because convertToCapped creates a
// new collection with a different UUID.
const NamespaceString nss(parseUUIDOrNs(opCtx, oplogEntry));
{
// If the document is in a capped collection then it's okay for it to be missing.
AutoGetCollectionForRead autoColl(opCtx, nss);
Collection* const collection = autoColl.getCollection();
if (collection && collection->isCapped()) {
log() << "Not fetching missing document in capped collection (" << nss << ")";
return false;
}
}
log() << "Fetching missing document: " << redact(oplogEntry.toBSON());
BSONObj missingObj = getMissingDoc(opCtx, oplogEntry);
if (missingObj.isEmpty()) {
BSONObj object2;
if (auto optionalObject2 = oplogEntry.getObject2()) {
object2 = *optionalObject2;
}
log() << "Missing document not found on source; presumably deleted later in oplog. o first "
"field: "
<< redact(oplogEntry.getObject()) << ", o2: " << redact(object2);
return false;
}
return writeConflictRetry(opCtx, "fetchAndInsertMissingDocument", nss.ns(), [&] {
// Take an X lock on the database in order to preclude other modifications.
AutoGetDb autoDb(opCtx, nss.db(), MODE_X);
Database* const db = autoDb.getDb();
WriteUnitOfWork wunit(opCtx);
Collection* coll = nullptr;
auto uuid = oplogEntry.getUuid();
if (!uuid) {
if (!db) {
return false;
}
coll = db->getOrCreateCollection(opCtx, nss);
} else {
// If the oplog entry has a UUID, use it to find the collection in which to insert the
// missing document.
auto& catalog = UUIDCatalog::get(opCtx);
coll = catalog.lookupCollectionByUUID(*uuid);
if (!coll) {
// TODO(SERVER-30819) insert this UUID into the missing UUIDs set.
return false;
}
}
invariant(coll);
OpDebug* const nullOpDebug = nullptr;
Status status = coll->insertDocument(opCtx, InsertStatement(missingObj), nullOpDebug, true);
uassert(15917,
str::stream() << "Failed to insert missing document: " << status.toString(),
status.isOK());
LOG(1) << "Inserted missing document: " << redact(missingObj);
wunit.commit();
if (_observer) {
const OplogApplier::Observer::FetchInfo fetchInfo(oplogEntry, missingObj);
_observer->onMissingDocumentsFetchedAndInserted({fetchInfo});
}
return true;
});
}
// This free function is used by the writer threads to apply each op
Status multiSyncApply(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
invariant(st);
UnreplicatedWritesBlock uwb(opCtx);
DisableDocumentValidation validationDisabler(opCtx);
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState());
ApplierHelpers::stableSortByNamespace(ops);
// This function is only called in steady state replication and recovering.
// Assume we are recovering if oplog writes are disabled in the options.
const auto oplogApplicationMode = st->getOptions().skipWritesToOplog
? OplogApplication::Mode::kRecovering
: OplogApplication::Mode::kSecondary;
ApplierHelpers::InsertGroup insertGroup(ops, opCtx, oplogApplicationMode);
{ // Ensure that the MultikeyPathTracker stops tracking paths.
ON_BLOCK_EXIT([opCtx] { MultikeyPathTracker::get(opCtx).stopTrackingMultikeyPathInfo(); });
MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo();
for (auto it = ops->cbegin(); it != ops->cend(); ++it) {
const auto& entry = **it;
// If we are successful in grouping and applying inserts, advance the current iterator
// past the end of the inserted group of entries.
auto groupResult = insertGroup.groupAndApplyInserts(it);
if (groupResult.isOK()) {
it = groupResult.getValue();
continue;
}
// If we didn't create a group, try to apply the op individually.
try {
const Status status = SyncTail::syncApply(opCtx, entry.raw, oplogApplicationMode);
if (!status.isOK()) {
severe() << "Error applying operation (" << redact(entry.toBSON())
<< "): " << causedBy(redact(status));
return status;
}
} catch (const DBException& e) {
// SERVER-24927 If we have a NamespaceNotFound exception, then this document will be
// dropped before initial sync or recovery ends anyways and we should ignore it.
if (e.code() == ErrorCodes::NamespaceNotFound && entry.isCrudOpType() &&
st->getOptions().allowNamespaceNotFoundErrorsOnCrudOps) {
continue;
}
severe() << "writer worker caught exception: " << redact(e)
<< " on: " << redact(entry.toBSON());
return e.toStatus();
}
}
}
invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo());
invariant(workerMultikeyPathInfo->empty());
auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo();
if (!newPaths.empty()) {
workerMultikeyPathInfo->swap(newPaths);
}
return Status::OK();
}
Status multiInitialSyncApply(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
invariant(st);
UnreplicatedWritesBlock uwb(opCtx);
DisableDocumentValidation validationDisabler(opCtx);
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState());
{ // Ensure that the MultikeyPathTracker stops tracking paths.
ON_BLOCK_EXIT([opCtx] { MultikeyPathTracker::get(opCtx).stopTrackingMultikeyPathInfo(); });
MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo();
for (auto it = ops->begin(); it != ops->end(); ++it) {
auto& entry = **it;
try {
const Status s =
SyncTail::syncApply(opCtx, entry.raw, OplogApplication::Mode::kInitialSync);
if (!s.isOK()) {
// In initial sync, update operations can cause documents to be missed during
// collection cloning. As a result, it is possible that a document that we
// need to update is not present locally. In that case we fetch the document
// from the sync source.
if (s != ErrorCodes::UpdateOperationFailed) {
error() << "Error applying operation: " << redact(s) << " ("
<< redact(entry.toBSON()) << ")";
return s;
}
// We might need to fetch the missing docs from the sync source.
st->fetchAndInsertMissingDocument(opCtx, entry);
}
} catch (const DBException& e) {
// SERVER-24927 If we have a NamespaceNotFound exception, then this document will be
// dropped before initial sync ends anyways and we should ignore it.
if (e.code() == ErrorCodes::NamespaceNotFound && entry.isCrudOpType()) {
continue;
}
severe() << "writer worker caught exception: " << causedBy(redact(e))
<< " on: " << redact(entry.toBSON());
return e.toStatus();
}
}
}
invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo());
invariant(workerMultikeyPathInfo->empty());
auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo();
if (!newPaths.empty()) {
workerMultikeyPathInfo->swap(newPaths);
}
return Status::OK();
}
StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
invariant(!ops.empty());
if (isMMAPV1()) {
// Use a ThreadPool to prefetch all the operations in a batch.
prefetchOps(ops, _writerPool);
}
LOG(2) << "replication batch size is " << ops.size();
// Stop all readers until we're done. This also prevents doc-locking engines from deleting old
// entries from the oplog until we finish writing.
Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
auto replCoord = ReplicationCoordinator::get(opCtx);
if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
severe() << "attempting to replicate ops while primary";
return {ErrorCodes::CannotApplyOplogWhilePrimary,
"attempting to replicate ops while primary"};
}
std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads);
{
// Each node records cumulative batch application stats for itself using this timer.
TimerHolder timer(&applyBatchStats);
// We must wait for all the work we've dispatched to complete before leaving this block
// because the spawned threads refer to objects on the stack.
ON_BLOCK_EXIT([&] { _writerPool->waitForIdle(); });
// Write batch of ops into oplog.
if (!_options.skipWritesToOplog) {
_consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp());
scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops);
}
// Holds 'pseudo operations' generated by secondaries to aid in replication.
// Keep in scope until all operations in 'ops' and 'derivedOps' have been applied.
// Pseudo operations include:
// - applyOps operations expanded to individual ops.
// - ops to update config.transactions. Normal writes to config.transactions in the
// primary don't create an oplog entry, so extract info from writes with transactions
// and create a pseudo oplog.
std::vector<MultiApplier::Operations> derivedOps;
std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
// Wait for writes to finish before applying ops.
_writerPool->waitForIdle();
// Reset consistency markers in case the node fails while applying ops.
if (!_options.skipWritesToOplog) {
_consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
_consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
}
{
std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
applyOps(writerVectors, _writerPool, _applyFunc, this, &statusVector, &multikeyVector);
_writerPool->waitForIdle();
// If any of the statuses is not ok, return error.
for (auto it = statusVector.cbegin(); it != statusVector.cend(); ++it) {
const auto& status = *it;
if (!status.isOK()) {
severe()
<< "Failed to apply batch of operations. Number of operations in batch: "
<< ops.size() << ". First operation: " << redact(ops.front().toBSON())
<< ". Last operation: " << redact(ops.back().toBSON())
<< ". Oplog application failed in writer thread "
<< std::distance(statusVector.cbegin(), it) << ": " << redact(status);
return status;
}
}
}
// Notify the storage engine that a replication batch has completed.
// This means that all the writes associated with the oplog entries in the batch are
// finished and no new writes with timestamps associated with those oplog entries will show
// up in the future.
const auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
storageEngine->replicationBatchIsComplete();
}
// Use this fail point to hold the PBWM lock and prevent the batch from completing.
if (MONGO_FAIL_POINT(pauseBatchApplicationBeforeCompletion)) {
log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail "
"point is disabled.";
while (MONGO_FAIL_POINT(pauseBatchApplicationBeforeCompletion)) {
if (inShutdown()) {
severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting "
"clean shutdown";
fassertFailedNoTrace(50798);
}
sleepmillis(100);
}
}
Timestamp firstTimeInBatch = ops.front().getTimestamp();
// Set any indexes to multikey that this batch ignored. This must be done while holding the
// parallel batch writer mutex.
for (WorkerMultikeyPathInfo infoVector : multikeyVector) {
for (MultikeyPathInfo info : infoVector) {
// We timestamp every multikey write with the first timestamp in the batch. It is always
// safe to set an index as multikey too early, just not too late. We conservatively pick
// the first timestamp in the batch since we do not have enough information to find out
// the timestamp of the first write that set the given multikey path.
fassert(50686,
_storageInterface->setIndexIsMultikey(
opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch));
}
}
// We have now written all database writes and updated the oplog to match.
return ops.back().getOpTime();
}
} // namespace repl
} // namespace mongo