author    | Judah Schvimer <judah@mongodb.com> | 2020-10-08 15:52:39 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-10-29 03:03:37 +0000
commit    | 0ccc9275efc3e4c36850bd4cc297c90152b7a7e6 (patch)
tree      | fb161c1b3289dd062c6b07434b1daab90af87bb8 /src/mongo/db
parent    | b4cd12f6dddc0d84ca1396176b39260c1777fba8 (diff)
download  | mongo-0ccc9275efc3e4c36850bd4cc297c90152b7a7e6.tar.gz
SERVER-51246 Write a noop into the oplog buffer after each batch to ensure tenant applier reaches stop timestamp
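At a high level, this change makes the tenant migration recipient's oplog fetcher request a post-batch resume token (`$_requestResumeToken`) from the donor, and after buffering each fetched batch (including empty ones) it pushes a synthetic no-op entry stamped with that token into the donor oplog buffer, skipping duplicates. The tenant oplog applier recognizes and skips these no-ops, but their timestamps let it keep advancing through idle periods in the donor's tenant oplog until it reaches the migration's stop timestamp. The following is a rough, self-contained sketch of that buffering rule; the `BufferEntry`, `OplogBuffer`, and `enqueueBatch` names are invented for illustration and are not MongoDB's API.

```cpp
// Minimal sketch (not the MongoDB implementation) of the resume-token-noop rule
// introduced by this commit. All types and helpers here are hypothetical.
#include <cassert>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

using Timestamp = std::uint64_t;

struct BufferEntry {
    Timestamp ts;
    bool isNoop;      // true for the synthetic "Resume token noop" entries
    std::string msg;  // payload; real entries carry the donor oplog document
};

struct OplogBuffer {
    std::vector<BufferEntry> entries;
    Timestamp lastPushed = 0;

    void push(BufferEntry e) {
        assert(e.ts > lastPushed);  // timestamps must be strictly increasing
        lastPushed = e.ts;
        entries.push_back(std::move(e));
    }
};

// After each fetched batch, buffer the real entries, then append a noop stamped
// with the batch's post-batch resume token. Even for an *empty* batch this moves
// the buffer's last timestamp forward, so a consumer waiting to reach a stop
// timestamp can make progress while the donor's tenant oplog is idle.
void enqueueBatch(OplogBuffer& buf,
                  const std::vector<BufferEntry>& batch,
                  Timestamp resumeToken) {
    for (const auto& e : batch)
        buf.push(e);
    if (buf.lastPushed == resumeToken)
        return;  // don't insert a noop that would duplicate the last timestamp
    buf.push({resumeToken, true, "Resume token noop"});
}

int main() {
    OplogBuffer buf;
    enqueueBatch(buf, {{5, false, "insert"}}, 5);  // token == last entry: no noop
    enqueueBatch(buf, {}, 9);                      // empty batch: noop at ts 9
    for (const auto& e : buf.entries)
        std::cout << e.ts << (e.isNoop ? " (noop)" : "") << "\n";
}
```

In this sketch, an empty batch whose resume token is 9 still advances the buffer's last timestamp from 5 to 9 via the no-op, which is what allows a consumer waiting on a stop timestamp of 9 to finish even though no tenant writes arrived.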
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/oplog_batcher_test_fixture.cpp | 22
-rw-r--r-- | src/mongo/db/repl/oplog_batcher_test_fixture.h | 3
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.cpp | 6
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.h | 3
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection_test.cpp | 16
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 16
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 9
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_mock.cpp | 45
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_mock.h | 5
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 78
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 42
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.h | 1
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 116
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 49
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.h | 9
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier_test.cpp | 125
16 files changed, 483 insertions, 62 deletions
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
index 8fef07cf73b..b85fccb8c39 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
@@ -190,6 +190,28 @@ OplogEntry makeInsertOplogEntry(int t, const NamespaceString& nss, boost::option
                       boost::none);  // _id
 }
 
+OplogEntry makeNoopOplogEntry(int t, const StringData& msg) {
+    BSONObj oField = BSON("msg" << msg << "count" << t);
+    return OplogEntry(OpTime(Timestamp(t, 1), 1),  // optime
+                      boost::none,                 // hash
+                      OpTypeEnum::kNoop,           // op type
+                      NamespaceString(""),         // namespace
+                      boost::none,                 // uuid
+                      boost::none,                 // fromMigrate
+                      OplogEntry::kOplogVersion,   // version
+                      oField,                      // o
+                      boost::none,                 // o2
+                      {},                          // sessionInfo
+                      boost::none,                 // upsert
+                      Date_t() + Seconds(t),       // wall clock time
+                      boost::none,                 // statement id
+                      boost::none,                 // optime of previous write within same transaction
+                      boost::none,                 // pre-image optime
+                      boost::none,                 // post-image optime
+                      boost::none,                 // ShardId of resharding recipient
+                      boost::none);                // _id
+}
+
 /**
  * Generates an applyOps oplog entry with the given number used for the timestamp.
  */
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.h b/src/mongo/db/repl/oplog_batcher_test_fixture.h
index 735e0c56195..3ee02439ef7 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.h
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.h
@@ -82,6 +82,9 @@ private:
 OplogEntry makeInsertOplogEntry(int t,
                                 const NamespaceString& nss,
                                 boost::optional<UUID> uuid = boost::none);
+
+OplogEntry makeNoopOplogEntry(int t, const StringData& msg);
+
 OplogEntry makeApplyOpsOplogEntry(int t,
                                   bool prepare,
                                   const std::vector<OplogEntry>& innerOps = {});
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index 86f25deec44..a2916eee6e5 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -161,7 +161,9 @@ void OplogBufferCollection::push(OperationContext* opCtx,
         auto previousTimestamp = ts;
         std::tie(doc, ts) = addIdToDocument(value);
         invariant(!value.isEmpty());
-        invariant(ts > previousTimestamp);
+        invariant(ts > previousTimestamp,
+                  str::stream() << "ts: " << ts.toString()
+                                << ", previous: " << previousTimestamp.toString());
 
         return InsertStatement(doc);
     });
@@ -403,7 +405,7 @@ void OplogBufferCollection::_dropCollection(OperationContext* opCtx) {
     fassert(40155, _storageInterface->dropCollection(opCtx, _nss));
 }
 
-Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const {
+Timestamp OplogBufferCollection::getLastPushedTimestamp() const {
     stdx::lock_guard<Latch> lk(_mutex);
     return _lastPushedTimestamp;
 }
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 2f50bfd288c..6c5597efe1c 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -123,9 +123,10 @@ public:
               const Timestamp& ts,
               SeekStrategy exact = SeekStrategy::kExact) final;
 
+    // Only currently used by the TenantMigrationRecipientService, so not part of a parent API.
+    Timestamp getLastPushedTimestamp() const;
+
     // ---- Testing API ----
-    Timestamp getLastPushedTimestamp_forTest() const;
     Timestamp getLastPoppedTimestamp_forTest() const;
     std::queue<BSONObj> getPeekCache_forTest() const;
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index c609ba52849..6a5184ca117 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -246,7 +246,7 @@ TEST_F(OplogBufferCollectionTest, StartupWithExistingCollectionInitializesCorrec
     oplogBuffer.startup(_opCtx.get());
     ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
     ASSERT_NOT_EQUALS(oplogBuffer.getSize(), 0UL);
-    ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest());
+    ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp());
     ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
 
     _assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
@@ -276,7 +276,7 @@ TEST_F(OplogBufferCollectionTest, StartupWithEmptyExistingCollectionInitializesC
     oplogBuffer.startup(_opCtx.get());
     ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
     ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
-    ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp_forTest());
+    ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp());
     ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
 
     _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {});
@@ -390,7 +390,7 @@ TEST_F(OplogBufferCollectionTest,
     oplogBuffer.startup(_opCtx.get());
     ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
     ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
-    ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp_forTest());
+    ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp());
     ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
     _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {});
 }
@@ -664,14 +664,14 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
     oplogBuffer.startup(_opCtx.get());
     ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
     ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
-    ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+    ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp());
     ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
 
     const std::vector<BSONObj> oplog = {makeOplogEntry(1)};
     oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
     ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
     ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize()));
-    ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+    ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp());
     ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
 
     _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog});
@@ -680,7 +680,7 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
     oplogBuffer.push(_opCtx.get(), oplog2.cbegin(), oplog2.cend());
     ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
     ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize() + oplog2[0].objsize()));
-    ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+    ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp());
     ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
 
     _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], oplog2[0]});
@@ -690,7 +690,7 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
     ASSERT_BSONOBJ_EQ(oplog[0], poppedDoc);
     ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
     ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog2[0].objsize()));
-    ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+    ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp());
     ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
 
     _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], oplog2[0]});
@@ -699,7 +699,7 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
     ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
     ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
     ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
-    ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+    ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp());
     ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
 
     _assertDocumentsInCollectionEquals(_opCtx.get(), nss, {});
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index e9763ae329f..a59da61b1ec 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -41,6 +41,7 @@
 #include "mongo/db/stats/timer_stats.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/metadata/oplog_query_metadata.h"
+#include "mongo/s/resharding/resume_token_gen.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/fail_point.h"
 #include "mongo/util/time_support.h"
@@ -188,6 +189,7 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
                            StartingPoint startingPoint,
                            BSONObj filter,
                            ReadConcernArgs readConcern,
+                           bool requestResumeToken,
                            StringData name)
     : AbstractAsyncComponent(executor, name.toString()),
      _source(source),
@@ -204,7 +206,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
      _batchSize(batchSize),
      _startingPoint(startingPoint),
      _queryFilter(filter),
-      _queryReadConcern(readConcern) {
+      _queryReadConcern(readConcern),
+      _requestResumeToken(requestResumeToken) {
     invariant(config.isInitialized());
     invariant(!_lastFetched.isNull());
     invariant(onShutdownCallbackFn);
@@ -504,6 +507,10 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
     filterBob.done();
 
     queryBob.append("$maxTimeMS", findTimeout);
+    if (_requestResumeToken) {
+        queryBob.append("$hint", BSON("$natural" << 1));
+        queryBob.append("$_requestResumeToken", true);
+    }
 
     auto lastCommittedWithCurrentTerm =
         _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
@@ -767,6 +774,13 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
 
     oplogBatchStats.recordMillis(_lastBatchElapsedMS, documents.empty());
 
+    if (_cursor->getPostBatchResumeToken()) {
+        auto pbrt = ResumeTokenOplogTimestamp::parse(
+            IDLParserErrorContext("OplogFetcher PostBatchResumeToken"),
+            *_cursor->getPostBatchResumeToken());
+        info.resumeToken = pbrt.getTs();
+    }
+
     auto status = _enqueueDocumentsFn(firstDocToApply, documents.cend(), info);
     if (!status.isOK()) {
         return status;
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 459b2c42fe2..e7fbe964f5d 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -115,6 +115,7 @@ public:
         size_t toApplyDocumentCount = 0;
         size_t toApplyDocumentBytes = 0;
         OpTime lastDocument = OpTime();
+        Timestamp resumeToken = Timestamp();
     };
 
     /**
@@ -180,6 +181,7 @@ public:
                 StartingPoint startingPoint = StartingPoint::kSkipFirstDoc,
                 BSONObj filter = BSONObj(),
                 ReadConcernArgs readConcern = ReadConcernArgs(),
+                bool requestResumeToken = false,
                 StringData name = "oplog fetcher"_sd);
 
     virtual ~OplogFetcher();
@@ -446,6 +448,10 @@ private:
    // of "afterClusterTime: Timestamp(0,1)".
    ReadConcernArgs _queryReadConcern;
 
+    // Specifies if the oplog fetcher should request a resume token and provide it to
+    // _enqueueDocumentsFn.
+    const bool _requestResumeToken;
+
    // Handle to currently scheduled _runQuery task.
    executor::TaskExecutor::CallbackHandle _runQueryHandle;
 
@@ -470,6 +476,7 @@ public:
        OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
        BSONObj filter = BSONObj(),
        ReadConcernArgs readConcern = ReadConcernArgs(),
+        bool requestResumeToken = false,
        StringData name = "oplog fetcher"_sd) const = 0;
 };
 
@@ -491,6 +498,7 @@ public:
        OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
        BSONObj filter = BSONObj(),
        ReadConcernArgs readConcern = ReadConcernArgs(),
+        bool requestResumeToken = false,
        StringData name = "oplog_fetcher"_sd) const final {
        return std::make_unique<T>(executor,
                                   lastFetched,
@@ -506,6 +514,7 @@ public:
                                   startingPoint,
                                   std::move(filter),
                                   std::move(readConcern),
+                                   requestResumeToken,
                                   name);
     }
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.cpp b/src/mongo/db/repl/oplog_fetcher_mock.cpp
index c3478ca914c..254eea0707e 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_mock.cpp
@@ -51,6 +51,7 @@ OplogFetcherMock::OplogFetcherMock(
     StartingPoint startingPoint,
     BSONObj filter,
     ReadConcernArgs readConcern,
+    bool requestResumeToken,
     StringData name)
     : OplogFetcher(executor,
                    lastFetched,
@@ -69,6 +70,7 @@ OplogFetcherMock::OplogFetcherMock(
                    startingPoint,
                    filter,
                    readConcern,
+                   requestResumeToken,
                    name),
      _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
      _onShutdownCallbackFn(std::move(onShutdownCallbackFn)),
@@ -84,7 +86,9 @@ OplogFetcherMock::~OplogFetcherMock() {
     }
 }
 
-void OplogFetcherMock::receiveBatch(CursorId cursorId, OplogFetcher::Documents documents) {
+void OplogFetcherMock::receiveBatch(CursorId cursorId,
+                                    OplogFetcher::Documents documents,
+                                    boost::optional<Timestamp> resumeToken) {
     {
         stdx::lock_guard<Latch> lock(_mutex);
         if (!_isActive_inlock()) {
@@ -105,25 +109,28 @@ void OplogFetcherMock::receiveBatch(CursorId cursorId, OplogFetcher::Documents d
         return;
     }
 
-    if (!documents.empty()) {
-        auto info = validateResult.getValue();
-
-        // Enqueue documents in a separate thread with a different client than the test thread. This
-        // is to avoid interfering the thread local client in the test thread.
-        Status status = Status::OK();
-        stdx::thread enqueueDocumentThread([&]() {
-            Client::initThread("enqueueDocumentThread");
-            status = _enqueueDocumentsFn(documents.cbegin(), documents.cend(), info);
-        });
-        // Wait until the enqueue finishes.
-        enqueueDocumentThread.join();
-
-        // Shutdown the OplogFetcher with error if enqueue fails.
-        if (!status.isOK()) {
-            shutdownWith(status);
-            return;
-        }
+    auto info = validateResult.getValue();
+    if (resumeToken) {
+        info.resumeToken = *resumeToken;
+    }
+
+    // Enqueue documents in a separate thread with a different client than the test thread. This
+    // is to avoid interfering the thread local client in the test thread.
+    Status status = Status::OK();
+    stdx::thread enqueueDocumentThread([&]() {
+        Client::initThread("enqueueDocumentThread");
+        status = _enqueueDocumentsFn(documents.cbegin(), documents.cend(), info);
+    });
+    // Wait until the enqueue finishes.
+    enqueueDocumentThread.join();
+    // Shutdown the OplogFetcher with error if enqueue fails.
+    if (!status.isOK()) {
+        shutdownWith(status);
+        return;
+    }
+
+    if (!documents.empty()) {
         // Update lastFetched to the last oplog entry enqueued.
         auto lastDocRes = OpTime::parseFromOplogEntry(documents.back());
         if (!lastDocRes.isOK()) {
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.h b/src/mongo/db/repl/oplog_fetcher_mock.h
index 3e64acdc53e..65167b0a9eb 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.h
+++ b/src/mongo/db/repl/oplog_fetcher_mock.h
@@ -51,6 +51,7 @@ public:
         StartingPoint startingPoint = StartingPoint::kSkipFirstDoc,
         BSONObj filter = BSONObj(),
         ReadConcernArgs readConcern = ReadConcernArgs(),
+        bool requestResumeToken = false,
         StringData name = "oplog fetcher"_sd);
 
     virtual ~OplogFetcherMock();
@@ -60,7 +61,9 @@ public:
      * the enqueueDocumentsFn.
      * This is not thread-safe.
      */
-    void receiveBatch(CursorId cursorId, OplogFetcher::Documents documents);
+    void receiveBatch(CursorId cursorId,
+                      OplogFetcher::Documents documents,
+                      boost::optional<Timestamp> resumeToken = boost::none);
 
     /**
      * Simulate an response error received by the OplogFetcher.
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 0bb0d41379b..7ec8d06f2d6 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -162,7 +162,8 @@ void validateFindCommand(Message m,
                          ReadConcernArgs readConcern = ReadConcernArgs::fromBSONThrows(
                              BSON("level"
                                   << "local"
-                                  << "afterClusterTime" << Timestamp(0, 1)))) {
+                                  << "afterClusterTime" << Timestamp(0, 1))),
+                         bool requestResumeToken = false) {
     auto msg = mongo::OpMsg::parse(m);
     ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find");
     ASSERT_TRUE(msg.body.getBoolField("tailable"));
@@ -183,6 +184,8 @@ void validateFindCommand(Message m,
     // The find command should not specify the deprecated 'oplogReplay' flag.
     ASSERT_FALSE(msg.body["oplogReplay"]);
 
+    ASSERT_EQUALS(msg.body.hasField("$_requestResumeToken"), requestResumeToken);
+
     validateMetadataRequest(msg);
 }
 
@@ -319,7 +322,8 @@ protected:
         OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
         int requiredRBID = ReplicationProcess::kUninitializedRollbackId,
         BSONObj filter = BSONObj(),
-        ReadConcernArgs readConcern = ReadConcernArgs());
+        ReadConcernArgs readConcern = ReadConcernArgs(),
+        bool requestResumeToken = false);
 
     std::unique_ptr<OplogFetcher> getOplogFetcherAfterConnectionCreated(
         OplogFetcher::OnShutdownCallbackFn fn,
         int numRestarts = 0,
@@ -327,7 +331,8 @@ protected:
         OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
         int requiredRBID = ReplicationProcess::kUninitializedRollbackId,
         BSONObj filter = BSONObj(),
-        ReadConcernArgs args = ReadConcernArgs());
+        ReadConcernArgs args = ReadConcernArgs(),
+        bool requestResumeToken = false);
 
     std::unique_ptr<ShutdownState> processSingleBatch(
         const Message& response,
@@ -408,7 +413,8 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCr
     OplogFetcher::StartingPoint startingPoint,
     int requiredRBID,
     BSONObj filter,
-    ReadConcernArgs readConcern) {
+    ReadConcernArgs readConcern,
+    bool requestResumeToken) {
     auto oplogFetcher = makeOplogFetcherWithDifferentExecutor(&getExecutor(),
                                                               fn,
                                                               numRestarts,
@@ -416,7 +422,8 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCr
                                                               startingPoint,
                                                               requiredRBID,
                                                               filter,
-                                                              readConcern);
+                                                              readConcern,
+                                                              requestResumeToken);
 
     auto waitForConnCreatedFailPoint =
         globalFailPointRegistry().find("hangAfterOplogFetcherCallbackScheduled");
@@ -441,7 +448,8 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
     OplogFetcher::StartingPoint startingPoint,
     int requiredRBID,
     BSONObj filter,
-    ReadConcernArgs readConcern) {
+    ReadConcernArgs readConcern,
+    bool requestResumeToken) {
     auto oplogFetcher = std::make_unique<OplogFetcher>(
         executor,
         lastFetched,
@@ -456,7 +464,8 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
         defaultBatchSize,
         startingPoint,
         filter,
-        readConcern);
+        readConcern,
+        requestResumeToken);
     oplogFetcher->setCreateClientFn_forTest([this]() {
         const auto autoReconnect = true;
         return std::unique_ptr<DBClientConnection>(
@@ -2388,4 +2397,59 @@ TEST_F(OplogFetcherTest, CheckFindCommandIncludesCustomReadConcern) {
     oplogFetcher->join();
 }
 
+TEST_F(OplogFetcherTest, CheckFindCommandIncludesRequestResumeTokenWhenRequested) {
+    ShutdownState shutdownState;
+
+    auto oplogFetcher =
+        getOplogFetcherAfterConnectionCreated(std::ref(shutdownState),
+                                              0 /* numRestarts */,
+                                              true /* requireFresherSyncSource */,
+                                              OplogFetcher::StartingPoint::kSkipFirstDoc,
+                                              ReplicationProcess::kUninitializedRollbackId,
+                                              BSONObj() /* filter */,
+                                              ReadConcernArgs() /* readConcern */,
+                                              true /* requestResumeToken */);
+
+    CursorId cursorId = 22LL;
+    auto firstEntry = makeNoopOplogEntry(lastFetched);
+    auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
+    auto resumeToken = Timestamp(Seconds(567), 0);
+    auto resumeTokenObj = BSON("ts" << resumeToken);
+    auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
+    auto firstBatch = {firstEntry, secondEntry};
+
+    // Update lastFetched before it is updated by getting the next batch.
+    lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
+
+    auto cursorRes = CursorResponse(NamespaceString::kRsOplogNamespace,
+                                    cursorId,
+                                    firstBatch,
+                                    boost::none,
+                                    boost::none,
+                                    resumeTokenObj);
+
+    BSONObjBuilder bob(cursorRes.toBSON(CursorResponse::ResponseType::InitialResponse));
+    bob.appendElementsUnique(metadataObj);
+    auto batchResponse = OpMsg{bob.obj()}.serialize();
+
+    // Creating the cursor will succeed.
+    auto m = processSingleRequestResponse(
+        oplogFetcher->getDBClientConnection_forTest(), batchResponse, true);
+
+    validateFindCommand(
+        m,
+        lastFetched,
+        durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest()),
+        BSONObj() /* filter */,
+        ReadConcernArgs::fromBSONThrows(BSON("level"
+                                             << "local"
+                                             << "afterClusterTime" << Timestamp(0, 1))),
+        true /* requestResumeToken */);
+
+    ASSERT_EQUALS(lastEnqueuedDocumentsInfo.resumeToken, resumeToken);
+
+    oplogFetcher->shutdown();
+    oplogFetcher->join();
+}
+
 }  // namespace
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index a10ab36b14c..6b2ad39343b 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -490,6 +490,7 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
         OplogFetcher::StartingPoint::kEnqueueFirstDoc,
         _getOplogFetcherFilter(),
         ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern),
+        true /* requestResumeToken */,
         "TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString());
     _donorOplogFetcher->setConnection(std::move(_oplogFetcherClient));
     uassertStatusOK(_donorOplogFetcher->startup());
@@ -502,16 +503,43 @@ Status TenantMigrationRecipientService::Instance::_enqueueDocuments(
 
     invariant(_donorOplogBuffer);
 
-    if (info.toApplyDocumentCount == 0)
-        return Status::OK();
-
     auto opCtx = cc().makeOperationContext();
-    // Wait for enough space.
-    _donorOplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes);
+    if (info.toApplyDocumentCount != 0) {
+        // Wait for enough space.
+        _donorOplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes);
 
-    // Buffer docs for later application.
-    _donorOplogBuffer->push(opCtx.get(), begin, end);
+        // Buffer docs for later application.
+        _donorOplogBuffer->push(opCtx.get(), begin, end);
+    }
+
+    if (info.resumeToken.isNull()) {
+        return Status(ErrorCodes::Error(5124600), "Resume token returned is null");
+    }
+
+    const auto lastPushedTS = _donorOplogBuffer->getLastPushedTimestamp();
+    if (lastPushedTS == info.resumeToken) {
+        // We don't want to insert a resume token noop if it would be a duplicate.
+        return Status::OK();
+    }
+    invariant(lastPushedTS < info.resumeToken,
+              str::stream() << "LastPushed: " << lastPushedTS.toString()
+                            << ", resumeToken: " << info.resumeToken.toString());
+
+    MutableOplogEntry noopEntry;
+    noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+    noopEntry.setObject(BSON("msg" << TenantMigrationRecipientService::kNoopMsg << "tenantId"
+                                   << getTenantId() << "migrationId" << getMigrationUUID()));
+    noopEntry.setTimestamp(info.resumeToken);
+    // This term is not used for anything.
+    noopEntry.setTerm(OpTime::kUninitializedTerm);
+
+    // Use an empty namespace string so this op is ignored by the applier.
+    noopEntry.setNss({});
+    // Use an empty wall clock time since we have no wall clock time, but we must give it one, and
+    // we want it to be clearly fake.
+    noopEntry.setWallClockTime({});
+
+    OplogBuffer::Batch noopVec = {noopEntry.toBSON()};
+    _donorOplogBuffer->push(opCtx.get(), noopVec.cbegin(), noopVec.cend());
 
     return Status::OK();
 }
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index e0b460479be..94376082f38 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -60,6 +60,7 @@ class TenantMigrationRecipientService final : public PrimaryOnlyService {
 public:
     static constexpr StringData kTenantMigrationRecipientServiceName =
         "TenantMigrationRecipientService"_sd;
+    static constexpr StringData kNoopMsg = "Resume token noop"_sd;
 
     explicit TenantMigrationRecipientService(ServiceContext* serviceContext);
     ~TenantMigrationRecipientService() = default;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index fa223e55366..85a3dcdaccf 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/op_observer_impl.h"
 #include "mongo/db/op_observer_registry.h"
 #include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_buffer_collection.h"
 #include "mongo/db/repl/oplog_fetcher_mock.h"
 #include "mongo/db/repl/primary_only_service.h"
 #include "mongo/db/repl/primary_only_service_op_observer.h"
@@ -257,9 +258,14 @@ protected:
         return instance->_oplogFetcherClient.get();
     }
 
-    OplogFetcher* getDonorOplogFetcher(
+    OplogFetcherMock* getDonorOplogFetcher(
         const TenantMigrationRecipientService::Instance* instance) const {
-        return instance->_donorOplogFetcher.get();
+        return static_cast<OplogFetcherMock*>(instance->_donorOplogFetcher.get());
+    }
+
+    OplogBufferCollection* getDonorOplogBuffer(
+        const TenantMigrationRecipientService::Instance* instance) const {
+        return instance->_donorOplogBuffer.get();
     }
 
     const TenantMigrationRecipientDocument& getStateDoc(
@@ -270,6 +276,8 @@ protected:
 private:
     unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
         logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
+    unittest::MinimumLoggedSeverityGuard _tenantMigrationSeverityGuard{
+        logv2::LogComponent::kTenantMigration, logv2::LogSeverity::Debug(1)};
 };
 
@@ -1024,5 +1032,109 @@ TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
     ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
 }
 
+TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTokenNoopsToBuffer) {
+    FailPointEnableBlock fp("fpAfterCollectionClonerDone",
+                            BSON("action"
+                                 << "stop"));
+    const UUID migrationUUID = UUID::gen();
+    const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+    MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+    insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+    TenantMigrationRecipientDocument initialStateDocument(
+        migrationUUID,
+        replSet.getConnectionString(),
+        "tenantA",
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+    // Skip the cloners in this test, so we provide an empty list of databases.
+    MockRemoteDBServer* const _donorServer =
+        mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+    _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+    _donorServer->setCommandReply("find", makeFindResponse());
+
+    // Hang the recipient service after starting the oplog fetcher.
+    auto oplogFetcherFP =
+        globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
+    auto initialTimesEntered = oplogFetcherFP->setMode(FailPoint::alwaysOn,
+                                                       0,
+                                                       BSON("action"
+                                                            << "hang"));
+
+    auto opCtx = makeOperationContext();
+    std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+    {
+        FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+        // Create and start the instance.
+        instance = TenantMigrationRecipientService::Instance::getOrCreate(
+            opCtx.get(), _service, initialStateDocument.toBSON());
+        ASSERT(instance.get());
+        instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+    }
+
+    // Wait for the oplog fetcher to start.
+    oplogFetcherFP->waitForTimesEntered(initialTimesEntered + 1);
+
+    // Feed the oplog fetcher a resume token.
+    auto oplogFetcher = getDonorOplogFetcher(instance.get());
+    const auto resumeToken1 = topOfOplogOpTime.getTimestamp();
+    auto oplogEntry1 = makeOplogEntry(topOfOplogOpTime,
+                                      OpTypeEnum::kInsert,
+                                      NamespaceString("foo.bar") /* namespace */,
+                                      UUID::gen() /* uuid */,
+                                      BSON("doc" << 2) /* o */,
+                                      boost::none /* o2 */);
+    oplogFetcher->receiveBatch(17, {oplogEntry1.toBSON()}, resumeToken1);
+
+    const Timestamp oplogEntryTS2 = Timestamp(6, 2);
+    const Timestamp resumeToken2 = Timestamp(7, 3);
+    auto oplogEntry2 = makeOplogEntry(OpTime(oplogEntryTS2, topOfOplogOpTime.getTerm()),
+                                      OpTypeEnum::kInsert,
+                                      NamespaceString("foo.bar") /* namespace */,
+                                      UUID::gen() /* uuid */,
+                                      BSON("doc" << 3) /* o */,
+                                      boost::none /* o2 */);
+    oplogFetcher->receiveBatch(17, {oplogEntry2.toBSON()}, resumeToken2);
+
+    // Receive an empty batch.
+    oplogFetcher->receiveBatch(17, {}, resumeToken2);
+
+    auto oplogBuffer = getDonorOplogBuffer(instance.get());
+    ASSERT_EQUALS(oplogBuffer->getCount(), 3);
+
+    {
+        BSONObj insertDoc;
+        ASSERT_TRUE(oplogBuffer->tryPop(opCtx.get(), &insertDoc));
+        LOGV2(5124601, "Insert oplog entry", "entry"_attr = insertDoc);
+        ASSERT_BSONOBJ_EQ(insertDoc, oplogEntry1.toBSON());
+    }
+
+    {
+        BSONObj insertDoc;
+        ASSERT_TRUE(oplogBuffer->tryPop(opCtx.get(), &insertDoc));
+        LOGV2(5124602, "Insert oplog entry", "entry"_attr = insertDoc);
+        ASSERT_BSONOBJ_EQ(insertDoc, oplogEntry2.toBSON());
+    }
+
+    {
+        BSONObj noopDoc;
+        ASSERT_TRUE(oplogBuffer->tryPop(opCtx.get(), &noopDoc));
+        LOGV2(5124603, "Noop oplog entry", "entry"_attr = noopDoc);
+        OplogEntry noopEntry(noopDoc);
+        ASSERT_TRUE(noopEntry.getOpType() == OpTypeEnum::kNoop);
+        ASSERT_EQUALS(noopEntry.getTimestamp(), resumeToken2);
+        ASSERT_EQUALS(noopEntry.getTerm().get(), -1);
+        ASSERT_EQUALS(noopEntry.getNss(), NamespaceString(""));
+    }
+
+    ASSERT_TRUE(oplogBuffer->isEmpty());
+
+    // Let the recipient service complete.
+    oplogFetcherFP->setMode(FailPoint::off);
+
+    // Wait for task completion success.
+    ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
+
 }  // namespace repl
 }  // namespace mongo
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 72c5294867f..157961b3749 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -46,6 +46,7 @@
 #include "mongo/db/repl/insert_group.h"
 #include "mongo/db/repl/oplog_applier_utils.h"
 #include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
 #include "mongo/db/repl/tenant_oplog_batcher.h"
 #include "mongo/logv2/log.h"
 #include "mongo/util/concurrency/thread_pool.h"
@@ -224,8 +225,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
                 "Tenant Oplog Applier finished applying batch",
                 "tenant"_attr = _tenantId,
                 "migrationUuid"_attr = _migrationUuid,
-                "lastDonorOptime"_attr = lastBatchCompletedOpTimes.donorOpTime,
-                "lastRecipientOptime"_attr = lastBatchCompletedOpTimes.recipientOpTime);
+                "lastBatchCompletedOpTimes"_attr = lastBatchCompletedOpTimes);
 
     // Notify all the waiters on optimes before and including _lastBatchCompletedOpTimes.
     auto firstUnexpiredIter =
@@ -285,6 +285,21 @@ void TenantOplogApplier::_checkNsAndUuidsBelongToTenant(OperationContext* opCtx,
     }
 }
 
+namespace {
+bool isResumeTokenNoop(const OplogEntry& entry) {
+    if (entry.getOpType() != OpTypeEnum::kNoop) {
+        return false;
+    }
+    if (!entry.getObject().hasField("msg")) {
+        return false;
+    }
+    if (entry.getObject().getStringField("msg") != TenantMigrationRecipientService::kNoopMsg) {
+        return false;
+    }
+    return true;
+}
+}  // namespace
+
 TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
     OperationContext* opCtx, const TenantOplogBatch& batch) {
     auto* opObserver = cc().getServiceContext()->getOpObserver();
@@ -292,8 +307,18 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
     WriteUnitOfWork wuow(opCtx);
     // Reserve oplog slots for all entries.  This allows us to write them in parallel.
     auto oplogSlots = repl::getNextOpTimes(opCtx, batch.ops.size());
+    // Keep track of the greatest oplog slot actually used, ignoring resume token noops.  This is
+    // what we want to return from this function.
+    auto greatestOplogSlotUsed = OpTime();
     auto slotIter = oplogSlots.begin();
     for (const auto& op : batch.ops) {
+        if (isResumeTokenNoop(op.entry)) {
+            // We do not want to set the recipient optime for resume token noop oplog entries since
+            // we won't actually apply them.
+            slotIter++;
+            continue;
+        }
+        greatestOplogSlotUsed = *slotIter;
         _setRecipientOpTime(op.entry.getOpTime(), *slotIter++);
     }
     const size_t numOplogThreads = _writerPool->getStats().numThreads;
@@ -326,7 +351,7 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
     }
     invariant(opsIter == batch.ops.end());
     _writerPool->waitForIdle();
-    return {batch.ops.back().entry.getOpTime(), oplogSlots.back()};
+    return {batch.ops.back().entry.getOpTime(), greatestOplogSlotUsed};
 }
 
@@ -379,18 +404,24 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
         WriteUnitOfWork wuow(opCtx.get());
         auto slot = firstSlot;
         for (auto iter = begin; iter != end; iter++, slot++) {
+            const auto& entry = iter->entry;
+            if (isResumeTokenNoop(entry)) {
+                // We don't want to write noops for resume token noop oplog entries.  They would
+                // not be applied in a change stream anyways.
+                continue;
+            }
             opObserver->onInternalOpMessage(
                 opCtx.get(),
-                iter->entry.getNss(),
-                iter->entry.getUuid(),
-                iter->entry.toBSON(),
+                entry.getNss(),
+                entry.getUuid(),
+                entry.toBSON(),
                 BSONObj(),
                 // We link the no-ops together by recipient op time the same way the actual ops
                 // were linked together by donor op time.  This is to allow retryable writes
                 // and changestreams to find the ops they need.
-                _maybeGetRecipientOpTime(iter->entry.getPreImageOpTime()),
-                _maybeGetRecipientOpTime(iter->entry.getPostImageOpTime()),
-                _maybeGetRecipientOpTime(iter->entry.getPrevWriteOpTimeInTransaction()),
+                _maybeGetRecipientOpTime(entry.getPreImageOpTime()),
+                _maybeGetRecipientOpTime(entry.getPostImageOpTime()),
+                _maybeGetRecipientOpTime(entry.getPrevWriteOpTimeInTransaction()),
                 *slot);
         }
     });
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index 6cc1512f7d5..3512cac8a1d 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -62,6 +62,10 @@ public:
                 return recipientOpTime < other.recipientOpTime;
             return donorOpTime < other.donorOpTime;
         }
+        std::string toString() const {
+            return BSON("donorOpTime" << donorOpTime << "recipientOpTime" << recipientOpTime)
+                .toString();
+        }
         OpTime donorOpTime;
         OpTime recipientOpTime;
     };
@@ -82,11 +86,6 @@ public:
      */
     SemiFuture<OpTimePair> getNotificationForOpTime(OpTime donorOpTime);
 
-    /**
-     * Returns the last donor and recipient optimes of the last batch applied.
-     */
-    OpTimePair getLastBatchCompletedOpTimes();
-
     void setBatchLimits_forTest(TenantOplogBatcher::BatchLimits limits) {
         _limits = limits;
     }
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index c9c0843fc44..d995e3b86d6 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -42,6 +42,7 @@
 #include "mongo/db/repl/replication_coordinator_mock.h"
 #include "mongo/db/repl/storage_interface_impl.h"
 #include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
 #include "mongo/db/repl/tenant_oplog_applier.h"
 #include "mongo/db/repl/tenant_oplog_batcher.h"
 #include "mongo/db/service_context_d_test_fixture.h"
@@ -184,6 +185,8 @@ protected:
 private:
     unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
         logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
+    unittest::MinimumLoggedSeverityGuard _tenantMigrationSeverityGuard{
+        logv2::LogComponent::kTenantMigration, logv2::LogSeverity::Debug(1)};
 };
 
 TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) {
@@ -726,5 +729,127 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) {
     applier.join();
 }
 
+TEST_F(TenantOplogApplierTest, ApplyNoop_Success) {
+    std::vector<OplogEntry> srcOps;
+    srcOps.push_back(makeNoopOplogEntry(1, "foo"));
+    pushOps(srcOps);
+    auto writerPool = makeTenantMigrationWriterPool();
+
+    TenantOplogApplier applier(
+        _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+    ASSERT_OK(applier.startup());
+    auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+    auto futureRes = opAppliedFuture.getNoThrow();
+
+    auto entries = _opObserver->getEntries();
+    ASSERT_EQ(1, entries.size());
+
+    ASSERT_OK(futureRes.getStatus());
+    ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[0].getOpTime());
+    ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
+
+    applier.shutdown();
+    applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) {
+    std::vector<OplogEntry> srcOps;
+    srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg));
+    pushOps(srcOps);
+    auto writerPool = makeTenantMigrationWriterPool();
+
+    TenantOplogApplier applier(
+        _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+    ASSERT_OK(applier.startup());
+    auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+    auto futureRes = opAppliedFuture.getNoThrow();
+
+    auto entries = _opObserver->getEntries();
+    ASSERT_EQ(0, entries.size());
+
+    ASSERT_OK(futureRes.getStatus());
+    ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[0].getOpTime());
+    ASSERT_EQUALS(futureRes.getValue().recipientOpTime, OpTime());
+
+    applier.shutdown();
+    applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsert_Success) {
+    std::vector<OplogEntry> srcOps;
+    srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg));
+    srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "foo"), UUID::gen()));
+    pushOps(srcOps);
+    auto writerPool = makeTenantMigrationWriterPool();
+
+    TenantOplogApplier applier(
+        _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+    ASSERT_OK(applier.startup());
+    auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+    auto futureRes = opAppliedFuture.getNoThrow();
+
+    auto entries = _opObserver->getEntries();
+    ASSERT_EQ(1, entries.size());
+    assertNoOpMatches(srcOps[1], entries[0]);
+
+    ASSERT_OK(futureRes.getStatus());
+    ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
+    ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
+
+    applier.shutdown();
+    applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Success) {
+    std::vector<OplogEntry> srcOps;
+    srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"), UUID::gen()));
+    srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg));
+    pushOps(srcOps);
+    ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime());
+    auto writerPool = makeTenantMigrationWriterPool();
+
+    TenantOplogApplier applier(
+        _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+    ASSERT_OK(applier.startup());
+    auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+    auto futureRes = opAppliedFuture.getNoThrow();
+
+    auto entries = _opObserver->getEntries();
+    ASSERT_EQ(1, entries.size());
+    assertNoOpMatches(srcOps[0], entries[0]);
+
+    ASSERT_OK(futureRes.getStatus());
+    ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
+    ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
+
+    applier.shutdown();
+    applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
+    std::vector<OplogEntry> srcOps;
+    srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"), UUID::gen()));
+    srcOps.push_back(makeNoopOplogEntry(2, TenantMigrationRecipientService::kNoopMsg));
+    pushOps(srcOps);
+    auto writerPool = makeTenantMigrationWriterPool();
+
+    TenantOplogApplier applier(
+        _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+    ASSERT_OK(applier.startup());
+    auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+    auto futureRes = opAppliedFuture.getNoThrow();
+
+    auto entries = _opObserver->getEntries();
+    ASSERT_EQ(1, entries.size());
+    assertNoOpMatches(srcOps[0], entries[0]);
+
+    ASSERT_OK(futureRes.getStatus());
+    ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
+    ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
+
+    applier.shutdown();
+    applier.join();
+}
+
 }  // namespace repl
 }  // namespace mongo