diff options
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher_test.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 175 |
1 files changed, 97 insertions, 78 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 5209ea0c4e3..8f03c6c50cb 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -192,8 +192,7 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(RemoteComman std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj, bool requireFresherSyncSource) { - return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)}, - requireFresherSyncSource); + return processSingleBatch({obj, Milliseconds(0)}, requireFresherSyncSource); } void _checkDefaultCommandObjectFields(BSONObj cmdObj) { @@ -219,6 +218,12 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher(ReplSetConfig c defaultBatchSize); } +BSONObj concatenate(BSONObj a, const BSONObj& b) { + auto bob = BSONObjBuilder(std::move(a)); + bob.appendElements(b); + return bob.obj(); +} + TEST_F( OplogFetcherTest, FindQueryContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) { @@ -255,19 +260,21 @@ TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProt } TEST_F(OplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) { - auto shutdownState = processSingleBatch( - {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)), - Milliseconds(0)}); + auto shutdownState = + processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + BSON(rpc::kReplSetMetadataFieldName + << BSON("invalid_repl_metadata_field" << 1))), + Milliseconds(0)}); ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus()); } TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) { - auto shutdownState = processSingleBatch( - {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - BSON(rpc::kOplogQueryMetadataFieldName << BSON("invalid_oq_metadata_field" << 1)), - Milliseconds(0)}); + auto shutdownState = + processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + BSON(rpc::kOplogQueryMetadataFieldName + << BSON("invalid_oq_metadata_field" << 1))), + Milliseconds(0)}); ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus()); } @@ -279,10 +286,11 @@ TEST_F(OplogFetcherTest, ASSERT_OK(metadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - metadataObj, - Milliseconds(0)}) - ->getStatus()); + ASSERT_OK( + processSingleBatch( + {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), + Milliseconds(0)}) + ->getStatus()); ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed); ASSERT_EQUALS(metadata.getPrimaryIndex(), dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex()); @@ -296,10 +304,11 @@ TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMe ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - metadataObj, - Milliseconds(0)}) - ->getStatus()); + ASSERT_OK( + processSingleBatch( + {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), + Milliseconds(0)}) + ->getStatus()); ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed); ASSERT_EQUALS(replMetadata.getPrimaryIndex(), dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex()); @@ -315,11 +324,12 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) ASSERT_OK(oqMetadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, - processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - metadataObj, - Milliseconds(0)}) - ->getStatus()); + ASSERT_EQUALS( + ErrorCodes::InvalidSyncSource, + processSingleBatch( + {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), + Milliseconds(0)}) + ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); ASSERT(lastEnqueuedDocuments.empty()); } @@ -332,11 +342,12 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) ASSERT_OK(oqMetadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, - processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - metadataObj, - Milliseconds(0)}) - ->getStatus()); + ASSERT_EQUALS( + ErrorCodes::InvalidSyncSource, + processSingleBatch( + {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), + Milliseconds(0)}) + ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); ASSERT(lastEnqueuedDocuments.empty()); } @@ -349,11 +360,12 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead ASSERT_OK(oqMetadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, - processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - metadataObj, - Milliseconds(0)}) - ->getStatus()); + ASSERT_EQUALS( + ErrorCodes::InvalidSyncSource, + processSingleBatch( + {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj), + Milliseconds(0)}) + ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); ASSERT(lastEnqueuedDocuments.empty()); } @@ -370,7 +382,8 @@ TEST_F(OplogFetcherTest, auto entry = makeNoopOplogEntry({123LL, staleOpTime}); ASSERT_EQUALS( ErrorCodes::InvalidSyncSource, - processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false) + processSingleBatch( + {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false) ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); ASSERT(lastEnqueuedDocuments.empty()); @@ -388,8 +401,8 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM auto metadataObj = bob.obj(); auto entry = makeNoopOplogEntry(lastFetched); - auto shutdownState = - processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false); + auto shutdownState = processSingleBatch( + {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false); ASSERT_OK(shutdownState->getStatus()); ASSERT(dataReplicatorExternalState->metadataWasProcessed); } @@ -404,8 +417,8 @@ TEST_F(OplogFetcherTest, auto metadataObj = bob.obj(); auto entry = makeNoopOplogEntry(lastFetched); - auto shutdownState = - processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false); + auto shutdownState = processSingleBatch( + {concatenate(makeCursorResponse(0, {entry}), metadataObj), Milliseconds(0)}, false); ASSERT_OK(shutdownState->getStatus()); ASSERT(dataReplicatorExternalState->metadataWasProcessed); } @@ -418,8 +431,9 @@ TEST_F(OplogFetcherTest, auto metadataObj = bob.obj(); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, processSingleBatch( - {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), - metadataObj, + {concatenate(makeCursorResponse( + 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), + metadataObj), Milliseconds(0)}) ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); @@ -434,17 +448,17 @@ TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { auto metadataObj = bob.obj(); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, processSingleBatch( - {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), - metadataObj, + {concatenate(makeCursorResponse( + 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), + metadataObj), Milliseconds(0)}) ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) { - ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), - rpc::makeEmptyMetadata(), - Milliseconds(0)}) + ASSERT_OK(processSingleBatch( + {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), Milliseconds(0)}) ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } @@ -456,10 +470,10 @@ TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingEr TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) { auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - ASSERT_EQUALS( - ErrorCodes::InvalidBSON, - processSingleBatch({makeCursorResponse(0, {BSONObj()}), metadataObj, Milliseconds(0)}) - ->getStatus()); + ASSERT_EQUALS(ErrorCodes::InvalidBSON, + processSingleBatch({concatenate(makeCursorResponse(0, {BSONObj()}), metadataObj), + Milliseconds(0)}) + ->getStatus()); } TEST_F( @@ -468,8 +482,9 @@ TEST_F( auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, processSingleBatch( - {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), - metadataObj, + {concatenate(makeCursorResponse( + 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), + metadataObj), Milliseconds(0)}) ->getStatus()); } @@ -480,8 +495,9 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS( ErrorCodes::OplogStartMissing, processSingleBatch( - {makeCursorResponse(0, {makeNoopOplogEntry(remoteNewerOpTime, lastFetched.value + 1)}), - metadataObj, + {concatenate(makeCursorResponse( + 0, {makeNoopOplogEntry(remoteNewerOpTime, lastFetched.value + 1)}), + metadataObj), Milliseconds(0)}) ->getStatus()); } @@ -489,28 +505,30 @@ TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) { auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, - processSingleBatch( - {makeCursorResponse(0, - {makeNoopOplogEntry(lastFetched), - BSON("o" << BSON("msg" - << "oplog entry without optime"))}), - metadataObj, - Milliseconds(0)}) - ->getStatus()); + ASSERT_EQUALS( + ErrorCodes::NoSuchKey, + processSingleBatch( + {concatenate(makeCursorResponse(0, + {makeNoopOplogEntry(lastFetched), + BSON("o" << BSON("msg" + << "oplog entry without optime"))}), + metadataObj), + Milliseconds(0)}) + ->getStatus()); } TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) { auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, - processSingleBatch({makeCursorResponse(0, - {makeNoopOplogEntry(lastFetched), - makeNoopOplogEntry(Seconds(1000), 1), - makeNoopOplogEntry(Seconds(2000), 1), - makeNoopOplogEntry(Seconds(1500), 1)}), - metadataObj, - Milliseconds(0)}) - ->getStatus()); + ASSERT_EQUALS( + ErrorCodes::OplogOutOfOrder, + processSingleBatch({concatenate(makeCursorResponse(0, + {makeNoopOplogEntry(lastFetched), + makeNoopOplogEntry(Seconds(1000), 1), + makeNoopOplogEntry(Seconds(2000), 1), + makeNoopOplogEntry(Seconds(1500), 1)}), + metadataObj), + Milliseconds(0)}) + ->getStatus()); } TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) { @@ -521,8 +539,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenE auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; - auto shutdownState = - processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)}); + auto shutdownState = processSingleBatch( + {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)}); ASSERT_EQUALS(2U, lastEnqueuedDocuments.size()); ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]); @@ -559,8 +577,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) { return Status(ErrorCodes::InternalError, "my custom error"); }; - auto shutdownState = - processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)}); + auto shutdownState = processSingleBatch( + {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)}); ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error")); } @@ -582,8 +600,8 @@ void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata dataReplicatorExternalState->shouldStopFetchingResult = true; - auto shutdownState = - processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)}); + auto shutdownState = processSingleBatch( + {concatenate(makeCursorResponse(0, documents), metadataObj), Milliseconds(0)}); // Sync source checking happens after we have successfully pushed the operations into // the buffer for the next replication phase (eg. applier). @@ -697,7 +715,8 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() { auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); processNetworkResponse( - {makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj, Milliseconds(0)}, + {concatenate(makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj), + Milliseconds(0)}, true); ASSERT_EQUALS(1U, lastEnqueuedDocuments.size()); |