diff options
author | Josef Ahmad <josef.ahmad@mongodb.com> | 2022-04-19 21:47:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-19 22:28:04 +0000 |
commit | 12bbc9f4f1a5d7a4826f2f2847c0e03ebc93103e (patch) | |
tree | 26256d24f6b6244f70c0af974cadd572314e8ce1 /src/mongo/db/repl | |
parent | fe892f349c9926583ceb6ad2f7dd6c8fb6ef10e7 (diff) | |
download | mongo-12bbc9f4f1a5d7a4826f2f2847c0e03ebc93103e.tar.gz |
SERVER-45033 Atomic applyOps logs the actual writes not the request
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops_test.cpp | 75 |
2 files changed, 15 insertions, 115 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 3d864606251..81e835835d7 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -123,6 +123,10 @@ Status _applyOps(OperationContext* opCtx, << "cannot apply insert or update operation on a non-existent namespace " << nss.ns() << " in atomic applyOps mode: " << redact(opObj)); } + uassert(ErrorCodes::AtomicityFailure, + str::stream() << "cannot run atomic applyOps on namespace " << nss.ns() + << " which has change stream pre- or post-images enabled", + !collection->isChangeStreamPreAndPostImagesEnabled()); // Reject malformed or over-specified operations in an atomic applyOps. try { @@ -415,48 +419,19 @@ Status applyOps(OperationContext* opCtx, writeConflictRetry(opCtx, "applyOps", dbName, [&] { BSONObjBuilder intermediateResult; std::unique_ptr<BSONArrayBuilder> opsBuilder; - if (opCtx->writesAreReplicated()) { - opsBuilder = std::make_unique<BSONArrayBuilder>(); - } - WriteUnitOfWork wunit(opCtx); - numApplied = 0; - { - // Suppress replication for atomic operations until end of applyOps. - repl::UnreplicatedWritesBlock uwb(opCtx); - uassertStatusOK(_applyOps(opCtx, - info, - oplogApplicationMode, - &intermediateResult, - &numApplied, - opsBuilder.get())); - } - // Generate oplog entry for all atomic ops collectively. - if (opCtx->writesAreReplicated()) { - // We want this applied atomically on secondaries so we rewrite the oplog entry - // without the pre-condition for speed. - - BSONObjBuilder cmdBuilder; - - auto opsFieldName = applyOpCmd.firstElement().fieldNameStringData(); - for (auto elem : applyOpCmd) { - auto name = elem.fieldNameStringData(); - if (name == opsFieldName && opsBuilder) { - cmdBuilder.append(opsFieldName, opsBuilder->arr()); - continue; - } - if (name == ApplyOps::kPreconditionFieldName) - continue; - if (name == bypassDocumentValidationCommandOption()) - continue; - cmdBuilder.append(elem); - } - const BSONObj cmdRewritten = cmdBuilder.done(); + // If we were to replicate the original applyOps operation we received, we could + // replicate an applyOps that includes no-op writes. Oplog readers, like change streams, + // would then see entries for writes that did not happen. To work around this, we group + // all writes in this WUOW into a new applyOps entry so that we only replicate writes + // that actually happen. + // Note that the applyOps command doesn't update config.transactions for retryable + // writes, nor does it support change stream pre- and post-images. - auto opObserver = getGlobalServiceContext()->getOpObserver(); - invariant(opObserver); - opObserver->onApplyOps(opCtx, dbName, cmdRewritten); - } + WriteUnitOfWork wunit(opCtx, true /*groupOplogEntries*/); + numApplied = 0; + uassertStatusOK(_applyOps( + opCtx, info, oplogApplicationMode, &intermediateResult, &numApplied, nullptr)); wunit.commit(); result->appendElements(intermediateResult.obj()); }); diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp index baf5a770d7a..1c35bbef174 100644 --- a/src/mongo/db/repl/apply_ops_test.cpp +++ b/src/mongo/db/repl/apply_ops_test.cpp @@ -154,40 +154,6 @@ TEST_F(ApplyOpsTest, CommandInNestedApplyOpsReturnsSuccess) { ASSERT_BSONOBJ_EQ({}, _opObserver->onApplyOpsCmdObj); } -TEST_F(ApplyOpsTest, InsertInNestedApplyOpsReturnsSuccess) { - auto opCtx = cc().makeOperationContext(); - auto mode = OplogApplication::Mode::kApplyOpsCmd; - // Make sure the apply ops command object contains the correct UUID information. - CollectionOptions options; - options.uuid = UUID::gen(); - BSONObjBuilder resultBuilder; - NamespaceString nss("test", "foo"); - auto innerCmdObj = BSON("op" - << "i" - << "ns" << nss.ns() << "o" - << BSON("_id" - << "a") - << "ui" << options.uuid.get()); - auto innerApplyOpsObj = BSON("op" - << "c" - << "ns" << nss.getCommandNS().ns() << "o" - << BSON("applyOps" << BSON_ARRAY(innerCmdObj))); - auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj)); - - ASSERT_OK(_storage->createCollection(opCtx.get(), nss, options)); - ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, &resultBuilder)); - ASSERT_BSONOBJ_EQ(BSON("applyOps" << BSON_ARRAY(innerCmdObj)), _opObserver->onApplyOpsCmdObj); -} - -TEST_F(ApplyOpsTest, AtomicApplyOpsWithNoOpsReturnsSuccess) { - auto opCtx = cc().makeOperationContext(); - auto mode = OplogApplication::Mode::kApplyOpsCmd; - BSONObjBuilder resultBuilder; - auto cmdObj = BSON("applyOps" << BSONArray()); - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); - ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj); -} - /** * Creates an applyOps command object with a single insert operation. */ @@ -218,25 +184,6 @@ TEST_F(ApplyOpsTest, ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); } - -TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithUuid) { - auto opCtx = cc().makeOperationContext(); - auto mode = OplogApplication::Mode::kApplyOpsCmd; - NamespaceString nss("test.t"); - - auto uuid = UUID::gen(); - - CollectionOptions collectionOptions; - collectionOptions.uuid = uuid; - ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions)); - - auto documentToInsert = BSON("_id" << 0); - auto cmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert); - BSONObjBuilder resultBuilder; - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); - ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj); -} - TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithOtherUuid) { auto opCtx = cc().makeOperationContext(); auto mode = OplogApplication::Mode::kApplyOpsCmd; @@ -262,28 +209,6 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithOtherUuid) { ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); } -TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithoutUuidIntoCollectionWithUuid) { - auto opCtx = cc().makeOperationContext(); - auto mode = OplogApplication::Mode::kApplyOpsCmd; - NamespaceString nss("test.t"); - - auto uuid = UUID::gen(); - - CollectionOptions collectionOptions; - collectionOptions.uuid = uuid; - ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions)); - - auto documentToInsert = BSON("_id" << 0); - auto cmdObj = makeApplyOpsWithInsertOperation(nss, boost::none, documentToInsert); - BSONObjBuilder resultBuilder; - ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder)); - - // Insert operation provided by caller did not contain collection uuid but applyOps() should add - // the uuid to the oplog entry. - auto expectedCmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert); - ASSERT_BSONOBJ_EQ(expectedCmdObj, _opObserver->onApplyOpsCmdObj); -} - TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) { auto opCtx = cc().makeOperationContext(); |