From 62c32c9c68ad65157f8fdb46d34bec120a17bbf8 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Sun, 7 Apr 2019 10:08:06 -0400 Subject: SERVER-39950 remove SyncTail::tryPopAndWaitForMore() and _consume() --- src/mongo/db/repl/sync_tail.cpp | 107 ---------------------------------------- src/mongo/db/repl/sync_tail.h | 18 ------- 2 files changed, 125 deletions(-) (limited to 'src/mongo/db') diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index e8a6912ea16..68de8ea4e39 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -847,113 +847,6 @@ inline bool isUnpreparedApplyOps(const OplogEntry& entry) { return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare(); } -// Copies ops out of the bgsync queue into the deque passed in as a parameter. -// Returns true if the batch should be ended early. -// Batch should end early if we encounter a command, or if -// there are no further ops in the bgsync queue to read. -// This function also blocks 1 second waiting for new ops to appear in the bgsync -// queue. We don't block forever so that we can periodically check for things like shutdown or -// reconfigs. -bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, - OplogBuffer* oplogBuffer, - SyncTail::OpQueue* ops, - const BatchLimits& limits) { - { - BSONObj op; - // Check to see if there are ops waiting in the bgsync queue - bool peek_success = oplogBuffer->peek(opCtx, &op); - if (!peek_success) { - // If we don't have anything in the queue, wait a bit for something to appear. - if (ops->empty()) { - if (inShutdown()) { - ops->setMustShutdownFlag(); - } else { - // Block up to 1 second. We still return true in this case because we want this - // op to be the first in a new batch with a new start time. - oplogBuffer->waitForData(Seconds(1)); - } - } - - return true; - } - - // If this op would put us over the byte limit don't include it unless the batch is empty. - // We allow single-op batches to exceed the byte limit so that large ops are able to be - // processed. - if (!ops->empty() && (ops->getBytes() + size_t(op.objsize())) > limits.bytes) { - return true; // Return before wasting time parsing the op. - } - - ops->emplace_back(std::move(op)); // Parses the op in-place. - } - - auto& entry = ops->back(); - - // check for oplog version change - int curVersion = entry.getVersion(); - if (curVersion != OplogEntry::kOplogVersion) { - severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " - << curVersion << " in oplog entry: " << redact(entry.toBSON()); - fassertFailedNoTrace(18820); - } - - auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs())); - if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) { - - ops->pop_back(); // Don't do this op yet. - if (ops->empty()) { - // Sleep if we've got nothing to do. Only sleep for 1 second at a time to allow - // reconfigs and shutdown to occur. - sleepsecs(1); - } - return true; - } - - // Commands must be processed one at a time. The exceptions to this are unprepared applyOps, - // because applyOps oplog entries are effectively containers for CRUD operations, and unprepared - // commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to - // batch applyOps commands with CRUD operations when reading from the oplog buffer. - // - // Oplog entries on 'system.views' should also be processed one at a time. View catalog - // immediately reflects changes for each oplog entry so we can see inconsistent view catalog if - // multiple oplog entries on 'system.views' are being applied out of the original order. - // - // Process updates to 'admin.system.version' individually as well so the secondary's FCV when - // processing each operation matches the primary's when committing that operation. - if ((entry.isCommand() && (!isUnpreparedCommit(entry) && !isUnpreparedApplyOps(entry))) || - entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) { - if (ops->getCount() == 1) { - // apply commands one-at-a-time - _consume(opCtx, oplogBuffer); - } else { - // This op must be processed alone, but we already had ops in the queue so we can't - // include it in this batch. Since we didn't call consume(), we'll see this again next - // time and process it alone. - ops->pop_back(); - } - - // Apply what we have so far. - return true; - } - - // We are going to apply this Op. - _consume(opCtx, oplogBuffer); - - // Go back for more ops, unless we've hit the limit. - return ops->getCount() >= limits.ops; -} - -void SyncTail::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { - // This is just to get the op off the queue; it's been peeked at and queued for application - // already. - // If we failed to get an op off the queue, this means that shutdown() was called between the - // consumer's calls to peek() and consume(). shutdown() cleared the buffer so there is nothing - // for us to consume here. Since our postcondition is already met, it is safe to return - // successfully. - BSONObj op; - invariant(oplogBuffer->tryPop(opCtx, &op) || inShutdown()); -} - void SyncTail::shutdown() { stdx::lock_guard lock(_mutex); _inShutdown = true; diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 971702a2c6f..84a6086f606 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -197,18 +197,6 @@ public: using BatchLimits = OplogApplier::BatchLimits; - /** - * Attempts to pop an OplogEntry off the BGSync queue and add it to ops. - * - * Returns true if the (possibly empty) batch in ops should be ended and a new one started. - * If ops is empty on entry and nothing can be added yet, will wait up to a second before - * returning true. - */ - bool tryPopAndWaitForMore(OperationContext* opCtx, - OplogBuffer* oplogBuffer, - OpQueue* ops, - const BatchLimits& limits); - /** * Fetch a single document referenced in the operation from the sync source. * @@ -243,12 +231,6 @@ public: StatusWith multiApply(OperationContext* opCtx, MultiApplier::Operations ops); private: - /** - * Pops the operation at the front of the OplogBuffer. - * Updates stats on BackgroundSync. - */ - void _consume(OperationContext* opCtx, OplogBuffer* oplogBuffer); - class OpQueueBatcher; void _oplogApplication(ReplicationCoordinator* replCoord, OpQueueBatcher* batcher) noexcept; -- cgit v1.2.1