author     Maria van Keulen <maria@mongodb.com>    2017-04-28 17:10:51 -0400
committer  Maria van Keulen <maria@mongodb.com>    2017-05-17 10:38:35 -0400
commit     975804ed16ed446e32e7e73643188c9276686311
tree       6a84210ed6bfd8243592360eaf5a21b269a1d11a
parent     cc032640f069a96a54ca2da2e7a0f2072c4b36aa
SERVER-28594 Log each individual non-atomic op in applyOps
-rw-r--r--   jstests/noPassthrough/non_atomic_apply_ops_logging.js |  62
-rw-r--r--   src/mongo/db/repl/apply_ops.cpp                        | 228
-rw-r--r--   src/mongo/db/repl/oplog.cpp                            |  89
-rw-r--r--   src/mongo/db/repl/oplog.h                              |   9
4 files changed, 223 insertions, 165 deletions
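The behavioral change, in brief: _applyOps() no longer suppresses replication itself and no longer logs a single applyOps entry unconditionally. Only an applyOps that executes atomically (inside a wrapping WriteUnitOfWork) is still collapsed into one collective applyOps oplog entry; in the non-atomic path each operation is applied and logged on its own. A minimal mongo-shell sketch of the observable difference, assuming a replica-set primary (collection names are illustrative; the new jstest below asserts the same thing):

    // Atomic case: plain CRUD ops -> one collective "applyOps" oplog entry.
    db.runCommand({applyOps: [
        {op: "i", ns: "test.testColl", o: {_id: 1, a: "foo"}},
        {op: "i", ns: "test.testColl", o: {_id: 2, a: "bar"}}
    ]});
    db.getSiblingDB("local").oplog.rs.find({"o.applyOps": {$exists: true}}).count();  // 1

    // Non-atomic case: a command op such as renameCollection is applied and
    // logged individually, so it shows up as its own 'c' oplog entry instead.
    db.runCommand({applyOps: [
        {op: "c", ns: "test.$cmd", o: {renameCollection: "test.testColl", to: "test.renamedColl", stayTemp: false, dropTarget: false}}
    ]});
    db.getSiblingDB("local").oplog.rs.find({"o.renameCollection": {$exists: true}}).count();  // 1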
diff --git a/jstests/noPassthrough/non_atomic_apply_ops_logging.js b/jstests/noPassthrough/non_atomic_apply_ops_logging.js
new file mode 100644
index 00000000000..f93e9d38189
--- /dev/null
+++ b/jstests/noPassthrough/non_atomic_apply_ops_logging.js
@@ -0,0 +1,62 @@
+// SERVER-28594 Ensure non-atomic ops are individually logged in applyOps
+// and atomic ops are collectively logged in applyOps.
+(function() {
+    "use strict";
+
+    let rst = new ReplSetTest({nodes: 1});
+    rst.startSet();
+    rst.initiate();
+
+    let primary = rst.getPrimary();
+    let testDB = primary.getDB("test");
+    let oplogColl = primary.getDB("local").oplog.rs;
+    let testCollName = "testColl";
+    let rerenamedCollName = "rerenamedColl";
+
+    testDB.runCommand({drop: testCollName});
+    testDB.runCommand({drop: rerenamedCollName});
+    assert.commandWorked(testDB.runCommand({create: testCollName}));
+    let testColl = testDB[testCollName];
+
+    // Ensure atomic apply ops logging only produces one oplog entry
+    // per call to apply ops and does not log individual operations
+    // separately.
+    assert.commandWorked(testDB.runCommand({
+        applyOps: [
+            {op: "i", ns: testColl.getFullName(), o: {_id: 1, a: "foo"}},
+            {op: "i", ns: testColl.getFullName(), o: {_id: 2, a: "bar"}}
+        ]
+    }));
+    assert.eq(oplogColl.find({"o.applyOps": {"$exists": true}}).count(), 1);
+    assert.eq(oplogColl.find({"op": "i"}).count(), 0);
+    // Ensure non-atomic apply ops logging produces an oplog entry for
+    // each operation in the apply ops call and no record of applyOps
+    // appears for these operations.
+    assert.commandWorked(testDB.runCommand({
+        applyOps: [
+            {
+                op: "c",
+                ns: "test.$cmd",
+                o: {
+                    renameCollection: "test.testColl",
+                    to: "test.renamedColl",
+                    stayTemp: false,
+                    dropTarget: false
+                }
+            },
+            {
+                op: "c",
+                ns: "test.$cmd",
+                o: {
+                    renameCollection: "test.renamedColl",
+                    to: "test." + rerenamedCollName,
+                    stayTemp: false,
+                    dropTarget: false
+                }
+            }
+        ]
+    }));
+    assert.eq(oplogColl.find({"o.renameCollection": {"$exists": true}}).count(), 2);
+    assert.eq(oplogColl.find({"o.applyOps": {"$exists": true}}).count(), 1);
+    rst.stopSet();
+})();
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index a87b1fae268..53d79530833
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -32,6 +32,7 @@
 
 #include "mongo/db/repl/apply_ops.h"
 
+#include "mongo/bson/util/bson_extract.h"
 #include "mongo/db/catalog/collection.h"
 #include "mongo/db/catalog/database.h"
 #include "mongo/db/catalog/database_holder.h"
@@ -42,6 +43,7 @@
 #include "mongo/db/curop.h"
 #include "mongo/db/db_raii.h"
 #include "mongo/db/dbdirectclient.h"
+#include "mongo/db/index/index_descriptor.h"
 #include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
 #include "mongo/db/matcher/matcher.h"
 #include "mongo/db/op_observer.h"
@@ -49,6 +51,7 @@
 #include "mongo/db/repl/oplog.h"
 #include "mongo/db/repl/replication_coordinator_global.h"
 #include "mongo/db/service_context.h"
+#include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/util/log.h"
 
 namespace mongo {
@@ -109,145 +112,103 @@ Status _applyOps(OperationContext* opCtx,
         applyOpCmd.hasField("alwaysUpsert") ? applyOpCmd["alwaysUpsert"].trueValue() : true;
     const bool haveWrappingWUOW = opCtx->lockState()->inAWriteUnitOfWork();
 
-    {
-        repl::UnreplicatedWritesBlock uwb(opCtx);
+    while (i.more()) {
+        BSONElement e = i.next();
+        const BSONObj& opObj = e.Obj();
 
-        while (i.more()) {
-            BSONElement e = i.next();
-            const BSONObj& opObj = e.Obj();
-
-            // Ignore 'n' operations.
-            const char* opType = opObj["op"].valuestrsafe();
-            if (*opType == 'n')
-                continue;
+        // Ignore 'n' operations.
+        const char* opType = opObj["op"].valuestrsafe();
+        if (*opType == 'n')
+            continue;
 
-            const std::string ns = opObj["ns"].String();
-            const NamespaceString nss{ns};
+        const std::string ns = opObj["ns"].String();
+        const NamespaceString nss{ns};
 
-            // Need to check this here, or OldClientContext may fail an invariant.
-            if (*opType != 'c' && !nss.isValid())
-                return {ErrorCodes::InvalidNamespace, "invalid ns: " + nss.ns()};
+        // Need to check this here, or OldClientContext may fail an invariant.
+        if (*opType != 'c' && !nss.isValid())
+            return {ErrorCodes::InvalidNamespace, "invalid ns: " + nss.ns()};
 
-            Status status(ErrorCodes::InternalError, "");
+        Status status(ErrorCodes::InternalError, "");
 
-            if (haveWrappingWUOW) {
-                invariant(*opType != 'c');
+        if (haveWrappingWUOW) {
+            invariant(*opType != 'c');
 
-                if (!dbHolder().get(opCtx, ns)) {
-                    throw DBException(
-                        "cannot create a database in atomic applyOps mode; will retry without "
-                        "atomicity",
-                        ErrorCodes::NamespaceNotFound);
-                }
+            if (!dbHolder().get(opCtx, ns)) {
+                throw DBException(
+                    "cannot create a database in atomic applyOps mode; will retry without "
+                    "atomicity",
+                    ErrorCodes::NamespaceNotFound);
+            }
 
-                OldClientContext ctx(opCtx, ns);
-                status = repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert);
-                if (!status.isOK())
-                    return status;
-                logOpForDbHash(opCtx, nss);
-            } else {
-                try {
-                    // Run operations under a nested lock as a hack to prevent yielding.
-                    //
-                    // The list of operations is supposed to be applied atomically; yielding
-                    // would break atomicity by allowing an interruption or a shutdown to occur
-                    // after only some operations are applied. We are already locked globally
-                    // at this point, so taking a DBLock on the namespace creates a nested lock,
-                    // and yields are disallowed for operations that hold a nested lock.
-                    //
-                    // We do not have a wrapping WriteUnitOfWork so it is possible for a journal
-                    // commit to happen with a subset of ops applied.
-                    Lock::GlobalWrite globalWriteLockDisallowTempRelease(opCtx);
-
-                    // Ensures that yielding will not happen (see the comment above).
-                    DEV {
-                        Locker::LockSnapshot lockSnapshot;
-                        invariant(!opCtx->lockState()->saveLockStateAndUnlock(&lockSnapshot));
-                    };
-
-                    MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
-                        if (*opType == 'c') {
-                            status = repl::applyCommand_inlock(opCtx, opObj, true);
+            OldClientContext ctx(opCtx, ns);
+            status = repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert);
+            if (!status.isOK())
+                return status;
+            logOpForDbHash(opCtx, nss);
+        } else {
+            try {
+                MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+                    if (*opType == 'c') {
+                        status = repl::applyCommand_inlock(opCtx, opObj, true);
+                    } else {
+                        OldClientContext ctx(opCtx, ns);
+                        const char* names[] = {"o", "ns"};
+                        BSONElement fields[2];
+                        opObj.getFields(2, names, fields);
+                        BSONElement& fieldO = fields[0];
+                        BSONElement& fieldNs = fields[1];
+                        const StringData ns = fieldNs.valueStringData();
+                        NamespaceString requestNss{ns};
+
+                        if (nss.isSystemDotIndexes()) {
+                            BSONObj indexSpec;
+                            NamespaceString indexNss;
+                            std::tie(indexSpec, indexNss) =
+                                repl::prepForApplyOpsIndexInsert(fieldO, opObj, requestNss);
+                            BSONObjBuilder command;
+                            command.append("createIndexes", indexNss.coll());
+                            {
+                                BSONArrayBuilder indexes(command.subarrayStart("indexes"));
+                                indexes.append(indexSpec);
+                                indexes.doneFast();
+                            }
+                            const BSONObj commandObj = command.done();
+
+                            DBDirectClient client(opCtx);
+                            BSONObj infoObj;
+                            client.runCommand(nsToDatabase(ns), commandObj, infoObj);
+                            status = getStatusFromCommandResult(infoObj);
                         } else {
-                            OldClientContext ctx(opCtx, ns);
-                            status = repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert);
                         }
                     }
-                    MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "applyOps", ns);
-                } catch (const DBException& ex) {
-                    ab.append(false);
-                    result->append("applied", ++(*numApplied));
-                    result->append("code", ex.getCode());
-                    result->append("codeName",
-                                   ErrorCodes::errorString(ErrorCodes::fromInt(ex.getCode())));
-                    result->append("errmsg", ex.what());
-                    result->append("results", ab.arr());
-                    return Status(ErrorCodes::UnknownError, ex.what());
                 }
-                WriteUnitOfWork wuow(opCtx);
-                logOpForDbHash(opCtx, nss);
-                wuow.commit();
-            }
-
-            ab.append(status.isOK());
-            if (!status.isOK()) {
-                log() << "applyOps error applying: " << status;
-                errors++;
+                MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "applyOps", ns);
+            } catch (const DBException& ex) {
+                ab.append(false);
+                result->append("applied", ++(*numApplied));
+                result->append("code", ex.getCode());
+                result->append("codeName",
+                               ErrorCodes::errorString(ErrorCodes::fromInt(ex.getCode())));
+                result->append("errmsg", ex.what());
+                result->append("results", ab.arr());
+                return Status(ErrorCodes::UnknownError, ex.what());
             }
-
-            (*numApplied)++;
         }
-        result->append("applied", *numApplied);
-        result->append("results", ab.arr());
-    }  // set replicatedWrites back to original value
-
-    if (opCtx->writesAreReplicated()) {
-        // We want this applied atomically on slaves
-        // so we re-wrap without the pre-condition for speed
-
-        // TODO: possibly use mutable BSON to remove preCondition field
-        // once it is available
-        BSONObjBuilder cmdBuilder;
-
-        for (auto elem : applyOpCmd) {
-            auto name = elem.fieldNameStringData();
-            if (name == "preCondition")
-                continue;
-            if (name == "bypassDocumentValidation")
-                continue;
-            cmdBuilder.append(elem);
+        ab.append(status.isOK());
+        if (!status.isOK()) {
+            log() << "applyOps error applying: " << status;
+            errors++;
        }
-        const BSONObj cmdRewritten = cmdBuilder.done();
-
-        auto opObserver = getGlobalServiceContext()->getOpObserver();
-        invariant(opObserver);
-        if (haveWrappingWUOW) {
-            opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
-        } else {
-            // When executing applyOps outside of a wrapping WriteUnitOfWOrk, always logOp the
-            // command regardless of whether the individial ops succeeded and rely on any
-            // failures to also on secondaries. This isn't perfect, but it's what the command
-            // has always done and is part of its "correct" behavior.
-            while (true) {
-                try {
-                    WriteUnitOfWork wunit(opCtx);
-                    opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
-
-                    wunit.commit();
-                    break;
-                } catch (const WriteConflictException& wce) {
-                    LOG(2) << "WriteConflictException while logging applyOps command, retrying.";
-                    opCtx->recoveryUnit()->abandonSnapshot();
-                    continue;
-                }
-            }
-        }
+        (*numApplied)++;
     }
+    result->append("applied", *numApplied);
+    result->append("results", ab.arr());
+
     if (errors != 0) {
         return Status(ErrorCodes::UnknownError, "applyOps had one or more errors applying ops");
     }
@@ -333,7 +294,34 @@ mongo::Status mongo::applyOps(OperationContext* opCtx,
             BSONObjBuilder intermediateResult;
             WriteUnitOfWork wunit(opCtx);
             numApplied = 0;
-            uassertStatusOK(_applyOps(opCtx, dbName, applyOpCmd, &intermediateResult, &numApplied));
+            {
+                // Suppress replication for atomic operations until end of applyOps.
+                repl::UnreplicatedWritesBlock uwb(opCtx);
+                uassertStatusOK(
+                    _applyOps(opCtx, dbName, applyOpCmd, &intermediateResult, &numApplied));
+            }
+            // Generate oplog entry for all atomic ops collectively.
+            if (opCtx->writesAreReplicated()) {
+                // We want this applied atomically on slaves so we rewrite the oplog entry without
+                // the pre-condition for speed.
+
+                BSONObjBuilder cmdBuilder;
+
+                for (auto elem : applyOpCmd) {
+                    auto name = elem.fieldNameStringData();
+                    if (name == "preCondition")
+                        continue;
+                    if (name == "bypassDocumentValidation")
+                        continue;
+                    cmdBuilder.append(elem);
+                }
+
+                const BSONObj cmdRewritten = cmdBuilder.done();
+
+                auto opObserver = getGlobalServiceContext()->getOpObserver();
+                invariant(opObserver);
+                opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
+            }
             wunit.commit();
             result->appendElements(intermediateResult.obj());
         }
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 10981dc0192..8bd76361ed6
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -682,6 +682,45 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
 }  // namespace
 
+std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement& fieldO,
+                                                               const BSONObj& op,
+                                                               const NamespaceString& requestNss) {
+    uassert(ErrorCodes::NoSuchKey,
+            str::stream() << "Missing expected index spec in field 'o': " << op,
+            !fieldO.eoo());
+    uassert(ErrorCodes::TypeMismatch,
+            str::stream() << "Expected object for index spec in field 'o': " << op,
+            fieldO.isABSONObj());
+    BSONObj indexSpec = fieldO.embeddedObject();
+
+    std::string indexNs;
+    uassertStatusOK(bsonExtractStringField(indexSpec, "ns", &indexNs));
+    const NamespaceString indexNss(indexNs);
+    uassert(ErrorCodes::InvalidNamespace,
+            str::stream() << "Invalid namespace in index spec: " << op,
+            indexNss.isValid());
+    uassert(ErrorCodes::InvalidNamespace,
+            str::stream() << "Database name mismatch for database (" << requestNss.db()
+                          << ") while creating index: "
+                          << op,
+            requestNss.db() == indexNss.db());
+
+    if (!indexSpec["v"]) {
+        // If the "v" field isn't present in the index specification, then we assume it is a
+        // v=1 index from an older version of MongoDB. This is because
+        // (1) we haven't built v=0 indexes as the default for a long time, and
+        // (2) the index version has been included in the corresponding oplog entry since
+        // v=2 indexes were introduced.
+        BSONObjBuilder bob;
+
+        bob.append("v", static_cast<int>(IndexVersion::kV1));
+        bob.appendElements(indexSpec);
+
+        indexSpec = bob.obj();
+    }
+
+    return std::make_pair(indexSpec, indexNss);
+}
 
 // @return failure status if an update should have happened and the document DNE.
 // See replset initial sync code.
 Status applyOperation_inlock(OperationContext* opCtx,
@@ -741,27 +780,11 @@ Status applyOperation_inlock(OperationContext* opCtx,
     invariant(*opType != 'c');  // commands are processed in applyCommand_inlock()
 
     if (*opType == 'i') {
-        if (nsToCollectionSubstring(ns) == "system.indexes") {
-            uassert(ErrorCodes::NoSuchKey,
-                    str::stream() << "Missing expected index spec in field 'o': " << op,
-                    !fieldO.eoo());
-            uassert(ErrorCodes::TypeMismatch,
-                    str::stream() << "Expected object for index spec in field 'o': " << op,
-                    fieldO.isABSONObj());
-            BSONObj indexSpec = fieldO.embeddedObject();
-
-            std::string indexNs;
-            uassertStatusOK(bsonExtractStringField(indexSpec, "ns", &indexNs));
-            const NamespaceString indexNss(indexNs);
-            uassert(ErrorCodes::InvalidNamespace,
-                    str::stream() << "Invalid namespace in index spec: " << op,
-                    indexNss.isValid());
-            uassert(ErrorCodes::InvalidNamespace,
-                    str::stream() << "Database name mismatch for database ("
-                                  << nsToDatabaseSubstring(ns)
-                                  << ") while creating index: "
-                                  << op,
-                    nsToDatabaseSubstring(ns) == indexNss.db());
+        if (requestNss.isSystemDotIndexes()) {
+            BSONObj indexSpec;
+            NamespaceString indexNss;
+            std::tie(indexSpec, indexNss) =
+                repl::prepForApplyOpsIndexInsert(fieldO, op, requestNss);
 
             // Check if collection exists.
             auto indexCollection = db->getCollection(opCtx, indexNss);
@@ -772,20 +795,6 @@ Status applyOperation_inlock(OperationContext* opCtx,
 
             opCounters->gotInsert();
 
-            if (!indexSpec["v"]) {
-                // If the "v" field isn't present in the index specification, then we assume it is a
-                // v=1 index from an older version of MongoDB. This is because
-                // (1) we haven't built v=0 indexes as the default for a long time, and
-                // (2) the index version has been included in the corresponding oplog entry since
-                // v=2 indexes were introduced.
-                BSONObjBuilder bob;
-
-                bob.append("v", static_cast<int>(IndexVersion::kV1));
-                bob.appendElements(indexSpec);
-
-                indexSpec = bob.obj();
-            }
-
             bool relaxIndexConstraints =
                 ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, indexNss);
             if (indexSpec["background"].trueValue()) {
@@ -996,16 +1005,6 @@ Status applyOperation_inlock(OperationContext* opCtx,
                     14825,
                     str::stream() << "error in applyOperation : unknown opType " << *opType);
     }
 
-    // AuthorizationManager's logOp method registers a RecoveryUnit::Change and to do so we need
-    // to a new WriteUnitOfWork, if we dont have a wrapping unit of work already. If we already
-    // have a wrapping WUOW, the extra nexting is harmless. The logOp really should have been
-    // done in the WUOW that did the write, but this won't happen because applyOps turns off
-    // observers.
-    WriteUnitOfWork wuow(opCtx);
-    getGlobalAuthorizationManager()->logOp(
-        opCtx, opType, requestNss, o, fieldO2.isABSONObj() ? &o2 : NULL);
-    wuow.commit();
-
     return Status::OK();
 }
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 69741a5b5fd..22773a4bf61
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -107,6 +107,15 @@ void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db);
 using IncrementOpsAppliedStatsFn = stdx::function<void()>;
 
 /**
+ * Take the object field of a BSONObj, the BSONObj, and the namespace of
+ * the operation and perform necessary validation to ensure the BSONObj is a
+ * properly-formed command to insert into system.indexes. This is only to
+ * be used for insert operations into system.indexes. It is called via applyOps.
+ */
+std::pair<BSONObj, NamespaceString> prepForApplyOpsIndexInsert(const BSONElement& fieldO,
+                                                               const BSONObj& op,
+                                                               const NamespaceString& requestNss);
+/**
  * Take a non-command op and apply it locally
 * Used for applying from an oplog
 * @param inSteadyStateReplication convert some updates to upserts for idempotency reasons
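One more piece worth calling out: the non-atomic path now funnels inserts into <db>.system.indexes through the new repl::prepForApplyOpsIndexInsert() helper, which validates the spec, defaults a missing "v" field to 1, and then replays the spec as a createIndexes command through DBDirectClient rather than performing a raw insert. A hedged shell sketch of the kind of op that takes this path (the key pattern and names are illustrative, not taken from the commit):

    // An applyOps insert op aimed at <db>.system.indexes is internally rewritten
    // to {createIndexes: <collection>, indexes: [<spec>]} before being applied.
    db.runCommand({applyOps: [{
        op: "i",
        ns: "test.system.indexes",
        o: {v: 2, key: {a: 1}, name: "a_1", ns: "test.testColl"}
    }]});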