summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2019-04-07 10:08:06 -0400
committerBenety Goh <benety@mongodb.com>2019-04-07 10:08:25 -0400
commit62c32c9c68ad65157f8fdb46d34bec120a17bbf8 (patch)
treebda88e7b370bdfa5847f9d9002bf15b467a3340b /src/mongo/db
parent3e504b77e351c266b8324abbbe4c285e4df8db77 (diff)
downloadmongo-62c32c9c68ad65157f8fdb46d34bec120a17bbf8.tar.gz
SERVER-39950 remove SyncTail::tryPopAndWaitForMore() and _consume()
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp107
-rw-r--r--src/mongo/db/repl/sync_tail.h18
2 files changed, 0 insertions, 125 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index e8a6912ea16..68de8ea4e39 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -847,113 +847,6 @@ inline bool isUnpreparedApplyOps(const OplogEntry& entry) {
return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare();
}
-// Copies ops out of the bgsync queue into the deque passed in as a parameter.
-// Returns true if the batch should be ended early.
-// Batch should end early if we encounter a command, or if
-// there are no further ops in the bgsync queue to read.
-// This function also blocks 1 second waiting for new ops to appear in the bgsync
-// queue. We don't block forever so that we can periodically check for things like shutdown or
-// reconfigs.
-bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
- OplogBuffer* oplogBuffer,
- SyncTail::OpQueue* ops,
- const BatchLimits& limits) {
- {
- BSONObj op;
- // Check to see if there are ops waiting in the bgsync queue
- bool peek_success = oplogBuffer->peek(opCtx, &op);
- if (!peek_success) {
- // If we don't have anything in the queue, wait a bit for something to appear.
- if (ops->empty()) {
- if (inShutdown()) {
- ops->setMustShutdownFlag();
- } else {
- // Block up to 1 second. We still return true in this case because we want this
- // op to be the first in a new batch with a new start time.
- oplogBuffer->waitForData(Seconds(1));
- }
- }
-
- return true;
- }
-
- // If this op would put us over the byte limit don't include it unless the batch is empty.
- // We allow single-op batches to exceed the byte limit so that large ops are able to be
- // processed.
- if (!ops->empty() && (ops->getBytes() + size_t(op.objsize())) > limits.bytes) {
- return true; // Return before wasting time parsing the op.
- }
-
- ops->emplace_back(std::move(op)); // Parses the op in-place.
- }
-
- auto& entry = ops->back();
-
- // check for oplog version change
- int curVersion = entry.getVersion();
- if (curVersion != OplogEntry::kOplogVersion) {
- severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
- << curVersion << " in oplog entry: " << redact(entry.toBSON());
- fassertFailedNoTrace(18820);
- }
-
- auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs()));
- if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) {
-
- ops->pop_back(); // Don't do this op yet.
- if (ops->empty()) {
- // Sleep if we've got nothing to do. Only sleep for 1 second at a time to allow
- // reconfigs and shutdown to occur.
- sleepsecs(1);
- }
- return true;
- }
-
- // Commands must be processed one at a time. The exceptions to this are unprepared applyOps,
- // because applyOps oplog entries are effectively containers for CRUD operations, and unprepared
- // commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to
- // batch applyOps commands with CRUD operations when reading from the oplog buffer.
- //
- // Oplog entries on 'system.views' should also be processed one at a time. View catalog
- // immediately reflects changes for each oplog entry so we can see inconsistent view catalog if
- // multiple oplog entries on 'system.views' are being applied out of the original order.
- //
- // Process updates to 'admin.system.version' individually as well so the secondary's FCV when
- // processing each operation matches the primary's when committing that operation.
- if ((entry.isCommand() && (!isUnpreparedCommit(entry) && !isUnpreparedApplyOps(entry))) ||
- entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) {
- if (ops->getCount() == 1) {
- // apply commands one-at-a-time
- _consume(opCtx, oplogBuffer);
- } else {
- // This op must be processed alone, but we already had ops in the queue so we can't
- // include it in this batch. Since we didn't call consume(), we'll see this again next
- // time and process it alone.
- ops->pop_back();
- }
-
- // Apply what we have so far.
- return true;
- }
-
- // We are going to apply this Op.
- _consume(opCtx, oplogBuffer);
-
- // Go back for more ops, unless we've hit the limit.
- return ops->getCount() >= limits.ops;
-}
-
-void SyncTail::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) {
- // This is just to get the op off the queue; it's been peeked at and queued for application
- // already.
- // If we failed to get an op off the queue, this means that shutdown() was called between the
- // consumer's calls to peek() and consume(). shutdown() cleared the buffer so there is nothing
- // for us to consume here. Since our postcondition is already met, it is safe to return
- // successfully.
- BSONObj op;
- invariant(oplogBuffer->tryPop(opCtx, &op) || inShutdown());
-}
-
void SyncTail::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inShutdown = true;
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 971702a2c6f..84a6086f606 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -198,18 +198,6 @@ public:
using BatchLimits = OplogApplier::BatchLimits;
/**
- * Attempts to pop an OplogEntry off the BGSync queue and add it to ops.
- *
- * Returns true if the (possibly empty) batch in ops should be ended and a new one started.
- * If ops is empty on entry and nothing can be added yet, will wait up to a second before
- * returning true.
- */
- bool tryPopAndWaitForMore(OperationContext* opCtx,
- OplogBuffer* oplogBuffer,
- OpQueue* ops,
- const BatchLimits& limits);
-
- /**
* Fetch a single document referenced in the operation from the sync source.
*
* The sync source is specified at construction in
@@ -243,12 +231,6 @@ public:
StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops);
private:
- /**
- * Pops the operation at the front of the OplogBuffer.
- * Updates stats on BackgroundSync.
- */
- void _consume(OperationContext* opCtx, OplogBuffer* oplogBuffer);
-
class OpQueueBatcher;
void _oplogApplication(ReplicationCoordinator* replCoord, OpQueueBatcher* batcher) noexcept;