Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp | 252
1 file changed, 1 insertion(+), 251 deletions(-)
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 18b3d7e0495..a5402fadfa7 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -85,23 +85,12 @@ namespace mongo {
namespace repl {
namespace {
-MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion);
-MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationAfterWritingOplogEntries);
MONGO_FAIL_POINT_DEFINE(hangAfterRecordingOpApplicationStartTime);
// The oplog entries applied
Counter64 opsAppliedStats;
ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats);
-// Tracks the oplog application batch size.
-Counter64 oplogApplicationBatchSize;
-ServerStatusMetricField<Counter64> displayOplogApplicationBatchSize("repl.apply.batchSize",
- &oplogApplicationBatchSize);
-
-// Number and time of each ApplyOps worker pool round
-TimerStats applyBatchStats;
-ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats);
-
NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) {
auto optionalUuid = oplogEntry.getUuid();
if (!optionalUuid) {
@@ -165,17 +154,9 @@ LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMod
} // namespace
SyncTail::SyncTail(OplogApplier::Observer* observer,
- ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
- MultiSyncApplyFunc func,
- ThreadPool* writerPool,
const OplogApplier::Options& options)
- : _observer(observer),
- _consistencyMarkers(consistencyMarkers),
- _storageInterface(storageInterface),
- _applyFunc(func),
- _writerPool(writerPool),
- _options(options) {}
+ : _observer(observer), _storageInterface(storageInterface), _options(options) {}
SyncTail::~SyncTail() {}
@@ -185,74 +166,6 @@ const OplogApplier::Options& SyncTail::getOptions() const {
namespace {
-// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that 'ops'
- // stays valid until all scheduled work in the thread pool completes.
-void scheduleWritesToOplog(OperationContext* opCtx,
- StorageInterface* storageInterface,
- ThreadPool* threadPool,
- const MultiApplier::Operations& ops) {
-
- auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) {
- // The returned function will be run in a separate thread after this returns. Therefore all
- // captures other than 'ops' must be by value since they will not be available. The caller
- // guarantees that 'ops' will stay in scope until the spawned threads complete.
- return [storageInterface, &ops, begin, end](auto status) {
- invariant(status);
-
- auto opCtx = cc().makeOperationContext();
-
- // This code path is only executed on secondaries and initial syncing nodes, so it is
- // safe to exclude any writes from Flow Control.
- opCtx->setShouldParticipateInFlowControl(false);
-
- UnreplicatedWritesBlock uwb(opCtx.get());
- ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
- opCtx->lockState());
-
- std::vector<InsertStatement> docs;
- docs.reserve(end - begin);
- for (size_t i = begin; i < end; i++) {
- // Add as unowned BSON to avoid unnecessary ref-count bumps.
- // 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed.
- docs.emplace_back(InsertStatement{ops[i].getRaw(),
- ops[i].getOpTime().getTimestamp(),
- ops[i].getOpTime().getTerm()});
- }
-
- fassert(40141,
- storageInterface->insertDocuments(
- opCtx.get(), NamespaceString::kRsOplogNamespace, docs));
- };
- };
-
- // We want to be able to take advantage of bulk inserts so we don't use multiple threads if it
- // would result in too little work per thread. This also ensures that we can amortize the
- // setup/teardown overhead across many writes.
- const size_t kMinOplogEntriesPerThread = 16;
- const bool enoughToMultiThread =
- ops.size() >= kMinOplogEntriesPerThread * threadPool->getStats().numThreads;
-
- // Only doc-locking engines support parallel writes to the oplog because they are required to
- // ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally,
- // there would be no way to take advantage of multiple threads if a storage engine doesn't
- // support document locking.
- if (!enoughToMultiThread ||
- !opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
-
- threadPool->schedule(makeOplogWriterForRange(0, ops.size()));
- return;
- }
-
-
- const size_t numOplogThreads = threadPool->getStats().numThreads;
- const size_t numOpsPerThread = ops.size() / numOplogThreads;
- for (size_t thread = 0; thread < numOplogThreads; thread++) {
- size_t begin = thread * numOpsPerThread;
- size_t end = (thread == numOplogThreads - 1) ? ops.size() : begin + numOpsPerThread;
- threadPool->schedule(makeOplogWriterForRange(begin, end));
- }
-}
-
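The removed scheduleWritesToOplog above only fans writes out across the writer pool when every thread would receive at least kMinOplogEntriesPerThread (16) entries and the storage engine supports document-level locking; otherwise it schedules a single bulk insert. A minimal standalone sketch of that partitioning heuristic, assuming hypothetical names (partitionBatch, Range) in place of the real MultiApplier/ThreadPool types:

#include <cstddef>
#include <vector>

struct Range {
    std::size_t begin;
    std::size_t end;
};

// Hypothetical helper: split 'numOps' oplog entries across 'numThreads' writers.
// Falls back to a single range (one bulk insert) when there is too little work
// per thread, mirroring the kMinOplogEntriesPerThread = 16 heuristic above.
std::vector<Range> partitionBatch(std::size_t numOps, std::size_t numThreads) {
    const std::size_t kMinOplogEntriesPerThread = 16;
    if (numThreads == 0 || numOps < kMinOplogEntriesPerThread * numThreads) {
        return {Range{0, numOps}};
    }
    std::vector<Range> ranges;
    const std::size_t numOpsPerThread = numOps / numThreads;
    for (std::size_t thread = 0; thread < numThreads; thread++) {
        const std::size_t begin = thread * numOpsPerThread;
        // The last thread also takes the remainder of the batch.
        const std::size_t end = (thread == numThreads - 1) ? numOps : begin + numOpsPerThread;
        ranges.push_back(Range{begin, end});
    }
    return ranges;
}

The single-range fallback matters because out-of-order parallel inserts into the oplog are only safe on doc-locking engines, as the removed comment notes.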
/**
* Caches per-collection properties which are relevant for oplog application, so that they don't
* have to be retrieved repeatedly for each op.
@@ -366,16 +279,6 @@ void addDerivedOps(OperationContext* opCtx,
} // namespace
-void SyncTail::shutdown() {
- stdx::lock_guard<Latch> lock(_mutex);
- _inShutdown = true;
-}
-
-bool SyncTail::inShutdown() const {
- stdx::lock_guard<Latch> lock(_mutex);
- return _inShutdown;
-}
-
Status syncApply(OperationContext* opCtx,
const OplogEntryBatch& batch,
OplogApplication::Mode oplogApplicationMode) {
@@ -691,158 +594,5 @@ void SyncTail::fillWriterVectors(OperationContext* opCtx,
}
}
-StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
- invariant(!ops.empty());
-
- LOG(2) << "replication batch size is " << ops.size();
-
- // Stop all readers until we're done. This also prevents doc-locking engines from deleting old
- // entries from the oplog until we finish writing.
- Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
-
- auto replCoord = ReplicationCoordinator::get(opCtx);
- if (replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
- severe() << "attempting to replicate ops while primary";
- return {ErrorCodes::CannotApplyOplogWhilePrimary,
- "attempting to replicate ops while primary"};
- }
-
- // Increment the batch size stat.
- oplogApplicationBatchSize.increment(ops.size());
-
- std::vector<WorkerMultikeyPathInfo> multikeyVector(_writerPool->getStats().numThreads);
- {
- // Each node records cumulative batch application stats for itself using this timer.
- TimerHolder timer(&applyBatchStats);
-
- // We must wait for all the work we've dispatched to complete before leaving this block
- // because the spawned threads refer to objects on the stack.
- ON_BLOCK_EXIT([&] { _writerPool->waitForIdle(); });
-
- // Write batch of ops into oplog.
- if (!_options.skipWritesToOplog) {
- _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp());
- scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops);
- }
-
- // Holds 'pseudo operations' generated by secondaries to aid in replication.
- // Keep in scope until all operations in 'ops' and 'derivedOps' have been applied.
- // Pseudo operations include:
- // - applyOps operations expanded to individual ops.
- // - ops to update config.transactions. Normal writes to config.transactions in the
- //   primary don't create an oplog entry, so extract info from writes with transactions
- //   and create a pseudo oplog.
- std::vector<MultiApplier::Operations> derivedOps;
-
- std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
- fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
-
- // Wait for writes to finish before applying ops.
- _writerPool->waitForIdle();
-
- // Use this fail point to hold the PBWM lock after we have written the oplog entries but
- // before we have applied them.
- if (MONGO_unlikely(pauseBatchApplicationAfterWritingOplogEntries.shouldFail())) {
- log() << "pauseBatchApplicationAfterWritingOplogEntries fail point enabled. Blocking "
- "until fail point is disabled.";
- pauseBatchApplicationAfterWritingOplogEntries.pauseWhileSet(opCtx);
- }
-
- // Reset consistency markers in case the node fails while applying ops.
- if (!_options.skipWritesToOplog) {
- _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
- _consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime());
- }
-
- {
- std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
-
- // Doles out all the work to the writer pool threads. writerVectors is not modified,
- // but multiSyncApply will modify the vectors that it contains.
- invariant(writerVectors.size() == statusVector.size());
- for (size_t i = 0; i < writerVectors.size(); i++) {
- if (writerVectors[i].empty())
- continue;
-
- _writerPool->schedule(
- [this,
- &writer = writerVectors.at(i),
- &status = statusVector.at(i),
- &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) {
- invariant(scheduleStatus);
-
- auto opCtx = cc().makeOperationContext();
-
- // This code path is only executed on secondaries and initial syncing nodes,
- // so it is safe to exclude any writes from Flow Control.
- opCtx->setShouldParticipateInFlowControl(false);
-
- status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
- return _applyFunc(opCtx.get(), &writer, this, &multikeyVector);
- });
- });
- }
-
- _writerPool->waitForIdle();
-
- // If any of the statuses is not ok, return error.
- for (auto it = statusVector.cbegin(); it != statusVector.cend(); ++it) {
- const auto& status = *it;
- if (!status.isOK()) {
- severe()
- << "Failed to apply batch of operations. Number of operations in batch: "
- << ops.size() << ". First operation: " << redact(ops.front().toBSON())
- << ". Last operation: " << redact(ops.back().toBSON())
- << ". Oplog application failed in writer thread "
- << std::distance(statusVector.cbegin(), it) << ": " << redact(status);
- return status;
- }
- }
- }
- }
-
- // Notify the storage engine that a replication batch has completed. This means that all the
- // writes associated with the oplog entries in the batch are finished and no new writes with
- // timestamps associated with those oplog entries will show up in the future.
- const auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
- storageEngine->replicationBatchIsComplete();
-
- // Use this fail point to hold the PBWM lock and prevent the batch from completing.
- if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
- log() << "pauseBatchApplicationBeforeCompletion fail point enabled. Blocking until fail "
- "point is disabled.";
- while (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
- if (inShutdown()) {
- severe() << "Turn off pauseBatchApplicationBeforeCompletion before attempting "
- "clean shutdown";
- fassertFailedNoTrace(50798);
- }
- sleepmillis(100);
- }
- }
-
- Timestamp firstTimeInBatch = ops.front().getTimestamp();
- // Set any indexes to multikey that this batch ignored. This must be done while holding the
- // parallel batch writer mode lock.
- for (WorkerMultikeyPathInfo infoVector : multikeyVector) {
- for (MultikeyPathInfo info : infoVector) {
- // We timestamp every multikey write with the first timestamp in the batch. It is always
- // safe to set an index as multikey too early, just not too late. We conservatively pick
- // the first timestamp in the batch since we do not have enough information to find out
- // the timestamp of the first write that set the given multikey path.
- fassert(50686,
- _storageInterface->setIndexIsMultikey(
- opCtx, info.nss, info.indexName, info.multikeyPaths, firstTimeInBatch));
- }
- }
-
- // Increment the counter for the number of ops applied during catchup if the node is in catchup
- // mode.
- replCoord->incrementNumCatchUpOpsIfCatchingUp(ops.size());
-
- // We have now written all database writes and updated the oplog to match.
- return ops.back().getOpTime();
-}
-
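The removed multiApply drove the whole batch: it scheduled one closure per writer vector on the pool, gave each worker its own Status slot, waited for the pool to drain, and then returned the first failure it found. A minimal sketch of that fan-out/collect pattern, assuming std::thread, a simplified Status struct, and a hypothetical applyChunk callback in place of the real ThreadPool, mongo::Status, and _applyFunc:

#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Simplified stand-in for mongo::Status.
struct Status {
    bool ok = true;
    std::string reason;
};

// Hypothetical stand-in for one multiSyncApply call over one writer vector.
using ApplyFn = std::function<Status(std::size_t chunkIndex)>;

// Minimal sketch: one status slot per worker so threads never share a slot,
// join everything before reading the results, then surface the first error
// (the real code also logged the failing batch before returning).
Status applyAllChunks(std::size_t numChunks, const ApplyFn& applyChunk) {
    std::vector<Status> statuses(numChunks);
    std::vector<std::thread> workers;
    workers.reserve(numChunks);
    for (std::size_t i = 0; i < numChunks; i++) {
        workers.emplace_back([&statuses, &applyChunk, i] { statuses[i] = applyChunk(i); });
    }
    for (auto& worker : workers) {
        worker.join();
    }
    for (const auto& status : statuses) {
        if (!status.ok) {
            return status;
        }
    }
    return Status{};
}

Giving each worker a dedicated slot avoids any locking on the result path; the only synchronization needed is the join (or, in the removed code, ThreadPool::waitForIdle) before the statuses are read.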
} // namespace repl
} // namespace mongo