summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-10-30 16:31:48 -0400
committerJudah Schvimer <judah@mongodb.com>2017-10-30 16:40:41 -0400
commitb49b20887cf13720c0e863d24da95cc0239889f8 (patch)
tree2faf58212c5a842cc5b0f9cab4a5f93ee44fe544
parent350ee88b33f32b179b636f33b7db5b0c03932d24 (diff)
downloadmongo-b49b20887cf13720c0e863d24da95cc0239889f8.tar.gz
SERVER-31387 oplog application conflates upserting with being in steady state replication
-rw-r--r--src/mongo/db/repl/apply_ops.cpp12
-rw-r--r--src/mongo/db/repl/master_slave.cpp5
-rw-r--r--src/mongo/db/repl/multiapplier.h4
-rw-r--r--src/mongo/db/repl/oplog.cpp54
-rw-r--r--src/mongo/db/repl/oplog.h36
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp3
-rw-r--r--src/mongo/db/repl/sync_tail.cpp41
-rw-r--r--src/mongo/db/repl/sync_tail.h30
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp201
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp29
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.h4
11 files changed, 291 insertions, 128 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 0bd95ba91ec..3d17a8c5291 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -171,7 +171,8 @@ Status _applyOps(OperationContext* opCtx,
OldClientContext ctx(opCtx, nss.ns());
- status = repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert);
+ status = repl::applyOperation_inlock(
+ opCtx, ctx.db(), opObj, alwaysUpsert, repl::OplogApplication::Mode::kApplyOps);
if (!status.isOK())
return status;
} else {
@@ -180,7 +181,8 @@ Status _applyOps(OperationContext* opCtx,
opCtx, "applyOps", nss.ns(), [opCtx, nss, opObj, opType, alwaysUpsert] {
if (*opType == 'c') {
invariant(opCtx->lockState()->isW());
- return repl::applyCommand_inlock(opCtx, opObj, true);
+ return repl::applyCommand_inlock(
+ opCtx, opObj, repl::OplogApplication::Mode::kApplyOps);
}
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
@@ -202,7 +204,11 @@ Status _applyOps(OperationContext* opCtx,
if (!nss.isSystemDotIndexes()) {
return repl::applyOperation_inlock(
- opCtx, ctx.db(), opObj, alwaysUpsert);
+ opCtx,
+ ctx.db(),
+ opObj,
+ alwaysUpsert,
+ repl::OplogApplication::Mode::kApplyOps);
}
auto fieldO = opObj["o"];
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index b5fbdeea19f..369beaac998 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -646,7 +646,7 @@ bool ReplSource::handleDuplicateDbName(OperationContext* opCtx,
void ReplSource::applyCommand(OperationContext* opCtx, const BSONObj& op) {
try {
- Status status = applyCommand_inlock(opCtx, op, true);
+ Status status = applyCommand_inlock(opCtx, op, OplogApplication::Mode::kMasterSlave);
uassert(28639, "Failure applying initial sync command", status.isOK());
} catch (AssertionException& e) {
log() << "sync: caught user assertion " << redact(e) << " while applying op: " << redact(op)
@@ -661,7 +661,8 @@ void ReplSource::applyCommand(OperationContext* opCtx, const BSONObj& op) {
void ReplSource::applyOperation(OperationContext* opCtx, Database* db, const BSONObj& op) {
try {
- Status status = applyOperation_inlock(opCtx, db, op);
+ Status status =
+ applyOperation_inlock(opCtx, db, op, false, OplogApplication::Mode::kMasterSlave);
if (!status.isOK()) {
uassert(15914,
"Failure applying initial sync operation",
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
index 97567fde6cc..a81ed55b47f 100644
--- a/src/mongo/db/repl/multiapplier.h
+++ b/src/mongo/db/repl/multiapplier.h
@@ -68,8 +68,8 @@ public:
/**
* Type of function to to apply a single operation. In production, this function
- * would have the same outcome as calling SyncTail::syncApply() ('inSteadyStateReplication'
- * value will be embedded in the function implementation).
+ * would have the same outcome as calling SyncTail::syncApply() (oplog application mode
+ * will be embedded in the function implementation).
*/
using ApplyOperationFn = stdx::function<Status(OperationPtrs*)>;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 567a9be6335..e91fa140176 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -898,6 +898,46 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
} // namespace
+constexpr StringData OplogApplication::kInitialSyncOplogApplicationMode;
+constexpr StringData OplogApplication::kMasterSlaveOplogApplicationMode;
+constexpr StringData OplogApplication::kRecoveringOplogApplicationMode;
+constexpr StringData OplogApplication::kSecondaryOplogApplicationMode;
+constexpr StringData OplogApplication::kApplyOpsOplogApplicationMode;
+
+StringData OplogApplication::modeToString(OplogApplication::Mode mode) {
+ switch (mode) {
+ case OplogApplication::Mode::kInitialSync:
+ return OplogApplication::kInitialSyncOplogApplicationMode;
+ case OplogApplication::Mode::kMasterSlave:
+ return OplogApplication::kMasterSlaveOplogApplicationMode;
+ case OplogApplication::Mode::kRecovering:
+ return OplogApplication::kRecoveringOplogApplicationMode;
+ case OplogApplication::Mode::kSecondary:
+ return OplogApplication::kSecondaryOplogApplicationMode;
+ case OplogApplication::Mode::kApplyOps:
+ return OplogApplication::kApplyOpsOplogApplicationMode;
+ }
+ MONGO_UNREACHABLE;
+}
+
+StatusWith<OplogApplication::Mode> OplogApplication::parseMode(const std::string& mode) {
+ if (mode == OplogApplication::kInitialSyncOplogApplicationMode) {
+ return OplogApplication::Mode::kInitialSync;
+ } else if (mode == OplogApplication::kMasterSlaveOplogApplicationMode) {
+ return OplogApplication::Mode::kMasterSlave;
+ } else if (mode == OplogApplication::kRecoveringOplogApplicationMode) {
+ return OplogApplication::Mode::kRecovering;
+ } else if (mode == OplogApplication::kSecondaryOplogApplicationMode) {
+ return OplogApplication::Mode::kSecondary;
+ } else if (mode == OplogApplication::kApplyOpsOplogApplicationMode) {
+ return OplogApplication::Mode::kApplyOps;
+ } else {
+ return Status(ErrorCodes::FailedToParse,
+ str::stream() << "Invalid oplog application mode provided: " << mode);
+ }
+ MONGO_UNREACHABLE;
+}
+
std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement& fieldO,
const BSONObj& op,
const NamespaceString& requestNss) {
@@ -942,7 +982,8 @@ std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement
Status applyOperation_inlock(OperationContext* opCtx,
Database* db,
const BSONObj& op,
- bool inSteadyStateReplication,
+ bool alwaysUpsert,
+ OplogApplication::Mode mode,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
LOG(3) << "applying op: " << redact(op);
@@ -1003,7 +1044,8 @@ Status applyOperation_inlock(OperationContext* opCtx,
// During upgrade from 3.4 to 3.6, the feature compatibility version cannot change during
// initial sync because we cannot do some operations with UUIDs and others without.
- if (!inSteadyStateReplication && requestNss == FeatureCompatibilityVersion::kCollection) {
+ if ((mode == OplogApplication::Mode::kInitialSync) &&
+ requestNss == FeatureCompatibilityVersion::kCollection) {
std::string oID;
auto status = bsonExtractStringField(o, "_id", &oID);
if (status.isOK() && oID == FeatureCompatibilityVersion::kParameterName) {
@@ -1238,7 +1280,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
opCounters->gotUpdate();
BSONObj updateCriteria = o2;
- const bool upsert = valueB || inSteadyStateReplication;
+ const bool upsert = valueB || alwaysUpsert;
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Failed to apply update due to missing _id: " << op.toString(),
@@ -1361,7 +1403,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
Status applyCommand_inlock(OperationContext* opCtx,
const BSONObj& op,
- bool inSteadyStateReplication) {
+ OplogApplication::Mode mode) {
std::array<StringData, 4> names = {"o", "ui", "ns", "op"};
std::array<BSONElement, 4> fields;
op.getFields(names, &fields);
@@ -1400,7 +1442,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
// Applying renameCollection during initial sync to a collection without UUID might lead to
// data corruption, so we restart the initial sync.
- if (fieldUI.eoo() && !inSteadyStateReplication &&
+ if (fieldUI.eoo() && (mode == OplogApplication::Mode::kInitialSync) &&
o.firstElementFieldName() == std::string("renameCollection")) {
if (!allowUnsafeRenamesDuringInitialSync.load()) {
return Status(ErrorCodes::OplogOperationUnsupported,
@@ -1420,7 +1462,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
// collection dropped. 'applyOps' will try to apply each individual operation, and those
// will be caught then if they are a problem.
auto whitelistedOps = std::vector<std::string>{"dropDatabase", "applyOps", "dbCheck"};
- if (!inSteadyStateReplication &&
+ if ((mode == OplogApplication::Mode::kInitialSync) &&
(std::find(whitelistedOps.begin(), whitelistedOps.end(), o.firstElementFieldName()) ==
whitelistedOps.end()) &&
parseNs(nss.ns(), o) == FeatureCompatibilityVersion::kCollection) {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index df2178a66aa..7ece1b4eedc 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -166,29 +166,51 @@ using IncrementOpsAppliedStatsFn = stdx::function<void()>;
std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement& fieldO,
const BSONObj& op,
const NamespaceString& requestNss);
+
+/**
+ * This class encapsulates an enum of the oplog application mode and functions that serialize
+ * and parse it.
+ */
+class OplogApplication {
+public:
+ static constexpr StringData kInitialSyncOplogApplicationMode = "InitialSync"_sd;
+ static constexpr StringData kMasterSlaveOplogApplicationMode = "MasterSlave"_sd;
+ static constexpr StringData kRecoveringOplogApplicationMode = "Recovering"_sd;
+ static constexpr StringData kSecondaryOplogApplicationMode = "Secondary"_sd;
+ static constexpr StringData kApplyOpsOplogApplicationMode = "ApplyOps"_sd;
+
+ enum class Mode { kInitialSync, kMasterSlave, kRecovering, kSecondary, kApplyOps };
+
+ static StringData modeToString(Mode mode);
+
+ static StatusWith<Mode> parseMode(const std::string& mode);
+};
+
+inline std::ostream& operator<<(std::ostream& s, OplogApplication::Mode mode) {
+ return (s << OplogApplication::modeToString(mode));
+}
+
/**
* Take a non-command op and apply it locally
* Used for applying from an oplog
- * @param inSteadyStateReplication convert some updates to upserts for idempotency reasons
+ * @param alwaysUpsert convert some updates to upserts for idempotency reasons
+ * @param mode specifies what oplog application mode we are in
* @param incrementOpsAppliedStats is called whenever an op is applied.
* Returns failure status if the op was an update that could not be applied.
*/
Status applyOperation_inlock(OperationContext* opCtx,
Database* db,
const BSONObj& op,
- bool inSteadyStateReplication = false,
+ bool alwaysUpsert,
+ OplogApplication::Mode mode,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {});
/**
* Take a command op and apply it locally
* Used for applying from an oplog
- * inSteadyStateReplication indicates whether we are in steady state replication, rather than
- * initial sync.
* Returns failure status if the op that could not be applied.
*/
-Status applyCommand_inlock(OperationContext* opCtx,
- const BSONObj& op,
- bool inSteadyStateReplication);
+Status applyCommand_inlock(OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode mode);
/**
* Initializes the global Timestamp with the value from the timestamp of the last oplog entry.
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index bab0894f929..6b416de6e3e 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -212,7 +212,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
while (cursor->more()) {
auto entry = cursor->nextSafe();
- fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true));
+ fassertStatusOK(40294,
+ SyncTail::syncApply(opCtx, entry, OplogApplication::Mode::kRecovering));
_consistencyMarkers->setAppliedThrough(
opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 4cde3fec5c5..35362355aad 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -59,7 +59,6 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/multiapplier.h"
-#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config.h"
@@ -305,7 +304,7 @@ bool SyncTail::peek(OperationContext* opCtx, BSONObj* op) {
// static
Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
- bool inSteadyStateReplication,
+ OplogApplication::Mode oplogApplicationMode,
ApplyOperationInLockFn applyOperationInLock,
ApplyCommandInLockFn applyCommandInLock,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
@@ -322,8 +321,17 @@ Status SyncTail::syncApply(OperationContext* opCtx,
UnreplicatedWritesBlock uwb(opCtx);
DisableDocumentValidation validationDisabler(opCtx);
- Status status =
- applyOperationInLock(opCtx, db, op, inSteadyStateReplication, incrementOpsAppliedStats);
+ // We convert updates to upserts when not in initial sync because after rollback and during
+ // startup we may replay an update after a delete and crash since we do not ignore
+ // errors. In initial sync we simply ignore these update errors so there is no reason to
+ // upsert.
+ //
+ // TODO (SERVER-21700): Never upsert during oplog application unless an external applyOps
+ // wants to. We should ignore these errors intelligently while in RECOVERING and STARTUP
+ // mode (similar to initial sync) instead so we do not accidentally ignore real errors.
+ bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync);
+ Status status = applyOperationInLock(
+ opCtx, db, op, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats);
if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
throw WriteConflictException();
}
@@ -384,7 +392,7 @@ Status SyncTail::syncApply(OperationContext* opCtx,
Lock::GlobalWrite globalWriteLock(opCtx);
// special case apply for commands to avoid implicit database creation
- Status status = applyCommandInLock(opCtx, op, inSteadyStateReplication);
+ Status status = applyCommandInLock(opCtx, op, oplogApplicationMode);
incrementOpsAppliedStats();
return status;
});
@@ -399,10 +407,10 @@ Status SyncTail::syncApply(OperationContext* opCtx,
Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
- bool inSteadyStateReplication) {
+ OplogApplication::Mode oplogApplicationMode) {
return SyncTail::syncApply(opCtx,
op,
- inSteadyStateReplication,
+ oplogApplicationMode,
applyOperation_inlock,
applyCommand_inlock,
stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL));
@@ -1165,8 +1173,9 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSON
void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail*) {
initializeWriterThread();
auto opCtx = cc().makeOperationContext();
- auto syncApply = [](OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) {
- return SyncTail::syncApply(opCtx, op, inSteadyStateReplication);
+ auto syncApply = [](
+ OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode oplogApplicationMode) {
+ return SyncTail::syncApply(opCtx, op, oplogApplicationMode);
};
fassertNoTrace(16359, multiSyncApply_noAbort(opCtx.get(), ops, syncApply));
@@ -1192,7 +1201,9 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
}
// This function is only called in steady state replication.
- const bool inSteadyStateReplication = true;
+ // TODO: This function can be called when we're in recovering as well as secondary. Set this
+ // mode correctly.
+ const OplogApplication::Mode oplogApplicationMode = OplogApplication::Mode::kSecondary;
// doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op
// of a failed group and not allowing further group inserts until that op has been processed.
@@ -1310,7 +1321,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
try {
// Apply the group of inserts.
uassertStatusOK(
- syncApply(opCtx, groupedInsertBuilder.done(), inSteadyStateReplication));
+ syncApply(opCtx, groupedInsertBuilder.done(), oplogApplicationMode));
// It succeeded, advance the oplogEntriesIterator to the end of the
// group of inserts.
oplogEntriesIterator = endOfGroupableOpsIterator - 1;
@@ -1330,7 +1341,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
// If we didn't create a group, try to apply the op individually.
try {
- const Status status = syncApply(opCtx, entry->raw, inSteadyStateReplication);
+ const Status status = syncApply(opCtx, entry->raw, oplogApplicationMode);
if (!status.isOK()) {
severe() << "Error applying operation (" << redact(entry->raw)
@@ -1373,13 +1384,11 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx,
// allow us to get through the magic barrier
opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
- // This function is only called in initial sync, as its name suggests.
- const bool inSteadyStateReplication = false;
-
for (auto it = ops->begin(); it != ops->end(); ++it) {
auto& entry = **it;
try {
- const Status s = SyncTail::syncApply(opCtx, entry.raw, inSteadyStateReplication);
+ const Status s =
+ SyncTail::syncApply(opCtx, entry.raw, OplogApplication::Mode::kInitialSync);
if (!s.isOK()) {
// In initial sync, update operations can cause documents to be missed during
// collection cloning. As a result, it is possible that a document that we need to
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index e90ab2514bf..b3ae1fe4a5b 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -34,6 +34,7 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/repl/multiapplier.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/concurrency/old_thread_pool.h"
@@ -65,26 +66,27 @@ public:
* Used for applying from an oplog.
* 'db' is the database where the op will be applied.
* 'opObj' is a BSONObj describing the op to be applied.
- * 'inSteadyStateReplication' indicates to convert some updates to upserts for idempotency
- * reasons.
+ * 'alwaysUpsert' indicates to convert updates to upserts for idempotency reasons.
+ * 'mode' indicates the oplog application mode.
* 'opCounter' is used to update server status metrics.
* Returns failure status if the op was an update that could not be applied.
*/
- using ApplyOperationInLockFn = stdx::function<Status(OperationContext* opCtx,
- Database* db,
- const BSONObj& opObj,
- bool inSteadyStateReplication,
- IncrementOpsAppliedStatsFn opCounter)>;
+ using ApplyOperationInLockFn =
+ stdx::function<Status(OperationContext* opCtx,
+ Database* db,
+ const BSONObj& opObj,
+ bool alwaysUpsert,
+ OplogApplication::Mode oplogApplicationMode,
+ IncrementOpsAppliedStatsFn opCounter)>;
/**
* Type of function that takes a command op and applies it locally.
* Used for applying from an oplog.
- * inSteadyStateReplication indicates whether we are in steady state replication, rather than
- * initial sync.
+ * 'mode' indicates the oplog application mode.
* Returns failure status if the op that could not be applied.
*/
- using ApplyCommandInLockFn =
- stdx::function<Status(OperationContext*, const BSONObj&, bool inSteadyStateReplication)>;
+ using ApplyCommandInLockFn = stdx::function<Status(
+ OperationContext*, const BSONObj&, OplogApplication::Mode oplogApplicationMode)>;
SyncTail(BackgroundSync* q, MultiSyncApplyFunc func);
SyncTail(BackgroundSync* q, MultiSyncApplyFunc func, std::unique_ptr<OldThreadPool> writerPool);
@@ -102,14 +104,14 @@ public:
*/
static Status syncApply(OperationContext* opCtx,
const BSONObj& o,
- bool inSteadyStateReplication,
+ OplogApplication::Mode oplogApplicationMode,
ApplyOperationInLockFn applyOperationInLock,
ApplyCommandInLockFn applyCommandInLock,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats);
static Status syncApply(OperationContext* opCtx,
const BSONObj& o,
- bool inSteadyStateReplication);
+ OplogApplication::Mode oplogApplicationMode);
void oplogApplication(ReplicationCoordinator* replCoord);
bool peek(OperationContext* opCtx, BSONObj* obj);
@@ -277,7 +279,7 @@ Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops,
* SyncTail::syncApply.
*/
using SyncApplyFn = stdx::function<Status(
- OperationContext* opCtx, const BSONObj& o, bool inSteadyStateReplication)>;
+ OperationContext* opCtx, const BSONObj& o, OplogApplication::Mode oplogApplicationMode)>;
Status multiSyncApply_noAbort(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncApplyFn syncApply);
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index e964e291b50..001469032ff 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -162,8 +162,10 @@ auto parseFromOplogEntryArray(const BSONObj& obj, int elem) {
TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) {
const BSONObj op = BSON("op"
<< "x");
- ASSERT_EQUALS(ErrorCodes::BadValue,
- SyncTail::syncApply(_opCtx.get(), op, false, _applyOp, _applyCmd, _incOps));
+ ASSERT_EQUALS(
+ ErrorCodes::BadValue,
+ SyncTail::syncApply(
+ _opCtx.get(), op, OplogApplication::Mode::kInitialSync, _applyOp, _applyCmd, _incOps));
ASSERT_EQUALS(0U, _opsApplied);
}
@@ -171,7 +173,7 @@ TEST_F(SyncTailTest, SyncApplyNoNamespaceNoOp) {
ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
BSON("op"
<< "n"),
- false));
+ OplogApplication::Mode::kInitialSync));
ASSERT_EQUALS(0U, _opsApplied);
}
@@ -182,11 +184,13 @@ TEST_F(SyncTailTest, SyncApplyBadOp) {
<< "test.t");
ASSERT_EQUALS(
ErrorCodes::BadValue,
- SyncTail::syncApply(_opCtx.get(), op, false, _applyOp, _applyCmd, _incOps).code());
+ SyncTail::syncApply(
+ _opCtx.get(), op, OplogApplication::Mode::kInitialSync, _applyOp, _applyCmd, _incOps)
+ .code());
ASSERT_EQUALS(0U, _opsApplied);
}
-TEST_F(SyncTailTest, SyncApplyNoOp) {
+TEST_F(SyncTailTest, SyncApplyNoOpInitialSync) {
const BSONObj op = BSON("op"
<< "n"
<< "ns"
@@ -195,7 +199,8 @@ TEST_F(SyncTailTest, SyncApplyNoOp) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx,
Database* db,
const BSONObj& theOperation,
- bool inSteadyStateReplication,
+ bool alwaysUpsert,
+ OplogApplication::Mode oplogApplicationMode,
stdx::function<void()>) {
applyOpCalled = true;
ASSERT_TRUE(opCtx);
@@ -204,12 +209,52 @@ TEST_F(SyncTailTest, SyncApplyNoOp) {
ASSERT_TRUE(documentValidationDisabled(opCtx));
ASSERT_TRUE(db);
ASSERT_BSONOBJ_EQ(op, theOperation);
- ASSERT_FALSE(inSteadyStateReplication);
+ ASSERT_FALSE(alwaysUpsert);
+ ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kInitialSync);
return Status::OK();
};
ASSERT_TRUE(_opCtx->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
- ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, failedApplyCommand, _incOps));
+ ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
+ op,
+ OplogApplication::Mode::kInitialSync,
+ applyOp,
+ failedApplyCommand,
+ _incOps));
+ ASSERT_TRUE(applyOpCalled);
+}
+
+TEST_F(SyncTailTest, SyncApplyNoOpNotInitialSync) {
+ const BSONObj op = BSON("op"
+ << "n"
+ << "ns"
+ << "test.t");
+ bool applyOpCalled = false;
+ SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx,
+ Database* db,
+ const BSONObj& theOperation,
+ bool alwaysUpsert,
+ OplogApplication::Mode oplogApplicationMode,
+ stdx::function<void()>) {
+ applyOpCalled = true;
+ ASSERT_TRUE(opCtx);
+ ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode("test", MODE_X));
+ ASSERT_FALSE(opCtx->writesAreReplicated());
+ ASSERT_TRUE(documentValidationDisabled(opCtx));
+ ASSERT_TRUE(db);
+ ASSERT_BSONOBJ_EQ(op, theOperation);
+ ASSERT(alwaysUpsert);
+ ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kSecondary);
+ return Status::OK();
+ };
+ ASSERT_TRUE(_opCtx->writesAreReplicated());
+ ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
+ ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
+ op,
+ OplogApplication::Mode::kSecondary,
+ applyOp,
+ failedApplyCommand,
+ _incOps));
ASSERT_TRUE(applyOpCalled);
}
@@ -278,7 +323,8 @@ TEST_F(SyncTailTest, SyncApplyIndexBuild) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx,
Database* db,
const BSONObj& theOperation,
- bool inSteadyStateReplication,
+ bool alwaysUpsert,
+ OplogApplication::Mode oplogApplicationMode,
stdx::function<void()>) {
applyOpCalled = true;
ASSERT_TRUE(opCtx);
@@ -287,12 +333,18 @@ TEST_F(SyncTailTest, SyncApplyIndexBuild) {
ASSERT_TRUE(documentValidationDisabled(opCtx));
ASSERT_TRUE(db);
ASSERT_BSONOBJ_EQ(op, theOperation);
- ASSERT_FALSE(inSteadyStateReplication);
+ ASSERT_FALSE(alwaysUpsert);
+ ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kInitialSync);
return Status::OK();
};
ASSERT_TRUE(_opCtx->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
- ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, failedApplyCommand, _incOps));
+ ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
+ op,
+ OplogApplication::Mode::kInitialSync,
+ applyOp,
+ failedApplyCommand,
+ _incOps));
ASSERT_TRUE(applyOpCalled);
}
@@ -305,24 +357,27 @@ TEST_F(SyncTailTest, SyncApplyCommand) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx,
Database* db,
const BSONObj& theOperation,
- bool inSteadyStateReplication,
+ bool alwaysUpsert,
+ OplogApplication::Mode oplogApplicationMode,
stdx::function<void()>) {
FAIL("applyOperation unexpectedly invoked.");
return Status::OK();
};
- SyncTail::ApplyCommandInLockFn applyCmd =
- [&](OperationContext* opCtx, const BSONObj& theOperation, bool inSteadyStateReplication) {
- applyCmdCalled = true;
- ASSERT_TRUE(opCtx);
- ASSERT_TRUE(opCtx->lockState()->isW());
- ASSERT_TRUE(opCtx->writesAreReplicated());
- ASSERT_FALSE(documentValidationDisabled(opCtx));
- ASSERT_BSONOBJ_EQ(op, theOperation);
- return Status::OK();
- };
+ SyncTail::ApplyCommandInLockFn applyCmd = [&](OperationContext* opCtx,
+ const BSONObj& theOperation,
+ OplogApplication::Mode oplogApplicationMode) {
+ applyCmdCalled = true;
+ ASSERT_TRUE(opCtx);
+ ASSERT_TRUE(opCtx->lockState()->isW());
+ ASSERT_TRUE(opCtx->writesAreReplicated());
+ ASSERT_FALSE(documentValidationDisabled(opCtx));
+ ASSERT_BSONOBJ_EQ(op, theOperation);
+ return Status::OK();
+ };
ASSERT_TRUE(_opCtx->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
- ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, applyCmd, _incOps));
+ ASSERT_OK(SyncTail::syncApply(
+ _opCtx.get(), op, OplogApplication::Mode::kInitialSync, applyOp, applyCmd, _incOps));
ASSERT_TRUE(applyCmdCalled);
ASSERT_EQUALS(1U, _opsApplied);
}
@@ -336,20 +391,23 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx,
Database* db,
const BSONObj& theOperation,
- bool inSteadyStateReplication,
+ bool alwaysUpsert,
+ OplogApplication::Mode oplogApplicationMode,
stdx::function<void()>) {
FAIL("applyOperation unexpectedly invoked.");
return Status::OK();
};
- SyncTail::ApplyCommandInLockFn applyCmd =
- [&](OperationContext* opCtx, const BSONObj& theOperation, bool inSteadyStateReplication) {
- applyCmdCalled++;
- if (applyCmdCalled < 5) {
- throw WriteConflictException();
- }
- return Status::OK();
- };
- ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, applyCmd, _incOps));
+ SyncTail::ApplyCommandInLockFn applyCmd = [&](OperationContext* opCtx,
+ const BSONObj& theOperation,
+ OplogApplication::Mode oplogApplicationMode) {
+ applyCmdCalled++;
+ if (applyCmdCalled < 5) {
+ throw WriteConflictException();
+ }
+ return Status::OK();
+ };
+ ASSERT_OK(SyncTail::syncApply(
+ _opCtx.get(), op, OplogApplication::Mode::kInitialSync, applyOp, applyCmd, _incOps));
ASSERT_EQUALS(5, applyCmdCalled);
ASSERT_EQUALS(1U, _opsApplied);
}
@@ -623,13 +681,14 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMake
TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) {
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
- auto syncApply = [](OperationContext* opCtx, const BSONObj&, bool convertUpdatesToUpserts) {
- ASSERT_FALSE(opCtx->writesAreReplicated());
- ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
- ASSERT_TRUE(documentValidationDisabled(opCtx));
- ASSERT_TRUE(convertUpdatesToUpserts);
- return Status::OK();
- };
+ auto syncApply =
+ [](OperationContext* opCtx, const BSONObj&, OplogApplication::Mode oplogApplicationMode) {
+ ASSERT_FALSE(opCtx->writesAreReplicated());
+ ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
+ ASSERT_TRUE(documentValidationDisabled(opCtx));
+ ASSERT_EQUALS(OplogApplication::Mode::kSecondary, oplogApplicationMode);
+ return Status::OK();
+ };
auto op = makeUpdateDocumentOplogEntry(
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
MultiApplier::OperationPtrs ops = {&op};
@@ -639,7 +698,7 @@ TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperat
TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) {
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
OplogEntry op(OpTime(Timestamp(1, 1), 1), 1LL, OpTypeEnum::kDelete, nss, BSONObj());
- auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status {
+ auto syncApply = [](OperationContext*, const BSONObj&, OplogApplication::Mode) -> Status {
return {ErrorCodes::OperationFailed, ""};
};
MultiApplier::OperationPtrs ops = {&op};
@@ -650,7 +709,7 @@ TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToAppl
TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) {
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
OplogEntry op(OpTime(Timestamp(1, 1), 1), 1LL, OpTypeEnum::kDelete, nss, BSONObj());
- auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status {
+ auto syncApply = [](OperationContext*, const BSONObj&, OplogApplication::Mode) -> Status {
uasserted(ErrorCodes::OperationFailed, "");
MONGO_UNREACHABLE;
};
@@ -670,10 +729,11 @@ TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplyin
auto op3 = makeOp("test.t2");
auto op4 = makeOp("test.t3");
MultiApplier::Operations operationsApplied;
- auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
- operationsApplied.push_back(OplogEntry(op));
- return Status::OK();
- };
+ auto syncApply =
+ [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) {
+ operationsApplied.push_back(OplogEntry(op));
+ return Status::OK();
+ };
MultiApplier::OperationPtrs ops = {&op4, &op1, &op3, &op2};
ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
ASSERT_EQUALS(4U, operationsApplied.size());
@@ -698,10 +758,11 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin
auto insertOp2a = makeOp(nss2);
auto insertOp2b = makeOp(nss2);
std::vector<BSONObj> operationsApplied;
- auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
- operationsApplied.push_back(op.copy());
- return Status::OK();
- };
+ auto syncApply =
+ [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) {
+ operationsApplied.push_back(op.copy());
+ return Status::OK();
+ };
MultiApplier::OperationPtrs ops = {
&createOp1, &createOp2, &insertOp1a, &insertOp2a, &insertOp1b, &insertOp2b};
@@ -750,10 +811,11 @@ TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchCountWhenGroupingInsertOperation)
operationsToApply.push_back(createOp);
std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply));
std::vector<BSONObj> operationsApplied;
- auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
- operationsApplied.push_back(op.copy());
- return Status::OK();
- };
+ auto syncApply =
+ [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) {
+ operationsApplied.push_back(op.copy());
+ return Status::OK();
+ };
MultiApplier::OperationPtrs ops;
for (auto&& op : operationsToApply) {
@@ -817,10 +879,11 @@ TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchSizeWhenGroupingInsertOperations)
}
std::vector<BSONObj> operationsApplied;
- auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
- operationsApplied.push_back(op.copy());
- return Status::OK();
- };
+ auto syncApply =
+ [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) {
+ operationsApplied.push_back(op.copy());
+ return Status::OK();
+ };
// Apply the ops.
ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
@@ -859,10 +922,11 @@ TEST_F(SyncTailTest, MultiSyncApplyAppliesOpIndividuallyWhenOpIndividuallyExceed
}
std::vector<BSONObj> operationsApplied;
- auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
- operationsApplied.push_back(op.copy());
- return Status::OK();
- };
+ auto syncApply =
+ [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) {
+ operationsApplied.push_back(op.copy());
+ return Status::OK();
+ };
// Apply the ops.
ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
@@ -892,10 +956,11 @@ TEST_F(SyncTailTest, MultiSyncApplyAppliesInsertOpsIndividuallyWhenUnableToCreat
makeOp(NamespaceString(testNs + "_3"))};
std::vector<BSONObj> operationsApplied;
- auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
- operationsApplied.push_back(op.copy());
- return Status::OK();
- };
+ auto syncApply =
+ [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) {
+ operationsApplied.push_back(op.copy());
+ return Status::OK();
+ };
MultiApplier::OperationPtrs ops;
for (auto&& op : operationsToApply) {
@@ -935,8 +1000,8 @@ TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGro
std::size_t numFailedGroupedInserts = 0;
MultiApplier::Operations operationsApplied;
- auto syncApply = [&numFailedGroupedInserts,
- &operationsApplied](OperationContext*, const BSONObj& op, bool) -> Status {
+ auto syncApply = [&numFailedGroupedInserts, &operationsApplied](
+ OperationContext*, const BSONObj& op, OplogApplication::Mode) -> Status {
// Reject grouped insert operations.
if (op["o"].type() == BSONType::Array) {
numFailedGroupedInserts++;
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 070f76dfc8c..f101cfe59d8 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -86,9 +86,12 @@ void SyncTailTest::setUp() {
_applyOp = [](OperationContext* opCtx,
Database* db,
const BSONObj& op,
- bool inSteadyStateReplication,
+ bool alwaysUpsert,
+ OplogApplication::Mode oplogApplicationMode,
stdx::function<void()>) { return Status::OK(); };
- _applyCmd = [](OperationContext* opCtx, const BSONObj& op, bool) { return Status::OK(); };
+ _applyCmd = [](OperationContext* opCtx,
+ const BSONObj& op,
+ OplogApplication::Mode oplogApplicationMode) { return Status::OK(); };
_incOps = [this]() { _opsApplied++; };
serverGlobalParams.featureCompatibility.setVersion(
@@ -114,7 +117,8 @@ void SyncTailTest::_testSyncApplyInsertDocument(ErrorCodes::Error expectedError,
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx,
Database* db,
const BSONObj& theOperation,
- bool inSteadyStateReplication,
+ bool alwaysUpsert,
+ OplogApplication::Mode oplogApplicationMode,
stdx::function<void()>) {
applyOpCalled = true;
ASSERT_TRUE(opCtx);
@@ -125,17 +129,25 @@ void SyncTailTest::_testSyncApplyInsertDocument(ErrorCodes::Error expectedError,
ASSERT_TRUE(documentValidationDisabled(opCtx));
ASSERT_TRUE(db);
ASSERT_BSONOBJ_EQ(op, theOperation);
- ASSERT_TRUE(inSteadyStateReplication);
+ ASSERT_TRUE(alwaysUpsert);
+ ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kSecondary);
return Status::OK();
};
ASSERT_TRUE(_opCtx->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
- ASSERT_EQ(SyncTail::syncApply(_opCtx.get(), op, true, applyOp, failedApplyCommand, _incOps),
+ ASSERT_EQ(SyncTail::syncApply(_opCtx.get(),
+ op,
+ OplogApplication::Mode::kSecondary,
+ applyOp,
+ failedApplyCommand,
+ _incOps),
expectedError);
ASSERT_EQ(applyOpCalled, expectedError == ErrorCodes::OK);
}
-Status failedApplyCommand(OperationContext* opCtx, const BSONObj& theOperation, bool) {
+Status failedApplyCommand(OperationContext* opCtx,
+ const BSONObj& theOperation,
+ OplogApplication::Mode) {
FAIL("applyCommand unexpectedly invoked.");
return Status::OK();
}
@@ -143,8 +155,9 @@ Status failedApplyCommand(OperationContext* opCtx, const BSONObj& theOperation,
Status SyncTailTest::runOpSteadyState(const OplogEntry& op) {
SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr);
MultiApplier::OperationPtrs opsPtrs{&op};
- auto syncApply = [](OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) {
- return SyncTail::syncApply(opCtx, op, inSteadyStateReplication);
+ auto syncApply = [](
+ OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode oplogApplicationMode) {
+ return SyncTail::syncApply(opCtx, op, oplogApplicationMode);
};
return multiSyncApply_noAbort(_opCtx.get(), &opsPtrs, syncApply);
}
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index a0738a56677..c73a8e42484 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -73,7 +73,9 @@ protected:
Status runOpsInitialSync(std::vector<OplogEntry> ops);
};
-Status failedApplyCommand(OperationContext* opCtx, const BSONObj& theOperation, bool);
+Status failedApplyCommand(OperationContext* opCtx,
+ const BSONObj& theOperation,
+ OplogApplication::Mode);
} // namespace repl
} // namespace mongo