diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-10-30 16:31:48 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-10-30 16:40:41 -0400 |
commit | b49b20887cf13720c0e863d24da95cc0239889f8 (patch) | |
tree | 2faf58212c5a842cc5b0f9cab4a5f93ee44fe544 | |
parent | 350ee88b33f32b179b636f33b7db5b0c03932d24 (diff) | |
download | mongo-b49b20887cf13720c0e863d24da95cc0239889f8.tar.gz |
SERVER-31387 oplog application conflates upserting with being in steady state replication
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/multiapplier.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 54 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 36 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 30 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 201 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.h | 4 |
11 files changed, 291 insertions, 128 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 0bd95ba91ec..3d17a8c5291 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -171,7 +171,8 @@ Status _applyOps(OperationContext* opCtx, OldClientContext ctx(opCtx, nss.ns()); - status = repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert); + status = repl::applyOperation_inlock( + opCtx, ctx.db(), opObj, alwaysUpsert, repl::OplogApplication::Mode::kApplyOps); if (!status.isOK()) return status; } else { @@ -180,7 +181,8 @@ Status _applyOps(OperationContext* opCtx, opCtx, "applyOps", nss.ns(), [opCtx, nss, opObj, opType, alwaysUpsert] { if (*opType == 'c') { invariant(opCtx->lockState()->isW()); - return repl::applyCommand_inlock(opCtx, opObj, true); + return repl::applyCommand_inlock( + opCtx, opObj, repl::OplogApplication::Mode::kApplyOps); } AutoGetCollection autoColl(opCtx, nss, MODE_IX); @@ -202,7 +204,11 @@ Status _applyOps(OperationContext* opCtx, if (!nss.isSystemDotIndexes()) { return repl::applyOperation_inlock( - opCtx, ctx.db(), opObj, alwaysUpsert); + opCtx, + ctx.db(), + opObj, + alwaysUpsert, + repl::OplogApplication::Mode::kApplyOps); } auto fieldO = opObj["o"]; diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index b5fbdeea19f..369beaac998 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -646,7 +646,7 @@ bool ReplSource::handleDuplicateDbName(OperationContext* opCtx, void ReplSource::applyCommand(OperationContext* opCtx, const BSONObj& op) { try { - Status status = applyCommand_inlock(opCtx, op, true); + Status status = applyCommand_inlock(opCtx, op, OplogApplication::Mode::kMasterSlave); uassert(28639, "Failure applying initial sync command", status.isOK()); } catch (AssertionException& e) { log() << "sync: caught user assertion " << redact(e) << " while applying op: " << redact(op) @@ -661,7 +661,8 @@ void ReplSource::applyCommand(OperationContext* opCtx, const BSONObj& op) { void ReplSource::applyOperation(OperationContext* opCtx, Database* db, const BSONObj& op) { try { - Status status = applyOperation_inlock(opCtx, db, op); + Status status = + applyOperation_inlock(opCtx, db, op, false, OplogApplication::Mode::kMasterSlave); if (!status.isOK()) { uassert(15914, "Failure applying initial sync operation", diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h index 97567fde6cc..a81ed55b47f 100644 --- a/src/mongo/db/repl/multiapplier.h +++ b/src/mongo/db/repl/multiapplier.h @@ -68,8 +68,8 @@ public: /** * Type of function to to apply a single operation. In production, this function - * would have the same outcome as calling SyncTail::syncApply() ('inSteadyStateReplication' - * value will be embedded in the function implementation). + * would have the same outcome as calling SyncTail::syncApply() (oplog application mode + * will be embedded in the function implementation). */ using ApplyOperationFn = stdx::function<Status(OperationPtrs*)>; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 567a9be6335..e91fa140176 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -898,6 +898,46 @@ std::map<std::string, ApplyOpMetadata> opsMap = { } // namespace +constexpr StringData OplogApplication::kInitialSyncOplogApplicationMode; +constexpr StringData OplogApplication::kMasterSlaveOplogApplicationMode; +constexpr StringData OplogApplication::kRecoveringOplogApplicationMode; +constexpr StringData OplogApplication::kSecondaryOplogApplicationMode; +constexpr StringData OplogApplication::kApplyOpsOplogApplicationMode; + +StringData OplogApplication::modeToString(OplogApplication::Mode mode) { + switch (mode) { + case OplogApplication::Mode::kInitialSync: + return OplogApplication::kInitialSyncOplogApplicationMode; + case OplogApplication::Mode::kMasterSlave: + return OplogApplication::kMasterSlaveOplogApplicationMode; + case OplogApplication::Mode::kRecovering: + return OplogApplication::kRecoveringOplogApplicationMode; + case OplogApplication::Mode::kSecondary: + return OplogApplication::kSecondaryOplogApplicationMode; + case OplogApplication::Mode::kApplyOps: + return OplogApplication::kApplyOpsOplogApplicationMode; + } + MONGO_UNREACHABLE; +} + +StatusWith<OplogApplication::Mode> OplogApplication::parseMode(const std::string& mode) { + if (mode == OplogApplication::kInitialSyncOplogApplicationMode) { + return OplogApplication::Mode::kInitialSync; + } else if (mode == OplogApplication::kMasterSlaveOplogApplicationMode) { + return OplogApplication::Mode::kMasterSlave; + } else if (mode == OplogApplication::kRecoveringOplogApplicationMode) { + return OplogApplication::Mode::kRecovering; + } else if (mode == OplogApplication::kSecondaryOplogApplicationMode) { + return OplogApplication::Mode::kSecondary; + } else if (mode == OplogApplication::kApplyOpsOplogApplicationMode) { + return OplogApplication::Mode::kApplyOps; + } else { + return Status(ErrorCodes::FailedToParse, + str::stream() << "Invalid oplog application mode provided: " << mode); + } + MONGO_UNREACHABLE; +} + std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement& fieldO, const BSONObj& op, const NamespaceString& requestNss) { @@ -942,7 +982,8 @@ std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement Status applyOperation_inlock(OperationContext* opCtx, Database* db, const BSONObj& op, - bool inSteadyStateReplication, + bool alwaysUpsert, + OplogApplication::Mode mode, IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { LOG(3) << "applying op: " << redact(op); @@ -1003,7 +1044,8 @@ Status applyOperation_inlock(OperationContext* opCtx, // During upgrade from 3.4 to 3.6, the feature compatibility version cannot change during // initial sync because we cannot do some operations with UUIDs and others without. - if (!inSteadyStateReplication && requestNss == FeatureCompatibilityVersion::kCollection) { + if ((mode == OplogApplication::Mode::kInitialSync) && + requestNss == FeatureCompatibilityVersion::kCollection) { std::string oID; auto status = bsonExtractStringField(o, "_id", &oID); if (status.isOK() && oID == FeatureCompatibilityVersion::kParameterName) { @@ -1238,7 +1280,7 @@ Status applyOperation_inlock(OperationContext* opCtx, opCounters->gotUpdate(); BSONObj updateCriteria = o2; - const bool upsert = valueB || inSteadyStateReplication; + const bool upsert = valueB || alwaysUpsert; uassert(ErrorCodes::NoSuchKey, str::stream() << "Failed to apply update due to missing _id: " << op.toString(), @@ -1361,7 +1403,7 @@ Status applyOperation_inlock(OperationContext* opCtx, Status applyCommand_inlock(OperationContext* opCtx, const BSONObj& op, - bool inSteadyStateReplication) { + OplogApplication::Mode mode) { std::array<StringData, 4> names = {"o", "ui", "ns", "op"}; std::array<BSONElement, 4> fields; op.getFields(names, &fields); @@ -1400,7 +1442,7 @@ Status applyCommand_inlock(OperationContext* opCtx, // Applying renameCollection during initial sync to a collection without UUID might lead to // data corruption, so we restart the initial sync. - if (fieldUI.eoo() && !inSteadyStateReplication && + if (fieldUI.eoo() && (mode == OplogApplication::Mode::kInitialSync) && o.firstElementFieldName() == std::string("renameCollection")) { if (!allowUnsafeRenamesDuringInitialSync.load()) { return Status(ErrorCodes::OplogOperationUnsupported, @@ -1420,7 +1462,7 @@ Status applyCommand_inlock(OperationContext* opCtx, // collection dropped. 'applyOps' will try to apply each individual operation, and those // will be caught then if they are a problem. auto whitelistedOps = std::vector<std::string>{"dropDatabase", "applyOps", "dbCheck"}; - if (!inSteadyStateReplication && + if ((mode == OplogApplication::Mode::kInitialSync) && (std::find(whitelistedOps.begin(), whitelistedOps.end(), o.firstElementFieldName()) == whitelistedOps.end()) && parseNs(nss.ns(), o) == FeatureCompatibilityVersion::kCollection) { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index df2178a66aa..7ece1b4eedc 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -166,29 +166,51 @@ using IncrementOpsAppliedStatsFn = stdx::function<void()>; std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement& fieldO, const BSONObj& op, const NamespaceString& requestNss); + +/** + * This class encapsulates an enum of the oplog application mode and functions that serialize + * and parse it. + */ +class OplogApplication { +public: + static constexpr StringData kInitialSyncOplogApplicationMode = "InitialSync"_sd; + static constexpr StringData kMasterSlaveOplogApplicationMode = "MasterSlave"_sd; + static constexpr StringData kRecoveringOplogApplicationMode = "Recovering"_sd; + static constexpr StringData kSecondaryOplogApplicationMode = "Secondary"_sd; + static constexpr StringData kApplyOpsOplogApplicationMode = "ApplyOps"_sd; + + enum class Mode { kInitialSync, kMasterSlave, kRecovering, kSecondary, kApplyOps }; + + static StringData modeToString(Mode mode); + + static StatusWith<Mode> parseMode(const std::string& mode); +}; + +inline std::ostream& operator<<(std::ostream& s, OplogApplication::Mode mode) { + return (s << OplogApplication::modeToString(mode)); +} + /** * Take a non-command op and apply it locally * Used for applying from an oplog - * @param inSteadyStateReplication convert some updates to upserts for idempotency reasons + * @param alwaysUpsert convert some updates to upserts for idempotency reasons + * @param mode specifies what oplog application mode we are in * @param incrementOpsAppliedStats is called whenever an op is applied. * Returns failure status if the op was an update that could not be applied. */ Status applyOperation_inlock(OperationContext* opCtx, Database* db, const BSONObj& op, - bool inSteadyStateReplication = false, + bool alwaysUpsert, + OplogApplication::Mode mode, IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {}); /** * Take a command op and apply it locally * Used for applying from an oplog - * inSteadyStateReplication indicates whether we are in steady state replication, rather than - * initial sync. * Returns failure status if the op that could not be applied. */ -Status applyCommand_inlock(OperationContext* opCtx, - const BSONObj& op, - bool inSteadyStateReplication); +Status applyCommand_inlock(OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode mode); /** * Initializes the global Timestamp with the value from the timestamp of the last oplog entry. diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index bab0894f929..6b416de6e3e 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -212,7 +212,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, while (cursor->more()) { auto entry = cursor->nextSafe(); - fassertStatusOK(40294, SyncTail::syncApply(opCtx, entry, true)); + fassertStatusOK(40294, + SyncTail::syncApply(opCtx, entry, OplogApplication::Mode::kRecovering)); _consistencyMarkers->setAppliedThrough( opCtx, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 4cde3fec5c5..35362355aad 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -59,7 +59,6 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/initial_syncer.h" #include "mongo/db/repl/multiapplier.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_config.h" @@ -305,7 +304,7 @@ bool SyncTail::peek(OperationContext* opCtx, BSONObj* op) { // static Status SyncTail::syncApply(OperationContext* opCtx, const BSONObj& op, - bool inSteadyStateReplication, + OplogApplication::Mode oplogApplicationMode, ApplyOperationInLockFn applyOperationInLock, ApplyCommandInLockFn applyCommandInLock, IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { @@ -322,8 +321,17 @@ Status SyncTail::syncApply(OperationContext* opCtx, UnreplicatedWritesBlock uwb(opCtx); DisableDocumentValidation validationDisabler(opCtx); - Status status = - applyOperationInLock(opCtx, db, op, inSteadyStateReplication, incrementOpsAppliedStats); + // We convert updates to upserts when not in initial sync because after rollback and during + // startup we may replay an update after a delete and crash since we do not ignore + // errors. In initial sync we simply ignore these update errors so there is no reason to + // upsert. + // + // TODO (SERVER-21700): Never upsert during oplog application unless an external applyOps + // wants to. We should ignore these errors intelligently while in RECOVERING and STARTUP + // mode (similar to initial sync) instead so we do not accidentally ignore real errors. + bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync); + Status status = applyOperationInLock( + opCtx, db, op, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats); if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { throw WriteConflictException(); } @@ -384,7 +392,7 @@ Status SyncTail::syncApply(OperationContext* opCtx, Lock::GlobalWrite globalWriteLock(opCtx); // special case apply for commands to avoid implicit database creation - Status status = applyCommandInLock(opCtx, op, inSteadyStateReplication); + Status status = applyCommandInLock(opCtx, op, oplogApplicationMode); incrementOpsAppliedStats(); return status; }); @@ -399,10 +407,10 @@ Status SyncTail::syncApply(OperationContext* opCtx, Status SyncTail::syncApply(OperationContext* opCtx, const BSONObj& op, - bool inSteadyStateReplication) { + OplogApplication::Mode oplogApplicationMode) { return SyncTail::syncApply(opCtx, op, - inSteadyStateReplication, + oplogApplicationMode, applyOperation_inlock, applyCommand_inlock, stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL)); @@ -1165,8 +1173,9 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSON void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail*) { initializeWriterThread(); auto opCtx = cc().makeOperationContext(); - auto syncApply = [](OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) { - return SyncTail::syncApply(opCtx, op, inSteadyStateReplication); + auto syncApply = []( + OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode oplogApplicationMode) { + return SyncTail::syncApply(opCtx, op, oplogApplicationMode); }; fassertNoTrace(16359, multiSyncApply_noAbort(opCtx.get(), ops, syncApply)); @@ -1192,7 +1201,9 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, } // This function is only called in steady state replication. - const bool inSteadyStateReplication = true; + // TODO: This function can be called when we're in recovering as well as secondary. Set this + // mode correctly. + const OplogApplication::Mode oplogApplicationMode = OplogApplication::Mode::kSecondary; // doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op // of a failed group and not allowing further group inserts until that op has been processed. @@ -1310,7 +1321,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, try { // Apply the group of inserts. uassertStatusOK( - syncApply(opCtx, groupedInsertBuilder.done(), inSteadyStateReplication)); + syncApply(opCtx, groupedInsertBuilder.done(), oplogApplicationMode)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. oplogEntriesIterator = endOfGroupableOpsIterator - 1; @@ -1330,7 +1341,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, // If we didn't create a group, try to apply the op individually. try { - const Status status = syncApply(opCtx, entry->raw, inSteadyStateReplication); + const Status status = syncApply(opCtx, entry->raw, oplogApplicationMode); if (!status.isOK()) { severe() << "Error applying operation (" << redact(entry->raw) @@ -1373,13 +1384,11 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx, // allow us to get through the magic barrier opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); - // This function is only called in initial sync, as its name suggests. - const bool inSteadyStateReplication = false; - for (auto it = ops->begin(); it != ops->end(); ++it) { auto& entry = **it; try { - const Status s = SyncTail::syncApply(opCtx, entry.raw, inSteadyStateReplication); + const Status s = + SyncTail::syncApply(opCtx, entry.raw, OplogApplication::Mode::kInitialSync); if (!s.isOK()) { // In initial sync, update operations can cause documents to be missed during // collection cloning. As a result, it is possible that a document that we need to diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index e90ab2514bf..b3ae1fe4a5b 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -34,6 +34,7 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/repl/multiapplier.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/stdx/functional.h" #include "mongo/util/concurrency/old_thread_pool.h" @@ -65,26 +66,27 @@ public: * Used for applying from an oplog. * 'db' is the database where the op will be applied. * 'opObj' is a BSONObj describing the op to be applied. - * 'inSteadyStateReplication' indicates to convert some updates to upserts for idempotency - * reasons. + * 'alwaysUpsert' indicates to convert updates to upserts for idempotency reasons. + * 'mode' indicates the oplog application mode. * 'opCounter' is used to update server status metrics. * Returns failure status if the op was an update that could not be applied. */ - using ApplyOperationInLockFn = stdx::function<Status(OperationContext* opCtx, - Database* db, - const BSONObj& opObj, - bool inSteadyStateReplication, - IncrementOpsAppliedStatsFn opCounter)>; + using ApplyOperationInLockFn = + stdx::function<Status(OperationContext* opCtx, + Database* db, + const BSONObj& opObj, + bool alwaysUpsert, + OplogApplication::Mode oplogApplicationMode, + IncrementOpsAppliedStatsFn opCounter)>; /** * Type of function that takes a command op and applies it locally. * Used for applying from an oplog. - * inSteadyStateReplication indicates whether we are in steady state replication, rather than - * initial sync. + * 'mode' indicates the oplog application mode. * Returns failure status if the op that could not be applied. */ - using ApplyCommandInLockFn = - stdx::function<Status(OperationContext*, const BSONObj&, bool inSteadyStateReplication)>; + using ApplyCommandInLockFn = stdx::function<Status( + OperationContext*, const BSONObj&, OplogApplication::Mode oplogApplicationMode)>; SyncTail(BackgroundSync* q, MultiSyncApplyFunc func); SyncTail(BackgroundSync* q, MultiSyncApplyFunc func, std::unique_ptr<OldThreadPool> writerPool); @@ -102,14 +104,14 @@ public: */ static Status syncApply(OperationContext* opCtx, const BSONObj& o, - bool inSteadyStateReplication, + OplogApplication::Mode oplogApplicationMode, ApplyOperationInLockFn applyOperationInLock, ApplyCommandInLockFn applyCommandInLock, IncrementOpsAppliedStatsFn incrementOpsAppliedStats); static Status syncApply(OperationContext* opCtx, const BSONObj& o, - bool inSteadyStateReplication); + OplogApplication::Mode oplogApplicationMode); void oplogApplication(ReplicationCoordinator* replCoord); bool peek(OperationContext* opCtx, BSONObj* obj); @@ -277,7 +279,7 @@ Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, * SyncTail::syncApply. */ using SyncApplyFn = stdx::function<Status( - OperationContext* opCtx, const BSONObj& o, bool inSteadyStateReplication)>; + OperationContext* opCtx, const BSONObj& o, OplogApplication::Mode oplogApplicationMode)>; Status multiSyncApply_noAbort(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncApplyFn syncApply); diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index e964e291b50..001469032ff 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -162,8 +162,10 @@ auto parseFromOplogEntryArray(const BSONObj& obj, int elem) { TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) { const BSONObj op = BSON("op" << "x"); - ASSERT_EQUALS(ErrorCodes::BadValue, - SyncTail::syncApply(_opCtx.get(), op, false, _applyOp, _applyCmd, _incOps)); + ASSERT_EQUALS( + ErrorCodes::BadValue, + SyncTail::syncApply( + _opCtx.get(), op, OplogApplication::Mode::kInitialSync, _applyOp, _applyCmd, _incOps)); ASSERT_EQUALS(0U, _opsApplied); } @@ -171,7 +173,7 @@ TEST_F(SyncTailTest, SyncApplyNoNamespaceNoOp) { ASSERT_OK(SyncTail::syncApply(_opCtx.get(), BSON("op" << "n"), - false)); + OplogApplication::Mode::kInitialSync)); ASSERT_EQUALS(0U, _opsApplied); } @@ -182,11 +184,13 @@ TEST_F(SyncTailTest, SyncApplyBadOp) { << "test.t"); ASSERT_EQUALS( ErrorCodes::BadValue, - SyncTail::syncApply(_opCtx.get(), op, false, _applyOp, _applyCmd, _incOps).code()); + SyncTail::syncApply( + _opCtx.get(), op, OplogApplication::Mode::kInitialSync, _applyOp, _applyCmd, _incOps) + .code()); ASSERT_EQUALS(0U, _opsApplied); } -TEST_F(SyncTailTest, SyncApplyNoOp) { +TEST_F(SyncTailTest, SyncApplyNoOpInitialSync) { const BSONObj op = BSON("op" << "n" << "ns" @@ -195,7 +199,8 @@ TEST_F(SyncTailTest, SyncApplyNoOp) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, - bool inSteadyStateReplication, + bool alwaysUpsert, + OplogApplication::Mode oplogApplicationMode, stdx::function<void()>) { applyOpCalled = true; ASSERT_TRUE(opCtx); @@ -204,12 +209,52 @@ TEST_F(SyncTailTest, SyncApplyNoOp) { ASSERT_TRUE(documentValidationDisabled(opCtx)); ASSERT_TRUE(db); ASSERT_BSONOBJ_EQ(op, theOperation); - ASSERT_FALSE(inSteadyStateReplication); + ASSERT_FALSE(alwaysUpsert); + ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kInitialSync); return Status::OK(); }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); - ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, failedApplyCommand, _incOps)); + ASSERT_OK(SyncTail::syncApply(_opCtx.get(), + op, + OplogApplication::Mode::kInitialSync, + applyOp, + failedApplyCommand, + _incOps)); + ASSERT_TRUE(applyOpCalled); +} + +TEST_F(SyncTailTest, SyncApplyNoOpNotInitialSync) { + const BSONObj op = BSON("op" + << "n" + << "ns" + << "test.t"); + bool applyOpCalled = false; + SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, + Database* db, + const BSONObj& theOperation, + bool alwaysUpsert, + OplogApplication::Mode oplogApplicationMode, + stdx::function<void()>) { + applyOpCalled = true; + ASSERT_TRUE(opCtx); + ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode("test", MODE_X)); + ASSERT_FALSE(opCtx->writesAreReplicated()); + ASSERT_TRUE(documentValidationDisabled(opCtx)); + ASSERT_TRUE(db); + ASSERT_BSONOBJ_EQ(op, theOperation); + ASSERT(alwaysUpsert); + ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kSecondary); + return Status::OK(); + }; + ASSERT_TRUE(_opCtx->writesAreReplicated()); + ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); + ASSERT_OK(SyncTail::syncApply(_opCtx.get(), + op, + OplogApplication::Mode::kSecondary, + applyOp, + failedApplyCommand, + _incOps)); ASSERT_TRUE(applyOpCalled); } @@ -278,7 +323,8 @@ TEST_F(SyncTailTest, SyncApplyIndexBuild) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, - bool inSteadyStateReplication, + bool alwaysUpsert, + OplogApplication::Mode oplogApplicationMode, stdx::function<void()>) { applyOpCalled = true; ASSERT_TRUE(opCtx); @@ -287,12 +333,18 @@ TEST_F(SyncTailTest, SyncApplyIndexBuild) { ASSERT_TRUE(documentValidationDisabled(opCtx)); ASSERT_TRUE(db); ASSERT_BSONOBJ_EQ(op, theOperation); - ASSERT_FALSE(inSteadyStateReplication); + ASSERT_FALSE(alwaysUpsert); + ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kInitialSync); return Status::OK(); }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); - ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, failedApplyCommand, _incOps)); + ASSERT_OK(SyncTail::syncApply(_opCtx.get(), + op, + OplogApplication::Mode::kInitialSync, + applyOp, + failedApplyCommand, + _incOps)); ASSERT_TRUE(applyOpCalled); } @@ -305,24 +357,27 @@ TEST_F(SyncTailTest, SyncApplyCommand) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, - bool inSteadyStateReplication, + bool alwaysUpsert, + OplogApplication::Mode oplogApplicationMode, stdx::function<void()>) { FAIL("applyOperation unexpectedly invoked."); return Status::OK(); }; - SyncTail::ApplyCommandInLockFn applyCmd = - [&](OperationContext* opCtx, const BSONObj& theOperation, bool inSteadyStateReplication) { - applyCmdCalled = true; - ASSERT_TRUE(opCtx); - ASSERT_TRUE(opCtx->lockState()->isW()); - ASSERT_TRUE(opCtx->writesAreReplicated()); - ASSERT_FALSE(documentValidationDisabled(opCtx)); - ASSERT_BSONOBJ_EQ(op, theOperation); - return Status::OK(); - }; + SyncTail::ApplyCommandInLockFn applyCmd = [&](OperationContext* opCtx, + const BSONObj& theOperation, + OplogApplication::Mode oplogApplicationMode) { + applyCmdCalled = true; + ASSERT_TRUE(opCtx); + ASSERT_TRUE(opCtx->lockState()->isW()); + ASSERT_TRUE(opCtx->writesAreReplicated()); + ASSERT_FALSE(documentValidationDisabled(opCtx)); + ASSERT_BSONOBJ_EQ(op, theOperation); + return Status::OK(); + }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); - ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, applyCmd, _incOps)); + ASSERT_OK(SyncTail::syncApply( + _opCtx.get(), op, OplogApplication::Mode::kInitialSync, applyOp, applyCmd, _incOps)); ASSERT_TRUE(applyCmdCalled); ASSERT_EQUALS(1U, _opsApplied); } @@ -336,20 +391,23 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, - bool inSteadyStateReplication, + bool alwaysUpsert, + OplogApplication::Mode oplogApplicationMode, stdx::function<void()>) { FAIL("applyOperation unexpectedly invoked."); return Status::OK(); }; - SyncTail::ApplyCommandInLockFn applyCmd = - [&](OperationContext* opCtx, const BSONObj& theOperation, bool inSteadyStateReplication) { - applyCmdCalled++; - if (applyCmdCalled < 5) { - throw WriteConflictException(); - } - return Status::OK(); - }; - ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, applyCmd, _incOps)); + SyncTail::ApplyCommandInLockFn applyCmd = [&](OperationContext* opCtx, + const BSONObj& theOperation, + OplogApplication::Mode oplogApplicationMode) { + applyCmdCalled++; + if (applyCmdCalled < 5) { + throw WriteConflictException(); + } + return Status::OK(); + }; + ASSERT_OK(SyncTail::syncApply( + _opCtx.get(), op, OplogApplication::Mode::kInitialSync, applyOp, applyCmd, _incOps)); ASSERT_EQUALS(5, applyCmdCalled); ASSERT_EQUALS(1U, _opsApplied); } @@ -623,13 +681,14 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMake TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); - auto syncApply = [](OperationContext* opCtx, const BSONObj&, bool convertUpdatesToUpserts) { - ASSERT_FALSE(opCtx->writesAreReplicated()); - ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); - ASSERT_TRUE(documentValidationDisabled(opCtx)); - ASSERT_TRUE(convertUpdatesToUpserts); - return Status::OK(); - }; + auto syncApply = + [](OperationContext* opCtx, const BSONObj&, OplogApplication::Mode oplogApplicationMode) { + ASSERT_FALSE(opCtx->writesAreReplicated()); + ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); + ASSERT_TRUE(documentValidationDisabled(opCtx)); + ASSERT_EQUALS(OplogApplication::Mode::kSecondary, oplogApplicationMode); + return Status::OK(); + }; auto op = makeUpdateDocumentOplogEntry( {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); MultiApplier::OperationPtrs ops = {&op}; @@ -639,7 +698,7 @@ TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperat TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); OplogEntry op(OpTime(Timestamp(1, 1), 1), 1LL, OpTypeEnum::kDelete, nss, BSONObj()); - auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status { + auto syncApply = [](OperationContext*, const BSONObj&, OplogApplication::Mode) -> Status { return {ErrorCodes::OperationFailed, ""}; }; MultiApplier::OperationPtrs ops = {&op}; @@ -650,7 +709,7 @@ TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToAppl TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); OplogEntry op(OpTime(Timestamp(1, 1), 1), 1LL, OpTypeEnum::kDelete, nss, BSONObj()); - auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status { + auto syncApply = [](OperationContext*, const BSONObj&, OplogApplication::Mode) -> Status { uasserted(ErrorCodes::OperationFailed, ""); MONGO_UNREACHABLE; }; @@ -670,10 +729,11 @@ TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplyin auto op3 = makeOp("test.t2"); auto op4 = makeOp("test.t3"); MultiApplier::Operations operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(OplogEntry(op)); - return Status::OK(); - }; + auto syncApply = + [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { + operationsApplied.push_back(OplogEntry(op)); + return Status::OK(); + }; MultiApplier::OperationPtrs ops = {&op4, &op1, &op3, &op2}; ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply)); ASSERT_EQUALS(4U, operationsApplied.size()); @@ -698,10 +758,11 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin auto insertOp2a = makeOp(nss2); auto insertOp2b = makeOp(nss2); std::vector<BSONObj> operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(op.copy()); - return Status::OK(); - }; + auto syncApply = + [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { + operationsApplied.push_back(op.copy()); + return Status::OK(); + }; MultiApplier::OperationPtrs ops = { &createOp1, &createOp2, &insertOp1a, &insertOp2a, &insertOp1b, &insertOp2b}; @@ -750,10 +811,11 @@ TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchCountWhenGroupingInsertOperation) operationsToApply.push_back(createOp); std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); std::vector<BSONObj> operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(op.copy()); - return Status::OK(); - }; + auto syncApply = + [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { + operationsApplied.push_back(op.copy()); + return Status::OK(); + }; MultiApplier::OperationPtrs ops; for (auto&& op : operationsToApply) { @@ -817,10 +879,11 @@ TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchSizeWhenGroupingInsertOperations) } std::vector<BSONObj> operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(op.copy()); - return Status::OK(); - }; + auto syncApply = + [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { + operationsApplied.push_back(op.copy()); + return Status::OK(); + }; // Apply the ops. ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply)); @@ -859,10 +922,11 @@ TEST_F(SyncTailTest, MultiSyncApplyAppliesOpIndividuallyWhenOpIndividuallyExceed } std::vector<BSONObj> operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(op.copy()); - return Status::OK(); - }; + auto syncApply = + [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { + operationsApplied.push_back(op.copy()); + return Status::OK(); + }; // Apply the ops. ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply)); @@ -892,10 +956,11 @@ TEST_F(SyncTailTest, MultiSyncApplyAppliesInsertOpsIndividuallyWhenUnableToCreat makeOp(NamespaceString(testNs + "_3"))}; std::vector<BSONObj> operationsApplied; - auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(op.copy()); - return Status::OK(); - }; + auto syncApply = + [&operationsApplied](OperationContext*, const BSONObj& op, OplogApplication::Mode) { + operationsApplied.push_back(op.copy()); + return Status::OK(); + }; MultiApplier::OperationPtrs ops; for (auto&& op : operationsToApply) { @@ -935,8 +1000,8 @@ TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGro std::size_t numFailedGroupedInserts = 0; MultiApplier::Operations operationsApplied; - auto syncApply = [&numFailedGroupedInserts, - &operationsApplied](OperationContext*, const BSONObj& op, bool) -> Status { + auto syncApply = [&numFailedGroupedInserts, &operationsApplied]( + OperationContext*, const BSONObj& op, OplogApplication::Mode) -> Status { // Reject grouped insert operations. if (op["o"].type() == BSONType::Array) { numFailedGroupedInserts++; diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 070f76dfc8c..f101cfe59d8 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -86,9 +86,12 @@ void SyncTailTest::setUp() { _applyOp = [](OperationContext* opCtx, Database* db, const BSONObj& op, - bool inSteadyStateReplication, + bool alwaysUpsert, + OplogApplication::Mode oplogApplicationMode, stdx::function<void()>) { return Status::OK(); }; - _applyCmd = [](OperationContext* opCtx, const BSONObj& op, bool) { return Status::OK(); }; + _applyCmd = [](OperationContext* opCtx, + const BSONObj& op, + OplogApplication::Mode oplogApplicationMode) { return Status::OK(); }; _incOps = [this]() { _opsApplied++; }; serverGlobalParams.featureCompatibility.setVersion( @@ -114,7 +117,8 @@ void SyncTailTest::_testSyncApplyInsertDocument(ErrorCodes::Error expectedError, SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, Database* db, const BSONObj& theOperation, - bool inSteadyStateReplication, + bool alwaysUpsert, + OplogApplication::Mode oplogApplicationMode, stdx::function<void()>) { applyOpCalled = true; ASSERT_TRUE(opCtx); @@ -125,17 +129,25 @@ void SyncTailTest::_testSyncApplyInsertDocument(ErrorCodes::Error expectedError, ASSERT_TRUE(documentValidationDisabled(opCtx)); ASSERT_TRUE(db); ASSERT_BSONOBJ_EQ(op, theOperation); - ASSERT_TRUE(inSteadyStateReplication); + ASSERT_TRUE(alwaysUpsert); + ASSERT_EQUALS(oplogApplicationMode, OplogApplication::Mode::kSecondary); return Status::OK(); }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); - ASSERT_EQ(SyncTail::syncApply(_opCtx.get(), op, true, applyOp, failedApplyCommand, _incOps), + ASSERT_EQ(SyncTail::syncApply(_opCtx.get(), + op, + OplogApplication::Mode::kSecondary, + applyOp, + failedApplyCommand, + _incOps), expectedError); ASSERT_EQ(applyOpCalled, expectedError == ErrorCodes::OK); } -Status failedApplyCommand(OperationContext* opCtx, const BSONObj& theOperation, bool) { +Status failedApplyCommand(OperationContext* opCtx, + const BSONObj& theOperation, + OplogApplication::Mode) { FAIL("applyCommand unexpectedly invoked."); return Status::OK(); } @@ -143,8 +155,9 @@ Status failedApplyCommand(OperationContext* opCtx, const BSONObj& theOperation, Status SyncTailTest::runOpSteadyState(const OplogEntry& op) { SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); MultiApplier::OperationPtrs opsPtrs{&op}; - auto syncApply = [](OperationContext* opCtx, const BSONObj& op, bool inSteadyStateReplication) { - return SyncTail::syncApply(opCtx, op, inSteadyStateReplication); + auto syncApply = []( + OperationContext* opCtx, const BSONObj& op, OplogApplication::Mode oplogApplicationMode) { + return SyncTail::syncApply(opCtx, op, oplogApplicationMode); }; return multiSyncApply_noAbort(_opCtx.get(), &opsPtrs, syncApply); } diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h index a0738a56677..c73a8e42484 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ b/src/mongo/db/repl/sync_tail_test_fixture.h @@ -73,7 +73,9 @@ protected: Status runOpsInitialSync(std::vector<OplogEntry> ops); }; -Status failedApplyCommand(OperationContext* opCtx, const BSONObj& theOperation, bool); +Status failedApplyCommand(OperationContext* opCtx, + const BSONObj& theOperation, + OplogApplication::Mode); } // namespace repl } // namespace mongo |