summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/sync_tail_test.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp238
1 files changed, 233 insertions, 5 deletions
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index ac3b6e1ddd2..81e4cc51ac1 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -111,6 +111,7 @@ void SyncTailTest::setUp() {
Client::initThreadIfNotAlready();
_txn = cc().makeOperationContext();
+ _txn->lockState()->setIsBatchWriter(false);
_opsApplied = 0;
_applyOp = [](OperationContext* txn,
Database* db,
@@ -191,7 +192,8 @@ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime,
bob.appendElements(opTime.toBSON());
bob.append("h", 1LL);
bob.append("op", "c");
- bob.append("ns", nss.ns());
+ bob.append("ns", nss.getCommandNS());
+ bob.append("o", BSON("create" << nss.coll()));
return OplogEntry(bob.obj());
}
@@ -551,8 +553,8 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH
operationsApplied.push_back(operationsForWriterThreadToApply);
};
- auto op1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1);
- auto op2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2);
+ auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1));
+ auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2, BSON("x" << 2));
auto lastOpTime =
unittest::assertGet(multiApply(_txn.get(), &writerPool, {op1, op2}, applyOperationFn));
@@ -580,6 +582,234 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH
ASSERT_EQUALS(op2, operationsWritternToOplog[1]);
}
+TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ ASSERT_TRUE(_txn->writesAreReplicated());
+ ASSERT_FALSE(documentValidationDisabled(_txn.get()));
+ ASSERT_FALSE(_txn->lockState()->isBatchWriter());
+ auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
+ _txn.reset();
+ multiSyncApply({op}, nullptr);
+ // Collection should be created after SyncTail::syncApply() processes operation.
+ _txn = cc().makeOperationContext();
+ ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection());
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ ASSERT_TRUE(_txn->writesAreReplicated());
+ ASSERT_FALSE(documentValidationDisabled(_txn.get()));
+ ASSERT_FALSE(_txn->lockState()->isBatchWriter());
+ auto syncApply = [](OperationContext* txn, const BSONObj&, bool convertUpdatesToUpserts) {
+ ASSERT_FALSE(txn->writesAreReplicated());
+ ASSERT_TRUE(txn->lockState()->isBatchWriter());
+ ASSERT_TRUE(documentValidationDisabled(txn));
+ ASSERT_TRUE(convertUpdatesToUpserts);
+ return Status::OK();
+ };
+ auto op = makeUpdateDocumentOplogEntry(
+ {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
+ ASSERT_OK(multiSyncApply_noAbort(_txn.get(), {op}, syncApply));
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ OplogEntry op(BSON("op"
+ << "x"
+ << "ns"
+ << nss.ns()));
+ auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status {
+ return {ErrorCodes::OperationFailed, ""};
+ };
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_txn.get(), {op}, syncApply));
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) {
+ NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
+ OplogEntry op(BSON("op"
+ << "x"
+ << "ns"
+ << nss.ns()));
+ auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status {
+ uasserted(ErrorCodes::OperationFailed, "");
+ MONGO_UNREACHABLE;
+ };
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, multiSyncApply_noAbort(_txn.get(), {op}, syncApply));
+}
+
+TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplying) {
+ int x = 0;
+ auto makeOp = [&x](const char* ns) -> OplogEntry {
+ return OplogEntry(BSON("op"
+ << "x"
+ << "ns"
+ << ns
+ << "x"
+ << x++));
+ };
+ auto op1 = makeOp("test.t1");
+ auto op2 = makeOp("test.t1");
+ auto op3 = makeOp("test.t2");
+ auto op4 = makeOp("test.t3");
+ MultiApplier::Operations operationsApplied;
+ auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+ operationsApplied.push_back(OplogEntry(op));
+ return Status::OK();
+ };
+ ASSERT_OK(multiSyncApply_noAbort(_txn.get(), {op4, op1, op3, op2}, syncApply));
+ ASSERT_EQUALS(4U, operationsApplied.size());
+ ASSERT_EQUALS(op1, operationsApplied[0]);
+ ASSERT_EQUALS(op2, operationsApplied[1]);
+ ASSERT_EQUALS(op3, operationsApplied[2]);
+ ASSERT_EQUALS(op4, operationsApplied[3]);
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplying) {
+ int seconds = 0;
+ auto makeOp = [&seconds](const NamespaceString& nss) {
+ return makeInsertDocumentOplogEntry(
+ {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++));
+ };
+ NamespaceString nss1("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1");
+ NamespaceString nss2("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_2");
+ auto createOp1 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss1);
+ auto createOp2 = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss2);
+ auto insertOp1a = makeOp(nss1);
+ auto insertOp1b = makeOp(nss1);
+ auto insertOp2a = makeOp(nss2);
+ auto insertOp2b = makeOp(nss2);
+ MultiApplier::Operations operationsApplied;
+ auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+ operationsApplied.push_back(OplogEntry(op));
+ return Status::OK();
+ };
+
+ ASSERT_OK(multiSyncApply_noAbort(
+ _txn.get(),
+ {createOp1, createOp2, insertOp1a, insertOp2a, insertOp1b, insertOp2b},
+ syncApply));
+
+ ASSERT_EQUALS(4U, operationsApplied.size());
+ ASSERT_EQUALS(createOp1, operationsApplied[0]);
+ ASSERT_EQUALS(createOp2, operationsApplied[1]);
+
+ // Check grouped insert operations in namespace "nss1".
+ ASSERT_EQUALS(insertOp1a.getOpTime(), operationsApplied[2].getOpTime());
+ ASSERT_EQUALS(insertOp1a.ns, operationsApplied[2].ns);
+ ASSERT_EQUALS(BSONType::Array, operationsApplied[2].o.type());
+ auto group1 = operationsApplied[2].o.Array();
+ ASSERT_EQUALS(2U, group1.size());
+ ASSERT_EQUALS(insertOp1a.o.Obj(), group1[0].Obj());
+ ASSERT_EQUALS(insertOp1b.o.Obj(), group1[1].Obj());
+
+ // Check grouped insert operations in namespace "nss2".
+ ASSERT_EQUALS(insertOp2a.getOpTime(), operationsApplied[3].getOpTime());
+ ASSERT_EQUALS(insertOp2a.ns, operationsApplied[3].ns);
+ ASSERT_EQUALS(BSONType::Array, operationsApplied[3].o.type());
+ auto group2 = operationsApplied[3].o.Array();
+ ASSERT_EQUALS(2U, group2.size());
+ ASSERT_EQUALS(insertOp2a.o.Obj(), group2[0].Obj());
+ ASSERT_EQUALS(insertOp2b.o.Obj(), group2[1].Obj());
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) {
+ int seconds = 0;
+ auto makeOp = [&seconds](const NamespaceString& nss) {
+ return makeInsertDocumentOplogEntry(
+ {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++));
+ };
+ NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1");
+ auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);
+
+ // Generate operations to apply:
+ // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)}
+ std::size_t limit = 64;
+ MultiApplier::Operations insertOps;
+ for (std::size_t i = 0; i < limit + 1; ++i) {
+ insertOps.push_back(makeOp(nss));
+ }
+ MultiApplier::Operations operationsToApply;
+ operationsToApply.push_back(createOp);
+ std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply));
+ MultiApplier::Operations operationsApplied;
+ auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
+ operationsApplied.push_back(OplogEntry(op));
+ return Status::OK();
+ };
+
+ ASSERT_OK(multiSyncApply_noAbort(_txn.get(), operationsToApply, syncApply));
+
+ // multiSyncApply should combine operations as follows:
+ // {create}, {grouped_insert}, {insert_(limit+1)}
+ ASSERT_EQUALS(3U, operationsApplied.size());
+ ASSERT_EQUALS(createOp, operationsApplied[0]);
+
+ const auto& groupedInsertOp = operationsApplied[1];
+ ASSERT_EQUALS(insertOps.front().getOpTime(), groupedInsertOp.getOpTime());
+ ASSERT_EQUALS(insertOps.front().ns, groupedInsertOp.ns);
+ ASSERT_EQUALS(BSONType::Array, groupedInsertOp.o.type());
+ auto groupedInsertDocuments = groupedInsertOp.o.Array();
+ ASSERT_EQUALS(limit, groupedInsertDocuments.size());
+ for (std::size_t i = 0; i < limit; ++i) {
+ const auto& insertOp = insertOps[i];
+ ASSERT_EQUALS(insertOp.o.Obj(), groupedInsertDocuments[i].Obj());
+ }
+
+ // (limit + 1)-th insert operations should not be included in group of first (limit) inserts.
+ ASSERT_EQUALS(insertOps.back(), operationsApplied[2]);
+}
+
+TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) {
+ int seconds = 0;
+ auto makeOp = [&seconds](const NamespaceString& nss) {
+ return makeInsertDocumentOplogEntry(
+ {Timestamp(Seconds(seconds), 0), 1LL}, nss, BSON("_id" << seconds++));
+ };
+ NamespaceString nss("test." + _agent.getSuiteName() + "_" + _agent.getTestName() + "_1");
+ auto createOp = makeCreateCollectionOplogEntry({Timestamp(Seconds(seconds++), 0), 1LL}, nss);
+
+ // Generate operations to apply:
+ // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)}
+ std::size_t limit = 64;
+ MultiApplier::Operations insertOps;
+ for (std::size_t i = 0; i < limit + 1; ++i) {
+ insertOps.push_back(makeOp(nss));
+ }
+ MultiApplier::Operations operationsToApply;
+ operationsToApply.push_back(createOp);
+ std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply));
+
+ std::size_t numFailedGroupedInserts = 0;
+ MultiApplier::Operations operationsApplied;
+ auto syncApply = [&numFailedGroupedInserts,
+ &operationsApplied](OperationContext*, const BSONObj& op, bool) -> Status {
+ // Reject grouped insert operations.
+ if (op["o"].type() == BSONType::Array) {
+ numFailedGroupedInserts++;
+ return {ErrorCodes::OperationFailed, "grouped inserts not supported"};
+ }
+ operationsApplied.push_back(OplogEntry(op));
+ return Status::OK();
+ };
+
+ ASSERT_OK(multiSyncApply_noAbort(_txn.get(), operationsToApply, syncApply));
+
+ // On failing to apply the grouped insert operation, multiSyncApply should apply the operations
+ // as given in "operationsToApply":
+ // {create}, {insert_1}, {insert_2}, .. {insert_(limit)}, {insert_(limit+1)}
+ ASSERT_EQUALS(limit + 2, operationsApplied.size());
+ ASSERT_EQUALS(createOp, operationsApplied[0]);
+
+ for (std::size_t i = 0; i < limit + 1; ++i) {
+ const auto& insertOp = insertOps[i];
+ ASSERT_EQUALS(insertOp, operationsApplied[i + 1]);
+ }
+
+ // Ensure that multiSyncApply does not attempt to group remaining operations in first failed
+ // grouped insert operation.
+ ASSERT_EQUALS(1U, numFailedGroupedInserts);
+}
+
TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyingOperations) {
SyncTailWithOperationContextChecker syncTail;
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
@@ -625,7 +855,6 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyRetriesFailedUpdateIfDocumentIsAvailab
TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailingToRetryBadOp) {
SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1));
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
- auto updatedDocument = BSON("_id" << 0 << "x" << 2);
OplogEntry op(BSON("op"
<< "x"
<< "ns"
@@ -636,7 +865,6 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailin
TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughShouldSyncTailRetryError) {
SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr);
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
- auto updatedDocument = BSON("_id" << 0 << "x" << 2);
auto op = makeUpdateDocumentOplogEntry(
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
ASSERT_THROWS_CODE(