summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorJosef Ahmad <josef.ahmad@mongodb.com>2022-04-19 21:47:47 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-19 22:28:04 +0000
commit12bbc9f4f1a5d7a4826f2f2847c0e03ebc93103e (patch)
tree26256d24f6b6244f70c0af974cadd572314e8ce1 /src/mongo/db/repl
parentfe892f349c9926583ceb6ad2f7dd6c8fb6ef10e7 (diff)
downloadmongo-12bbc9f4f1a5d7a4826f2f2847c0e03ebc93103e.tar.gz
SERVER-45033 Atomic applyOps logs the actual writes not the request
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/apply_ops.cpp55
-rw-r--r--src/mongo/db/repl/apply_ops_test.cpp75
2 files changed, 15 insertions, 115 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 3d864606251..81e835835d7 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -123,6 +123,10 @@ Status _applyOps(OperationContext* opCtx,
<< "cannot apply insert or update operation on a non-existent namespace "
<< nss.ns() << " in atomic applyOps mode: " << redact(opObj));
}
+ uassert(ErrorCodes::AtomicityFailure,
+ str::stream() << "cannot run atomic applyOps on namespace " << nss.ns()
+ << " which has change stream pre- or post-images enabled",
+ !collection->isChangeStreamPreAndPostImagesEnabled());
// Reject malformed or over-specified operations in an atomic applyOps.
try {
@@ -415,48 +419,19 @@ Status applyOps(OperationContext* opCtx,
writeConflictRetry(opCtx, "applyOps", dbName, [&] {
BSONObjBuilder intermediateResult;
std::unique_ptr<BSONArrayBuilder> opsBuilder;
- if (opCtx->writesAreReplicated()) {
- opsBuilder = std::make_unique<BSONArrayBuilder>();
- }
- WriteUnitOfWork wunit(opCtx);
- numApplied = 0;
- {
- // Suppress replication for atomic operations until end of applyOps.
- repl::UnreplicatedWritesBlock uwb(opCtx);
- uassertStatusOK(_applyOps(opCtx,
- info,
- oplogApplicationMode,
- &intermediateResult,
- &numApplied,
- opsBuilder.get()));
- }
- // Generate oplog entry for all atomic ops collectively.
- if (opCtx->writesAreReplicated()) {
- // We want this applied atomically on secondaries so we rewrite the oplog entry
- // without the pre-condition for speed.
-
- BSONObjBuilder cmdBuilder;
-
- auto opsFieldName = applyOpCmd.firstElement().fieldNameStringData();
- for (auto elem : applyOpCmd) {
- auto name = elem.fieldNameStringData();
- if (name == opsFieldName && opsBuilder) {
- cmdBuilder.append(opsFieldName, opsBuilder->arr());
- continue;
- }
- if (name == ApplyOps::kPreconditionFieldName)
- continue;
- if (name == bypassDocumentValidationCommandOption())
- continue;
- cmdBuilder.append(elem);
- }
- const BSONObj cmdRewritten = cmdBuilder.done();
+ // If we were to replicate the original applyOps operation we received, we could
+ // replicate an applyOps that includes no-op writes. Oplog readers, like change streams,
+ // would then see entries for writes that did not happen. To work around this, we group
+ // all writes in this WUOW into a new applyOps entry so that we only replicate writes
+ // that actually happen.
+ // Note that the applyOps command doesn't update config.transactions for retryable
+ // writes, nor does it support change stream pre- and post-images.
- auto opObserver = getGlobalServiceContext()->getOpObserver();
- invariant(opObserver);
- opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
- }
+ WriteUnitOfWork wunit(opCtx, true /*groupOplogEntries*/);
+ numApplied = 0;
+ uassertStatusOK(_applyOps(
+ opCtx, info, oplogApplicationMode, &intermediateResult, &numApplied, nullptr));
wunit.commit();
result->appendElements(intermediateResult.obj());
});
diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp
index baf5a770d7a..1c35bbef174 100644
--- a/src/mongo/db/repl/apply_ops_test.cpp
+++ b/src/mongo/db/repl/apply_ops_test.cpp
@@ -154,40 +154,6 @@ TEST_F(ApplyOpsTest, CommandInNestedApplyOpsReturnsSuccess) {
ASSERT_BSONOBJ_EQ({}, _opObserver->onApplyOpsCmdObj);
}
-TEST_F(ApplyOpsTest, InsertInNestedApplyOpsReturnsSuccess) {
- auto opCtx = cc().makeOperationContext();
- auto mode = OplogApplication::Mode::kApplyOpsCmd;
- // Make sure the apply ops command object contains the correct UUID information.
- CollectionOptions options;
- options.uuid = UUID::gen();
- BSONObjBuilder resultBuilder;
- NamespaceString nss("test", "foo");
- auto innerCmdObj = BSON("op"
- << "i"
- << "ns" << nss.ns() << "o"
- << BSON("_id"
- << "a")
- << "ui" << options.uuid.get());
- auto innerApplyOpsObj = BSON("op"
- << "c"
- << "ns" << nss.getCommandNS().ns() << "o"
- << BSON("applyOps" << BSON_ARRAY(innerCmdObj)));
- auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj));
-
- ASSERT_OK(_storage->createCollection(opCtx.get(), nss, options));
- ASSERT_OK(applyOps(opCtx.get(), nss.db().toString(), cmdObj, mode, &resultBuilder));
- ASSERT_BSONOBJ_EQ(BSON("applyOps" << BSON_ARRAY(innerCmdObj)), _opObserver->onApplyOpsCmdObj);
-}
-
-TEST_F(ApplyOpsTest, AtomicApplyOpsWithNoOpsReturnsSuccess) {
- auto opCtx = cc().makeOperationContext();
- auto mode = OplogApplication::Mode::kApplyOpsCmd;
- BSONObjBuilder resultBuilder;
- auto cmdObj = BSON("applyOps" << BSONArray());
- ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder));
- ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj);
-}
-
/**
* Creates an applyOps command object with a single insert operation.
*/
@@ -218,25 +184,6 @@ TEST_F(ApplyOpsTest,
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
}
-
-TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithUuid) {
- auto opCtx = cc().makeOperationContext();
- auto mode = OplogApplication::Mode::kApplyOpsCmd;
- NamespaceString nss("test.t");
-
- auto uuid = UUID::gen();
-
- CollectionOptions collectionOptions;
- collectionOptions.uuid = uuid;
- ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions));
-
- auto documentToInsert = BSON("_id" << 0);
- auto cmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert);
- BSONObjBuilder resultBuilder;
- ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder));
- ASSERT_BSONOBJ_EQ(cmdObj, _opObserver->onApplyOpsCmdObj);
-}
-
TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithOtherUuid) {
auto opCtx = cc().makeOperationContext();
auto mode = OplogApplication::Mode::kApplyOpsCmd;
@@ -262,28 +209,6 @@ TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithOtherUuid) {
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
}
-TEST_F(ApplyOpsTest, AtomicApplyOpsInsertWithoutUuidIntoCollectionWithUuid) {
- auto opCtx = cc().makeOperationContext();
- auto mode = OplogApplication::Mode::kApplyOpsCmd;
- NamespaceString nss("test.t");
-
- auto uuid = UUID::gen();
-
- CollectionOptions collectionOptions;
- collectionOptions.uuid = uuid;
- ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions));
-
- auto documentToInsert = BSON("_id" << 0);
- auto cmdObj = makeApplyOpsWithInsertOperation(nss, boost::none, documentToInsert);
- BSONObjBuilder resultBuilder;
- ASSERT_OK(applyOps(opCtx.get(), "test", cmdObj, mode, &resultBuilder));
-
- // Insert operation provided by caller did not contain collection uuid but applyOps() should add
- // the uuid to the oplog entry.
- auto expectedCmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert);
- ASSERT_BSONOBJ_EQ(expectedCmdObj, _opObserver->onApplyOpsCmdObj);
-}
-
TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) {
auto opCtx = cc().makeOperationContext();