summary refs log tree commit diff
diff options
context:
space:
mode:
author    Maria van Keulen <maria@mongodb.com>    2017-04-28 17:10:51 -0400
committer Maria van Keulen <maria@mongodb.com>    2017-05-17 10:38:35 -0400
commit    975804ed16ed446e32e7e73643188c9276686311 (patch)
tree      6a84210ed6bfd8243592360eaf5a21b269a1d11a
parent    cc032640f069a96a54ca2da2e7a0f2072c4b36aa (diff)
download  mongo-975804ed16ed446e32e7e73643188c9276686311.tar.gz
SERVER-28594 Log each individual non-atomic op in applyOps
-rw-r--r--  jstests/noPassthrough/non_atomic_apply_ops_logging.js |  62
-rw-r--r--  src/mongo/db/repl/apply_ops.cpp                       | 228
-rw-r--r--  src/mongo/db/repl/oplog.cpp                           |  89
-rw-r--r--  src/mongo/db/repl/oplog.h                             |   9
4 files changed, 223 insertions(+), 165 deletions(-)
diff --git a/jstests/noPassthrough/non_atomic_apply_ops_logging.js b/jstests/noPassthrough/non_atomic_apply_ops_logging.js
new file mode 100644
index 00000000000..f93e9d38189
--- /dev/null
+++ b/jstests/noPassthrough/non_atomic_apply_ops_logging.js
@@ -0,0 +1,62 @@
+// SERVER-28594 Ensure non-atomic ops are individually logged in applyOps
+// and atomic ops are collectively logged in applyOps.
+(function() {
+ "use strict";
+
+ let rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+
+ let primary = rst.getPrimary();
+ let testDB = primary.getDB("test");
+ let oplogColl = primary.getDB("local").oplog.rs;
+ let testCollName = "testColl";
+ let rerenamedCollName = "rerenamedColl";
+
+ testDB.runCommand({drop: testCollName});
+ testDB.runCommand({drop: rerenamedCollName});
+ assert.commandWorked(testDB.runCommand({create: testCollName}));
+ let testColl = testDB[testCollName];
+
+ // Ensure atomic apply ops logging only produces one oplog entry
+ // per call to apply ops and does not log individual operations
+ // separately.
+ assert.commandWorked(testDB.runCommand({
+ applyOps: [
+ {op: "i", ns: testColl.getFullName(), o: {_id: 1, a: "foo"}},
+ {op: "i", ns: testColl.getFullName(), o: {_id: 2, a: "bar"}}
+ ]
+ }));
+ assert.eq(oplogColl.find({"o.applyOps": {"$exists": true}}).count(), 1);
+ assert.eq(oplogColl.find({"op": "i"}).count(), 0);
+ // Ensure non-atomic apply ops logging produces an oplog entry for
+ // each operation in the apply ops call and no record of applyOps
+ // appears for these operations.
+ assert.commandWorked(testDB.runCommand({
+ applyOps: [
+ {
+ op: "c",
+ ns: "test.$cmd",
+ o: {
+ renameCollection: "test.testColl",
+ to: "test.renamedColl",
+ stayTemp: false,
+ dropTarget: false
+ }
+ },
+ {
+ op: "c",
+ ns: "test.$cmd",
+ o: {
+ renameCollection: "test.renamedColl",
+ to: "test." + rerenamedCollName,
+ stayTemp: false,
+ dropTarget: false
+ }
+ }
+ ]
+ }));
+ assert.eq(oplogColl.find({"o.renameCollection": {"$exists": true}}).count(), 2);
+ assert.eq(oplogColl.find({"o.applyOps": {"$exists": true}}).count(), 1);
+ rst.stopSet();
+})();
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index a87b1fae268..53d79530833 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/repl/apply_ops.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
@@ -42,6 +43,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
#include "mongo/db/matcher/matcher.h"
#include "mongo/db/op_observer.h"
@@ -49,6 +51,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/service_context.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -109,145 +112,103 @@ Status _applyOps(OperationContext* opCtx,
applyOpCmd.hasField("alwaysUpsert") ? applyOpCmd["alwaysUpsert"].trueValue() : true;
const bool haveWrappingWUOW = opCtx->lockState()->inAWriteUnitOfWork();
- {
- repl::UnreplicatedWritesBlock uwb(opCtx);
+ while (i.more()) {
+ BSONElement e = i.next();
+ const BSONObj& opObj = e.Obj();
- while (i.more()) {
- BSONElement e = i.next();
- const BSONObj& opObj = e.Obj();
-
- // Ignore 'n' operations.
- const char* opType = opObj["op"].valuestrsafe();
- if (*opType == 'n')
- continue;
+ // Ignore 'n' operations.
+ const char* opType = opObj["op"].valuestrsafe();
+ if (*opType == 'n')
+ continue;
- const std::string ns = opObj["ns"].String();
- const NamespaceString nss{ns};
+ const std::string ns = opObj["ns"].String();
+ const NamespaceString nss{ns};
- // Need to check this here, or OldClientContext may fail an invariant.
- if (*opType != 'c' && !nss.isValid())
- return {ErrorCodes::InvalidNamespace, "invalid ns: " + nss.ns()};
+ // Need to check this here, or OldClientContext may fail an invariant.
+ if (*opType != 'c' && !nss.isValid())
+ return {ErrorCodes::InvalidNamespace, "invalid ns: " + nss.ns()};
- Status status(ErrorCodes::InternalError, "");
+ Status status(ErrorCodes::InternalError, "");
- if (haveWrappingWUOW) {
- invariant(*opType != 'c');
+ if (haveWrappingWUOW) {
+ invariant(*opType != 'c');
- if (!dbHolder().get(opCtx, ns)) {
- throw DBException(
- "cannot create a database in atomic applyOps mode; will retry without "
- "atomicity",
- ErrorCodes::NamespaceNotFound);
- }
+ if (!dbHolder().get(opCtx, ns)) {
+ throw DBException(
+ "cannot create a database in atomic applyOps mode; will retry without "
+ "atomicity",
+ ErrorCodes::NamespaceNotFound);
+ }
- OldClientContext ctx(opCtx, ns);
- status = repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert);
- if (!status.isOK())
- return status;
- logOpForDbHash(opCtx, nss);
- } else {
- try {
- // Run operations under a nested lock as a hack to prevent yielding.
- //
- // The list of operations is supposed to be applied atomically; yielding
- // would break atomicity by allowing an interruption or a shutdown to occur
- // after only some operations are applied. We are already locked globally
- // at this point, so taking a DBLock on the namespace creates a nested lock,
- // and yields are disallowed for operations that hold a nested lock.
- //
- // We do not have a wrapping WriteUnitOfWork so it is possible for a journal
- // commit to happen with a subset of ops applied.
- Lock::GlobalWrite globalWriteLockDisallowTempRelease(opCtx);
-
- // Ensures that yielding will not happen (see the comment above).
- DEV {
- Locker::LockSnapshot lockSnapshot;
- invariant(!opCtx->lockState()->saveLockStateAndUnlock(&lockSnapshot));
- };
-
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- if (*opType == 'c') {
- status = repl::applyCommand_inlock(opCtx, opObj, true);
+ OldClientContext ctx(opCtx, ns);
+ status = repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert);
+ if (!status.isOK())
+ return status;
+ logOpForDbHash(opCtx, nss);
+ } else {
+ try {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ if (*opType == 'c') {
+ status = repl::applyCommand_inlock(opCtx, opObj, true);
+ } else {
+ OldClientContext ctx(opCtx, ns);
+ const char* names[] = {"o", "ns"};
+ BSONElement fields[2];
+ opObj.getFields(2, names, fields);
+ BSONElement& fieldO = fields[0];
+ BSONElement& fieldNs = fields[1];
+ const StringData ns = fieldNs.valueStringData();
+ NamespaceString requestNss{ns};
+
+ if (nss.isSystemDotIndexes()) {
+ BSONObj indexSpec;
+ NamespaceString indexNss;
+ std::tie(indexSpec, indexNss) =
+ repl::prepForApplyOpsIndexInsert(fieldO, opObj, requestNss);
+ BSONObjBuilder command;
+ command.append("createIndexes", indexNss.coll());
+ {
+ BSONArrayBuilder indexes(command.subarrayStart("indexes"));
+ indexes.append(indexSpec);
+ indexes.doneFast();
+ }
+ const BSONObj commandObj = command.done();
+
+ DBDirectClient client(opCtx);
+ BSONObj infoObj;
+ client.runCommand(nsToDatabase(ns), commandObj, infoObj);
+ status = getStatusFromCommandResult(infoObj);
} else {
- OldClientContext ctx(opCtx, ns);
-
status =
repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert);
}
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "applyOps", ns);
- } catch (const DBException& ex) {
- ab.append(false);
- result->append("applied", ++(*numApplied));
- result->append("code", ex.getCode());
- result->append("codeName",
- ErrorCodes::errorString(ErrorCodes::fromInt(ex.getCode())));
- result->append("errmsg", ex.what());
- result->append("results", ab.arr());
- return Status(ErrorCodes::UnknownError, ex.what());
}
- WriteUnitOfWork wuow(opCtx);
- logOpForDbHash(opCtx, nss);
- wuow.commit();
- }
-
- ab.append(status.isOK());
- if (!status.isOK()) {
- log() << "applyOps error applying: " << status;
- errors++;
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "applyOps", ns);
+ } catch (const DBException& ex) {
+ ab.append(false);
+ result->append("applied", ++(*numApplied));
+ result->append("code", ex.getCode());
+ result->append("codeName",
+ ErrorCodes::errorString(ErrorCodes::fromInt(ex.getCode())));
+ result->append("errmsg", ex.what());
+ result->append("results", ab.arr());
+ return Status(ErrorCodes::UnknownError, ex.what());
}
-
- (*numApplied)++;
}
- result->append("applied", *numApplied);
- result->append("results", ab.arr());
- } // set replicatedWrites back to original value
-
- if (opCtx->writesAreReplicated()) {
- // We want this applied atomically on slaves
- // so we re-wrap without the pre-condition for speed
-
- // TODO: possibly use mutable BSON to remove preCondition field
- // once it is available
- BSONObjBuilder cmdBuilder;
-
- for (auto elem : applyOpCmd) {
- auto name = elem.fieldNameStringData();
- if (name == "preCondition")
- continue;
- if (name == "bypassDocumentValidation")
- continue;
- cmdBuilder.append(elem);
+ ab.append(status.isOK());
+ if (!status.isOK()) {
+ log() << "applyOps error applying: " << status;
+ errors++;
}
- const BSONObj cmdRewritten = cmdBuilder.done();
-
- auto opObserver = getGlobalServiceContext()->getOpObserver();
- invariant(opObserver);
- if (haveWrappingWUOW) {
- opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
- } else {
- // When executing applyOps outside of a wrapping WriteUnitOfWOrk, always logOp the
- // command regardless of whether the individial ops succeeded and rely on any
- // failures to also on secondaries. This isn't perfect, but it's what the command
- // has always done and is part of its "correct" behavior.
- while (true) {
- try {
- WriteUnitOfWork wunit(opCtx);
- opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
-
- wunit.commit();
- break;
- } catch (const WriteConflictException& wce) {
- LOG(2) << "WriteConflictException while logging applyOps command, retrying.";
- opCtx->recoveryUnit()->abandonSnapshot();
- continue;
- }
- }
- }
+ (*numApplied)++;
}
+ result->append("applied", *numApplied);
+ result->append("results", ab.arr());
+
if (errors != 0) {
return Status(ErrorCodes::UnknownError, "applyOps had one or more errors applying ops");
}
@@ -333,7 +294,34 @@ mongo::Status mongo::applyOps(OperationContext* opCtx,
BSONObjBuilder intermediateResult;
WriteUnitOfWork wunit(opCtx);
numApplied = 0;
- uassertStatusOK(_applyOps(opCtx, dbName, applyOpCmd, &intermediateResult, &numApplied));
+ {
+ // Suppress replication for atomic operations until end of applyOps.
+ repl::UnreplicatedWritesBlock uwb(opCtx);
+ uassertStatusOK(
+ _applyOps(opCtx, dbName, applyOpCmd, &intermediateResult, &numApplied));
+ }
+ // Generate oplog entry for all atomic ops collectively.
+ if (opCtx->writesAreReplicated()) {
+ // We want this applied atomically on slaves so we rewrite the oplog entry without
+ // the pre-condition for speed.
+
+ BSONObjBuilder cmdBuilder;
+
+ for (auto elem : applyOpCmd) {
+ auto name = elem.fieldNameStringData();
+ if (name == "preCondition")
+ continue;
+ if (name == "bypassDocumentValidation")
+ continue;
+ cmdBuilder.append(elem);
+ }
+
+ const BSONObj cmdRewritten = cmdBuilder.done();
+
+ auto opObserver = getGlobalServiceContext()->getOpObserver();
+ invariant(opObserver);
+ opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
+ }
wunit.commit();
result->appendElements(intermediateResult.obj());
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 10981dc0192..8bd76361ed6 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -682,6 +682,45 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
} // namespace
+std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement& fieldO,
+ const BSONObj& op,
+ const NamespaceString& requestNss) {
+ uassert(ErrorCodes::NoSuchKey,
+ str::stream() << "Missing expected index spec in field 'o': " << op,
+ !fieldO.eoo());
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "Expected object for index spec in field 'o': " << op,
+ fieldO.isABSONObj());
+ BSONObj indexSpec = fieldO.embeddedObject();
+
+ std::string indexNs;
+ uassertStatusOK(bsonExtractStringField(indexSpec, "ns", &indexNs));
+ const NamespaceString indexNss(indexNs);
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid namespace in index spec: " << op,
+ indexNss.isValid());
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Database name mismatch for database (" << requestNss.db()
+ << ") while creating index: "
+ << op,
+ requestNss.db() == indexNss.db());
+
+ if (!indexSpec["v"]) {
+ // If the "v" field isn't present in the index specification, then we assume it is a
+ // v=1 index from an older version of MongoDB. This is because
+ // (1) we haven't built v=0 indexes as the default for a long time, and
+ // (2) the index version has been included in the corresponding oplog entry since
+ // v=2 indexes were introduced.
+ BSONObjBuilder bob;
+
+ bob.append("v", static_cast<int>(IndexVersion::kV1));
+ bob.appendElements(indexSpec);
+
+ indexSpec = bob.obj();
+ }
+
+ return std::make_pair(indexSpec, indexNss);
+}
// @return failure status if an update should have happened and the document DNE.
// See replset initial sync code.
Status applyOperation_inlock(OperationContext* opCtx,
@@ -741,27 +780,11 @@ Status applyOperation_inlock(OperationContext* opCtx,
invariant(*opType != 'c'); // commands are processed in applyCommand_inlock()
if (*opType == 'i') {
- if (nsToCollectionSubstring(ns) == "system.indexes") {
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "Missing expected index spec in field 'o': " << op,
- !fieldO.eoo());
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "Expected object for index spec in field 'o': " << op,
- fieldO.isABSONObj());
- BSONObj indexSpec = fieldO.embeddedObject();
-
- std::string indexNs;
- uassertStatusOK(bsonExtractStringField(indexSpec, "ns", &indexNs));
- const NamespaceString indexNss(indexNs);
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid namespace in index spec: " << op,
- indexNss.isValid());
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Database name mismatch for database ("
- << nsToDatabaseSubstring(ns)
- << ") while creating index: "
- << op,
- nsToDatabaseSubstring(ns) == indexNss.db());
+ if (requestNss.isSystemDotIndexes()) {
+ BSONObj indexSpec;
+ NamespaceString indexNss;
+ std::tie(indexSpec, indexNss) =
+ repl::prepForApplyOpsIndexInsert(fieldO, op, requestNss);
// Check if collection exists.
auto indexCollection = db->getCollection(opCtx, indexNss);
@@ -772,20 +795,6 @@ Status applyOperation_inlock(OperationContext* opCtx,
opCounters->gotInsert();
- if (!indexSpec["v"]) {
- // If the "v" field isn't present in the index specification, then we assume it is a
- // v=1 index from an older version of MongoDB. This is because
- // (1) we haven't built v=0 indexes as the default for a long time, and
- // (2) the index version has been included in the corresponding oplog entry since
- // v=2 indexes were introduced.
- BSONObjBuilder bob;
-
- bob.append("v", static_cast<int>(IndexVersion::kV1));
- bob.appendElements(indexSpec);
-
- indexSpec = bob.obj();
- }
-
bool relaxIndexConstraints =
ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, indexNss);
if (indexSpec["background"].trueValue()) {
@@ -996,16 +1005,6 @@ Status applyOperation_inlock(OperationContext* opCtx,
14825, str::stream() << "error in applyOperation : unknown opType " << *opType);
}
- // AuthorizationManager's logOp method registers a RecoveryUnit::Change and to do so we need
- // to a new WriteUnitOfWork, if we dont have a wrapping unit of work already. If we already
- // have a wrapping WUOW, the extra nexting is harmless. The logOp really should have been
- // done in the WUOW that did the write, but this won't happen because applyOps turns off
- // observers.
- WriteUnitOfWork wuow(opCtx);
- getGlobalAuthorizationManager()->logOp(
- opCtx, opType, requestNss, o, fieldO2.isABSONObj() ? &o2 : NULL);
- wuow.commit();
-
return Status::OK();
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 69741a5b5fd..22773a4bf61 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -107,6 +107,15 @@ void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db);
using IncrementOpsAppliedStatsFn = stdx::function<void()>;
/**
+ * Take the object field of a BSONObj, the BSONObj, and the namespace of
+ * the operation and perform necessary validation to ensure the BSONObj is a
+ * properly-formed command to insert into system.indexes. This is only to
+ * be used for insert operations into system.indexes. It is called via applyOps.
+ */
+std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement& fieldO,
+ const BSONObj& op,
+ const NamespaceString& requestNss);
+/**
* Take a non-command op and apply it locally
* Used for applying from an oplog
* @param inSteadyStateReplication convert some updates to upserts for idempotency reasons