summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/apply_ops.cpp
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2017-11-14 21:24:22 -0500
committerWilliam Schultz <william.schultz@mongodb.com>2017-11-14 21:24:22 -0500
commitd5cce0599a6f44f653eca53d728bb52b2f8fae6c (patch)
tree90462ce20a8f1cde7ce334968b7657124d7dfe33 /src/mongo/db/repl/apply_ops.cpp
parent22e18d7f0ee7eede964c27b444035544f7b8cf68 (diff)
downloadmongo-d5cce0599a6f44f653eca53d728bb52b2f8fae6c.tar.gz
SERVER-31384 applyOps should correctly propagate oplog application mode
Diffstat (limited to 'src/mongo/db/repl/apply_ops.cpp')
-rw-r--r--src/mongo/db/repl/apply_ops.cpp59
1 files changed, 23 insertions, 36 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 633deed78dc..3c50d17c69c 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -48,7 +48,6 @@
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collation_spec.h"
-#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/service_context.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -56,10 +55,11 @@
#include "mongo/util/log.h"
namespace mongo {
-namespace {
-const auto kPreconditionFieldName = "preCondition"_sd;
-const auto kOplogApplicationModeFieldName = "oplogApplicationMode"_sd;
+constexpr StringData ApplyOps::kPreconditionFieldName;
+constexpr StringData ApplyOps::kOplogApplicationModeFieldName;
+
+namespace {
// If enabled, causes loop in _applyOps() to hang after applying current operation.
MONGO_FP_DECLARE(applyOpsPauseBetweenOperations);
@@ -103,6 +103,7 @@ bool _areOpsCrudOnly(const BSONObj& applyOpCmd) {
Status _applyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd,
+ repl::OplogApplication::Mode oplogApplicationMode,
BSONObjBuilder* result,
int* numApplied,
BSONArrayBuilder* opsBuilder) {
@@ -117,30 +118,7 @@ Status _applyOps(OperationContext* opCtx,
applyOpCmd.hasField("alwaysUpsert") ? applyOpCmd["alwaysUpsert"].trueValue() : true;
const bool haveWrappingWUOW = opCtx->lockState()->inAWriteUnitOfWork();
- // TODO (SERVER-31384): This code is run when applying 'applyOps' oplog entries. Pass through
- // oplog application mode from applyCommand_inlock as default and consider proper behavior
- // if default differs from oplog entry.
- repl::OplogApplication::Mode oplogApplicationMode = repl::OplogApplication::Mode::kApplyOps;
- std::string oplogApplicationModeString;
- auto status = bsonExtractStringField(
- applyOpCmd, kOplogApplicationModeFieldName, &oplogApplicationModeString);
- if (status.isOK()) {
- auto modeSW = repl::OplogApplication::parseMode(oplogApplicationModeString);
- if (!modeSW.isOK()) {
- return Status(modeSW.getStatus().code(),
- str::stream() << "Could not parse " << kOplogApplicationModeFieldName
- << ": "
- << modeSW.getStatus().reason());
- }
- oplogApplicationMode = modeSW.getValue();
- } else if (status != ErrorCodes::NoSuchKey) {
- // NoSuchKey means the user did not supply a mode.
- return Status(status.code(),
- str::stream() << "Could not parse out " << kOplogApplicationModeFieldName
- << ": "
- << status.reason());
- }
-
+ // Apply each op in the given 'applyOps' command object.
while (i.more()) {
BSONElement e = i.next();
const BSONObj& opObj = e.Obj();
@@ -334,7 +312,7 @@ Status _applyOps(OperationContext* opCtx,
}
bool _hasPrecondition(const BSONObj& applyOpCmd) {
- return applyOpCmd[kPreconditionFieldName].type() == Array;
+ return applyOpCmd[ApplyOps::kPreconditionFieldName].type() == Array;
}
Status _checkPrecondition(OperationContext* opCtx,
@@ -343,7 +321,7 @@ Status _checkPrecondition(OperationContext* opCtx,
invariant(opCtx->lockState()->isW());
invariant(_hasPrecondition(applyOpCmd));
- for (auto elem : applyOpCmd[kPreconditionFieldName].Obj()) {
+ for (auto elem : applyOpCmd[ApplyOps::kPreconditionFieldName].Obj()) {
auto preCondition = elem.Obj();
if (preCondition["ns"].type() != BSONType::String) {
return {ErrorCodes::InvalidNamespace,
@@ -387,6 +365,7 @@ Status _checkPrecondition(OperationContext* opCtx,
Status applyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd,
+ repl::OplogApplication::Mode oplogApplicationMode,
BSONObjBuilder* result) {
bool allowAtomic = false;
uassertStatusOK(
@@ -432,8 +411,10 @@ Status applyOps(OperationContext* opCtx,
}
int numApplied = 0;
- if (!isAtomic)
- return _applyOps(opCtx, dbName, applyOpCmd, result, &numApplied, nullptr);
+ if (!isAtomic) {
+ return _applyOps(
+ opCtx, dbName, applyOpCmd, oplogApplicationMode, result, &numApplied, nullptr);
+ }
// Perform write ops atomically
invariant(globalWriteLock);
@@ -451,8 +432,13 @@ Status applyOps(OperationContext* opCtx,
{
// Suppress replication for atomic operations until end of applyOps.
repl::UnreplicatedWritesBlock uwb(opCtx);
- uassertStatusOK(_applyOps(
- opCtx, dbName, applyOpCmd, &intermediateResult, &numApplied, opsBuilder.get()));
+ uassertStatusOK(_applyOps(opCtx,
+ dbName,
+ applyOpCmd,
+ oplogApplicationMode,
+ &intermediateResult,
+ &numApplied,
+ opsBuilder.get()));
}
// Generate oplog entry for all atomic ops collectively.
if (opCtx->writesAreReplicated()) {
@@ -468,7 +454,7 @@ Status applyOps(OperationContext* opCtx,
cmdBuilder.append(opsFieldName, opsBuilder->arr());
continue;
}
- if (name == kPreconditionFieldName)
+ if (name == ApplyOps::kPreconditionFieldName)
continue;
if (name == bypassDocumentValidationCommandOption())
continue;
@@ -487,7 +473,8 @@ Status applyOps(OperationContext* opCtx,
} catch (const DBException& ex) {
if (ex.code() == ErrorCodes::AtomicityFailure) {
// Retry in non-atomic mode.
- return _applyOps(opCtx, dbName, applyOpCmd, result, &numApplied, nullptr);
+ return _applyOps(
+ opCtx, dbName, applyOpCmd, oplogApplicationMode, result, &numApplied, nullptr);
}
BSONArrayBuilder ab;
++numApplied;