summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_fetcher_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher_test.cpp')
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp175
1 files changed, 97 insertions, 78 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 5209ea0c4e3..8f03c6c50cb 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -192,8 +192,7 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(RemoteComman
std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj,
bool requireFresherSyncSource) {
- return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)},
- requireFresherSyncSource);
+ return processSingleBatch({obj, Milliseconds(0)}, requireFresherSyncSource);
}
void _checkDefaultCommandObjectFields(BSONObj cmdObj) {
@@ -219,6 +218,12 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher(ReplSetConfig c
defaultBatchSize);
}
+BSONObj concatenate(BSONObj a, const BSONObj& b) {
+ auto bob = BSONObjBuilder(std::move(a));
+ bob.appendElements(b);
+ return bob.obj();
+}
+
TEST_F(
OplogFetcherTest,
FindQueryContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) {
@@ -255,19 +260,21 @@ TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProt
}
TEST_F(OplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) {
- auto shutdownState = processSingleBatch(
- {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)),
- Milliseconds(0)});
+ auto shutdownState =
+ processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ BSON(rpc::kReplSetMetadataFieldName
+ << BSON("invalid_repl_metadata_field" << 1))),
+ Milliseconds(0)});
ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus());
}
TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) {
- auto shutdownState = processSingleBatch(
- {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- BSON(rpc::kOplogQueryMetadataFieldName << BSON("invalid_oq_metadata_field" << 1)),
- Milliseconds(0)});
+ auto shutdownState =
+ processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ BSON(rpc::kOplogQueryMetadataFieldName
+ << BSON("invalid_oq_metadata_field" << 1))),
+ Milliseconds(0)});
ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus());
}
@@ -279,10 +286,11 @@ TEST_F(OplogFetcherTest,
ASSERT_OK(metadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
- ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- metadataObj,
- Milliseconds(0)})
- ->getStatus());
+ ASSERT_OK(
+ processSingleBatch(
+ {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
+ Milliseconds(0)})
+ ->getStatus());
ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed);
ASSERT_EQUALS(metadata.getPrimaryIndex(),
dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex());
@@ -296,10 +304,11 @@ TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMe
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
- ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- metadataObj,
- Milliseconds(0)})
- ->getStatus());
+ ASSERT_OK(
+ processSingleBatch(
+ {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
+ Milliseconds(0)})
+ ->getStatus());
ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed);
ASSERT_EQUALS(replMetadata.getPrimaryIndex(),
dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex());
@@ -315,11 +324,12 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack)
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
- ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
- processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- metadataObj,
- Milliseconds(0)})
- ->getStatus());
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidSyncSource,
+ processSingleBatch(
+ {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
+ Milliseconds(0)})
+ ->getStatus());
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
ASSERT(lastEnqueuedDocuments.empty());
}
@@ -332,11 +342,12 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind)
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
- ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
- processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- metadataObj,
- Milliseconds(0)})
- ->getStatus());
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidSyncSource,
+ processSingleBatch(
+ {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
+ Milliseconds(0)})
+ ->getStatus());
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
ASSERT(lastEnqueuedDocuments.empty());
}
@@ -349,11 +360,12 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
- ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
- processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- metadataObj,
- Milliseconds(0)})
- ->getStatus());
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidSyncSource,
+ processSingleBatch(
+ {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj),
+ Milliseconds(0)})
+ ->getStatus());
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
ASSERT(lastEnqueuedDocuments.empty());
}
@@ -370,7 +382,8 @@ TEST_F(OplogFetcherTest,
auto entry = makeNoopOplogEntry({123LL, staleOpTime});
ASSERT_EQUALS(
ErrorCodes::InvalidSyncSource,
- processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false)
+ processSingleBatch(
+ {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false)
->getStatus());
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
ASSERT(lastEnqueuedDocuments.empty());
@@ -388,8 +401,8 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM
auto metadataObj = bob.obj();
auto entry = makeNoopOplogEntry(lastFetched);
- auto shutdownState =
- processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false);
+ auto shutdownState = processSingleBatch(
+ {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false);
ASSERT_OK(shutdownState->getStatus());
ASSERT(dataReplicatorExternalState->metadataWasProcessed);
}
@@ -404,8 +417,8 @@ TEST_F(OplogFetcherTest,
auto metadataObj = bob.obj();
auto entry = makeNoopOplogEntry(lastFetched);
- auto shutdownState =
- processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false);
+ auto shutdownState = processSingleBatch(
+ {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false);
ASSERT_OK(shutdownState->getStatus());
ASSERT(dataReplicatorExternalState->metadataWasProcessed);
}
@@ -418,8 +431,9 @@ TEST_F(OplogFetcherTest,
auto metadataObj = bob.obj();
ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(
- {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}),
- metadataObj,
+ {concatenate(makeCursorResponse(
+ 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}),
+ metadataObj),
Milliseconds(0)})
->getStatus());
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
@@ -434,17 +448,17 @@ TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
auto metadataObj = bob.obj();
ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(
- {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}),
- metadataObj,
+ {concatenate(makeCursorResponse(
+ 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}),
+ metadataObj),
Milliseconds(0)})
->getStatus());
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
}
TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) {
- ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
- rpc::makeEmptyMetadata(),
- Milliseconds(0)})
+ ASSERT_OK(processSingleBatch(
+ {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), Milliseconds(0)})
->getStatus());
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
}
@@ -456,10 +470,10 @@ TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingEr
TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) {
auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
- ASSERT_EQUALS(
- ErrorCodes::InvalidBSON,
- processSingleBatch({makeCursorResponse(0, {BSONObj()}), metadataObj, Milliseconds(0)})
- ->getStatus());
+ ASSERT_EQUALS(ErrorCodes::InvalidBSON,
+ processSingleBatch({concatenate(makeCursorResponse(0, {BSONObj()}), metadataObj),
+ Milliseconds(0)})
+ ->getStatus());
}
TEST_F(
@@ -468,8 +482,9 @@ TEST_F(
auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(
- {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}),
- metadataObj,
+ {concatenate(makeCursorResponse(
+ 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}),
+ metadataObj),
Milliseconds(0)})
->getStatus());
}
@@ -480,8 +495,9 @@ TEST_F(OplogFetcherTest,
ASSERT_EQUALS(
ErrorCodes::OplogStartMissing,
processSingleBatch(
- {makeCursorResponse(0, {makeNoopOplogEntry(remoteNewerOpTime, lastFetched.value + 1)}),
- metadataObj,
+ {concatenate(makeCursorResponse(
+ 0, {makeNoopOplogEntry(remoteNewerOpTime, lastFetched.value + 1)}),
+ metadataObj),
Milliseconds(0)})
->getStatus());
}
@@ -489,28 +505,30 @@ TEST_F(OplogFetcherTest,
TEST_F(OplogFetcherTest,
MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) {
auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
- ASSERT_EQUALS(ErrorCodes::NoSuchKey,
- processSingleBatch(
- {makeCursorResponse(0,
- {makeNoopOplogEntry(lastFetched),
- BSON("o" << BSON("msg"
- << "oplog entry without optime"))}),
- metadataObj,
- Milliseconds(0)})
- ->getStatus());
+ ASSERT_EQUALS(
+ ErrorCodes::NoSuchKey,
+ processSingleBatch(
+ {concatenate(makeCursorResponse(0,
+ {makeNoopOplogEntry(lastFetched),
+ BSON("o" << BSON("msg"
+ << "oplog entry without optime"))}),
+ metadataObj),
+ Milliseconds(0)})
+ ->getStatus());
}
TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
- ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
- processSingleBatch({makeCursorResponse(0,
- {makeNoopOplogEntry(lastFetched),
- makeNoopOplogEntry(Seconds(1000), 1),
- makeNoopOplogEntry(Seconds(2000), 1),
- makeNoopOplogEntry(Seconds(1500), 1)}),
- metadataObj,
- Milliseconds(0)})
- ->getStatus());
+ ASSERT_EQUALS(
+ ErrorCodes::OplogOutOfOrder,
+ processSingleBatch({concatenate(makeCursorResponse(0,
+ {makeNoopOplogEntry(lastFetched),
+ makeNoopOplogEntry(Seconds(1000), 1),
+ makeNoopOplogEntry(Seconds(2000), 1),
+ makeNoopOplogEntry(Seconds(1500), 1)}),
+ metadataObj),
+ Milliseconds(0)})
+ ->getStatus());
}
TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) {
@@ -521,8 +539,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenE
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
- auto shutdownState =
- processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)});
+ auto shutdownState = processSingleBatch(
+ {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)});
ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]);
@@ -559,8 +577,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) {
return Status(ErrorCodes::InternalError, "my custom error");
};
- auto shutdownState =
- processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)});
+ auto shutdownState = processSingleBatch(
+ {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)});
ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error"));
}
@@ -582,8 +600,8 @@ void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata
dataReplicatorExternalState->shouldStopFetchingResult = true;
- auto shutdownState =
- processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)});
+ auto shutdownState = processSingleBatch(
+ {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)});
// Sync source checking happens after we have successfully pushed the operations into
// the buffer for the next replication phase (eg. applier).
@@ -697,7 +715,8 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() {
auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
processNetworkResponse(
- {makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj, Milliseconds(0)},
+ {concatenate(makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj),
+ Milliseconds(0)},
true);
ASSERT_EQUALS(1U, lastEnqueuedDocuments.size());