summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2020-10-08 15:52:39 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-29 03:03:37 +0000
commit0ccc9275efc3e4c36850bd4cc297c90152b7a7e6 (patch)
treefb161c1b3289dd062c6b07434b1daab90af87bb8 /src/mongo/db
parentb4cd12f6dddc0d84ca1396176b39260c1777fba8 (diff)
downloadmongo-0ccc9275efc3e4c36850bd4cc297c90152b7a7e6.tar.gz
SERVER-51246 Write a noop into the oplog buffer after each batch to ensure tenant applier reaches stop timestamp
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/oplog_batcher_test_fixture.cpp22
-rw-r--r--src/mongo/db/repl/oplog_batcher_test_fixture.h3
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp6
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h3
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp16
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp16
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h9
-rw-r--r--src/mongo/db/repl/oplog_fetcher_mock.cpp45
-rw-r--r--src/mongo/db/repl/oplog_fetcher_mock.h5
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp78
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp42
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h1
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service_test.cpp116
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp49
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.h9
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier_test.cpp125
16 files changed, 483 insertions, 62 deletions
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
index 8fef07cf73b..b85fccb8c39 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
@@ -190,6 +190,28 @@ OplogEntry makeInsertOplogEntry(int t, const NamespaceString& nss, boost::option
boost::none); // _id
}
+OplogEntry makeNoopOplogEntry(int t, const StringData& msg) {
+ BSONObj oField = BSON("msg" << msg << "count" << t);
+ return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime
+ boost::none, // hash
+ OpTypeEnum::kNoop, // op type
+ NamespaceString(""), // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ OplogEntry::kOplogVersion, // version
+ oField, // o
+ boost::none, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ Date_t() + Seconds(t), // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none, // post-image optime
+ boost::none, // ShardId of resharding recipient
+ boost::none); // _id
+}
+
/**
* Generates an applyOps oplog entry with the given number used for the timestamp.
*/
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.h b/src/mongo/db/repl/oplog_batcher_test_fixture.h
index 735e0c56195..3ee02439ef7 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.h
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.h
@@ -82,6 +82,9 @@ private:
OplogEntry makeInsertOplogEntry(int t,
const NamespaceString& nss,
boost::optional<UUID> uuid = boost::none);
+
+OplogEntry makeNoopOplogEntry(int t, const StringData& msg);
+
OplogEntry makeApplyOpsOplogEntry(int t,
bool prepare,
const std::vector<OplogEntry>& innerOps = {});
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index 86f25deec44..a2916eee6e5 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -161,7 +161,9 @@ void OplogBufferCollection::push(OperationContext* opCtx,
auto previousTimestamp = ts;
std::tie(doc, ts) = addIdToDocument(value);
invariant(!value.isEmpty());
- invariant(ts > previousTimestamp);
+ invariant(ts > previousTimestamp,
+ str::stream() << "ts: " << ts.toString()
+ << ", previous: " << previousTimestamp.toString());
return InsertStatement(doc);
});
@@ -403,7 +405,7 @@ void OplogBufferCollection::_dropCollection(OperationContext* opCtx) {
fassert(40155, _storageInterface->dropCollection(opCtx, _nss));
}
-Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const {
+Timestamp OplogBufferCollection::getLastPushedTimestamp() const {
stdx::lock_guard<Latch> lk(_mutex);
return _lastPushedTimestamp;
}
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 2f50bfd288c..6c5597efe1c 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -123,9 +123,10 @@ public:
const Timestamp& ts,
SeekStrategy exact = SeekStrategy::kExact) final;
+ // Only currently used by the TenantMigrationRecipientService, so not part of a parent API.
+ Timestamp getLastPushedTimestamp() const;
// ---- Testing API ----
- Timestamp getLastPushedTimestamp_forTest() const;
Timestamp getLastPoppedTimestamp_forTest() const;
std::queue<BSONObj> getPeekCache_forTest() const;
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index c609ba52849..6a5184ca117 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -246,7 +246,7 @@ TEST_F(OplogBufferCollectionTest, StartupWithExistingCollectionInitializesCorrec
oplogBuffer.startup(_opCtx.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_NOT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(1, 1), oplogBuffer.getLastPushedTimestamp());
ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
@@ -276,7 +276,7 @@ TEST_F(OplogBufferCollectionTest, StartupWithEmptyExistingCollectionInitializesC
oplogBuffer.startup(_opCtx.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp());
ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {});
@@ -390,7 +390,7 @@ TEST_F(OplogBufferCollectionTest,
oplogBuffer.startup(_opCtx.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPushedTimestamp());
ASSERT_EQUALS(Timestamp(0, 0), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {});
}
@@ -664,14 +664,14 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
oplogBuffer.startup(_opCtx.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
const std::vector<BSONObj> oplog = {makeOplogEntry(1)};
oplogBuffer.push(_opCtx.get(), oplog.cbegin(), oplog.cend());
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize()));
- ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog});
@@ -680,7 +680,7 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
oplogBuffer.push(_opCtx.get(), oplog2.cbegin(), oplog2.cend());
ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog[0].objsize() + oplog2[0].objsize()));
- ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], oplog2[0]});
@@ -690,7 +690,7 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
ASSERT_BSONOBJ_EQ(oplog[0], poppedDoc);
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog2[0].objsize()));
- ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(oplog2[0]["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp());
ASSERT_EQUALS(oplog[0]["ts"].timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {oplog[0], oplog2[0]});
@@ -699,7 +699,7 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, {});
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index e9763ae329f..a59da61b1ec 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/stats/timer_stats.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
+#include "mongo/s/resharding/resume_token_gen.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/time_support.h"
@@ -188,6 +189,7 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
StartingPoint startingPoint,
BSONObj filter,
ReadConcernArgs readConcern,
+ bool requestResumeToken,
StringData name)
: AbstractAsyncComponent(executor, name.toString()),
_source(source),
@@ -204,7 +206,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
_batchSize(batchSize),
_startingPoint(startingPoint),
_queryFilter(filter),
- _queryReadConcern(readConcern) {
+ _queryReadConcern(readConcern),
+ _requestResumeToken(requestResumeToken) {
invariant(config.isInitialized());
invariant(!_lastFetched.isNull());
invariant(onShutdownCallbackFn);
@@ -504,6 +507,10 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
filterBob.done();
queryBob.append("$maxTimeMS", findTimeout);
+ if (_requestResumeToken) {
+ queryBob.append("$hint", BSON("$natural" << 1));
+ queryBob.append("$_requestResumeToken", true);
+ }
auto lastCommittedWithCurrentTerm =
_dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
@@ -767,6 +774,13 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
oplogBatchStats.recordMillis(_lastBatchElapsedMS, documents.empty());
+ if (_cursor->getPostBatchResumeToken()) {
+ auto pbrt = ResumeTokenOplogTimestamp::parse(
+ IDLParserErrorContext("OplogFetcher PostBatchResumeToken"),
+ *_cursor->getPostBatchResumeToken());
+ info.resumeToken = pbrt.getTs();
+ }
+
auto status = _enqueueDocumentsFn(firstDocToApply, documents.cend(), info);
if (!status.isOK()) {
return status;
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 459b2c42fe2..e7fbe964f5d 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -115,6 +115,7 @@ public:
size_t toApplyDocumentCount = 0;
size_t toApplyDocumentBytes = 0;
OpTime lastDocument = OpTime();
+ Timestamp resumeToken = Timestamp();
};
/**
@@ -180,6 +181,7 @@ public:
StartingPoint startingPoint = StartingPoint::kSkipFirstDoc,
BSONObj filter = BSONObj(),
ReadConcernArgs readConcern = ReadConcernArgs(),
+ bool requestResumeToken = false,
StringData name = "oplog fetcher"_sd);
virtual ~OplogFetcher();
@@ -446,6 +448,10 @@ private:
// of "afterClusterTime: Timestamp(0,1)".
ReadConcernArgs _queryReadConcern;
+ // Specifies if the oplog fetcher should request a resume token and provide it to
+ // _enqueueDocumentsFn.
+ const bool _requestResumeToken;
+
// Handle to currently scheduled _runQuery task.
executor::TaskExecutor::CallbackHandle _runQueryHandle;
@@ -470,6 +476,7 @@ public:
OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
BSONObj filter = BSONObj(),
ReadConcernArgs readConcern = ReadConcernArgs(),
+ bool requestResumeToken = false,
StringData name = "oplog fetcher"_sd) const = 0;
};
@@ -491,6 +498,7 @@ public:
OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
BSONObj filter = BSONObj(),
ReadConcernArgs readConcern = ReadConcernArgs(),
+ bool requestResumeToken = false,
StringData name = "oplog_fetcher"_sd) const final {
return std::make_unique<T>(executor,
lastFetched,
@@ -506,6 +514,7 @@ public:
startingPoint,
std::move(filter),
std::move(readConcern),
+ requestResumeToken,
name);
}
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.cpp b/src/mongo/db/repl/oplog_fetcher_mock.cpp
index c3478ca914c..254eea0707e 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_mock.cpp
@@ -51,6 +51,7 @@ OplogFetcherMock::OplogFetcherMock(
StartingPoint startingPoint,
BSONObj filter,
ReadConcernArgs readConcern,
+ bool requestResumeToken,
StringData name)
: OplogFetcher(executor,
lastFetched,
@@ -69,6 +70,7 @@ OplogFetcherMock::OplogFetcherMock(
startingPoint,
filter,
readConcern,
+ requestResumeToken,
name),
_oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
_onShutdownCallbackFn(std::move(onShutdownCallbackFn)),
@@ -84,7 +86,9 @@ OplogFetcherMock::~OplogFetcherMock() {
}
}
-void OplogFetcherMock::receiveBatch(CursorId cursorId, OplogFetcher::Documents documents) {
+void OplogFetcherMock::receiveBatch(CursorId cursorId,
+ OplogFetcher::Documents documents,
+ boost::optional<Timestamp> resumeToken) {
{
stdx::lock_guard<Latch> lock(_mutex);
if (!_isActive_inlock()) {
@@ -105,25 +109,28 @@ void OplogFetcherMock::receiveBatch(CursorId cursorId, OplogFetcher::Documents d
return;
}
- if (!documents.empty()) {
- auto info = validateResult.getValue();
-
- // Enqueue documents in a separate thread with a different client than the test thread. This
- // is to avoid interfering the thread local client in the test thread.
- Status status = Status::OK();
- stdx::thread enqueueDocumentThread([&]() {
- Client::initThread("enqueueDocumentThread");
- status = _enqueueDocumentsFn(documents.cbegin(), documents.cend(), info);
- });
- // Wait until the enqueue finishes.
- enqueueDocumentThread.join();
-
- // Shutdown the OplogFetcher with error if enqueue fails.
- if (!status.isOK()) {
- shutdownWith(status);
- return;
- }
+ auto info = validateResult.getValue();
+ if (resumeToken) {
+ info.resumeToken = *resumeToken;
+ }
+
+ // Enqueue documents in a separate thread with a different client than the test thread. This
+ // is to avoid interfering the thread local client in the test thread.
+ Status status = Status::OK();
+ stdx::thread enqueueDocumentThread([&]() {
+ Client::initThread("enqueueDocumentThread");
+ status = _enqueueDocumentsFn(documents.cbegin(), documents.cend(), info);
+ });
+ // Wait until the enqueue finishes.
+ enqueueDocumentThread.join();
+ // Shutdown the OplogFetcher with error if enqueue fails.
+ if (!status.isOK()) {
+ shutdownWith(status);
+ return;
+ }
+
+ if (!documents.empty()) {
// Update lastFetched to the last oplog entry enqueued.
auto lastDocRes = OpTime::parseFromOplogEntry(documents.back());
if (!lastDocRes.isOK()) {
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.h b/src/mongo/db/repl/oplog_fetcher_mock.h
index 3e64acdc53e..65167b0a9eb 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.h
+++ b/src/mongo/db/repl/oplog_fetcher_mock.h
@@ -51,6 +51,7 @@ public:
StartingPoint startingPoint = StartingPoint::kSkipFirstDoc,
BSONObj filter = BSONObj(),
ReadConcernArgs readConcern = ReadConcernArgs(),
+ bool requestResumeToken = false,
StringData name = "oplog fetcher"_sd);
virtual ~OplogFetcherMock();
@@ -60,7 +61,9 @@ public:
* the enqueueDocumentsFn.
* This is not thread-safe.
*/
- void receiveBatch(CursorId cursorId, OplogFetcher::Documents documents);
+ void receiveBatch(CursorId cursorId,
+ OplogFetcher::Documents documents,
+ boost::optional<Timestamp> resumeToken = boost::none);
/**
* Simulate an response error received by the OplogFetcher.
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 0bb0d41379b..7ec8d06f2d6 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -162,7 +162,8 @@ void validateFindCommand(Message m,
ReadConcernArgs readConcern = ReadConcernArgs::fromBSONThrows(
BSON("level"
<< "local"
- << "afterClusterTime" << Timestamp(0, 1)))) {
+ << "afterClusterTime" << Timestamp(0, 1))),
+ bool requestResumeToken = false) {
auto msg = mongo::OpMsg::parse(m);
ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find");
ASSERT_TRUE(msg.body.getBoolField("tailable"));
@@ -183,6 +184,8 @@ void validateFindCommand(Message m,
// The find command should not specify the deprecated 'oplogReplay' flag.
ASSERT_FALSE(msg.body["oplogReplay"]);
+ ASSERT_EQUALS(msg.body.hasField("$_requestResumeToken"), requestResumeToken);
+
validateMetadataRequest(msg);
}
@@ -319,7 +322,8 @@ protected:
OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
int requiredRBID = ReplicationProcess::kUninitializedRollbackId,
BSONObj filter = BSONObj(),
- ReadConcernArgs readConcern = ReadConcernArgs());
+ ReadConcernArgs readConcern = ReadConcernArgs(),
+ bool requestResumeToken = false);
std::unique_ptr<OplogFetcher> getOplogFetcherAfterConnectionCreated(
OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts = 0,
@@ -327,7 +331,8 @@ protected:
OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
int requiredRBID = ReplicationProcess::kUninitializedRollbackId,
BSONObj filter = BSONObj(),
- ReadConcernArgs args = ReadConcernArgs());
+ ReadConcernArgs args = ReadConcernArgs(),
+ bool requestResumeToken = false);
std::unique_ptr<ShutdownState> processSingleBatch(
const Message& response,
@@ -408,7 +413,8 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCr
OplogFetcher::StartingPoint startingPoint,
int requiredRBID,
BSONObj filter,
- ReadConcernArgs readConcern) {
+ ReadConcernArgs readConcern,
+ bool requestResumeToken) {
auto oplogFetcher = makeOplogFetcherWithDifferentExecutor(&getExecutor(),
fn,
numRestarts,
@@ -416,7 +422,8 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCr
startingPoint,
requiredRBID,
filter,
- readConcern);
+ readConcern,
+ requestResumeToken);
auto waitForConnCreatedFailPoint =
globalFailPointRegistry().find("hangAfterOplogFetcherCallbackScheduled");
@@ -441,7 +448,8 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
OplogFetcher::StartingPoint startingPoint,
int requiredRBID,
BSONObj filter,
- ReadConcernArgs readConcern) {
+ ReadConcernArgs readConcern,
+ bool requestResumeToken) {
auto oplogFetcher = std::make_unique<OplogFetcher>(
executor,
lastFetched,
@@ -456,7 +464,8 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
defaultBatchSize,
startingPoint,
filter,
- readConcern);
+ readConcern,
+ requestResumeToken);
oplogFetcher->setCreateClientFn_forTest([this]() {
const auto autoReconnect = true;
return std::unique_ptr<DBClientConnection>(
@@ -2388,4 +2397,59 @@ TEST_F(OplogFetcherTest, CheckFindCommandIncludesCustomReadConcern) {
oplogFetcher->join();
}
+TEST_F(OplogFetcherTest, CheckFindCommandIncludesRequestResumeTokenWhenRequested) {
+ ShutdownState shutdownState;
+
+ auto oplogFetcher =
+ getOplogFetcherAfterConnectionCreated(std::ref(shutdownState),
+ 0 /* numRestarts */,
+ true /* requireFresherSyncSourc */,
+ OplogFetcher::StartingPoint::kSkipFirstDoc,
+ ReplicationProcess::kUninitializedRollbackId,
+ BSONObj() /* filter */,
+ ReadConcernArgs() /* readConcern */,
+ true /* requestResumeToken */);
+
+ CursorId cursorId = 22LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
+ auto resumeToken = Timestamp(Seconds(567), 0);
+ auto resumeTokenObj = BSON("ts" << resumeToken);
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
+ auto firstBatch = {firstEntry, secondEntry};
+
+ // Update lastFetched before it is updated by getting the next batch.
+ lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
+
+ auto cursorRes = CursorResponse(NamespaceString::kRsOplogNamespace,
+ cursorId,
+ firstBatch,
+ boost::none,
+ boost::none,
+ resumeTokenObj);
+
+ BSONObjBuilder bob(cursorRes.toBSON(CursorResponse::ResponseType::InitialResponse));
+ bob.appendElementsUnique(metadataObj);
+ auto batchResponse = OpMsg{bob.obj()}.serialize();
+
+ // Creating the cursor will succeed.
+ auto m = processSingleRequestResponse(
+ oplogFetcher->getDBClientConnection_forTest(), batchResponse, true);
+
+ validateFindCommand(
+ m,
+ lastFetched,
+ durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest()),
+ BSONObj() /* filter */,
+ ReadConcernArgs::fromBSONThrows(BSON("level"
+ << "local"
+ << "afterClusterTime" << Timestamp(0, 1))),
+ true /* requestResumeToken */);
+
+ ASSERT_EQUALS(lastEnqueuedDocumentsInfo.resumeToken, resumeToken);
+
+ oplogFetcher->shutdown();
+ oplogFetcher->join();
+}
+
} // namespace
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index a10ab36b14c..6b2ad39343b 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -490,6 +490,7 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
OplogFetcher::StartingPoint::kEnqueueFirstDoc,
_getOplogFetcherFilter(),
ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern),
+ true /* requestResumeToken */,
"TenantOplogFetcher_" + getTenantId() + "_" + getMigrationUUID().toString());
_donorOplogFetcher->setConnection(std::move(_oplogFetcherClient));
uassertStatusOK(_donorOplogFetcher->startup());
@@ -502,16 +503,43 @@ Status TenantMigrationRecipientService::Instance::_enqueueDocuments(
invariant(_donorOplogBuffer);
- if (info.toApplyDocumentCount == 0)
- return Status::OK();
-
auto opCtx = cc().makeOperationContext();
- // Wait for enough space.
- _donorOplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes);
+ if (info.toApplyDocumentCount != 0) {
+ // Wait for enough space.
+ _donorOplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes);
- // Buffer docs for later application.
- _donorOplogBuffer->push(opCtx.get(), begin, end);
+ // Buffer docs for later application.
+ _donorOplogBuffer->push(opCtx.get(), begin, end);
+ }
+ if (info.resumeToken.isNull()) {
+ return Status(ErrorCodes::Error(5124600), "Resume token returned is null");
+ }
+ const auto lastPushedTS = _donorOplogBuffer->getLastPushedTimestamp();
+ if (lastPushedTS == info.resumeToken) {
+ // We don't want to insert a resume token noop if it would be a duplicate.
+ return Status::OK();
+ }
+ invariant(lastPushedTS < info.resumeToken,
+ str::stream() << "LastPushed: " << lastPushedTS.toString()
+ << ", resumeToken: " << info.resumeToken.toString());
+
+ MutableOplogEntry noopEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setObject(BSON("msg" << TenantMigrationRecipientService::kNoopMsg << "tenantId"
+ << getTenantId() << "migrationId" << getMigrationUUID()));
+ noopEntry.setTimestamp(info.resumeToken);
+ // This term is not used for anything.
+ noopEntry.setTerm(OpTime::kUninitializedTerm);
+
+ // Use an empty namespace string so this op is ignored by the applier.
+ noopEntry.setNss({});
+ // Use an empty wall clock time since we have no wall clock time, but we must give it one, and
+ // we want it to be clearly fake.
+ noopEntry.setWallClockTime({});
+
+ OplogBuffer::Batch noopVec = {noopEntry.toBSON()};
+ _donorOplogBuffer->push(opCtx.get(), noopVec.cbegin(), noopVec.cend());
return Status::OK();
}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index e0b460479be..94376082f38 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -60,6 +60,7 @@ class TenantMigrationRecipientService final : public PrimaryOnlyService {
public:
static constexpr StringData kTenantMigrationRecipientServiceName =
"TenantMigrationRecipientService"_sd;
+ static constexpr StringData kNoopMsg = "Resume token noop"_sd;
explicit TenantMigrationRecipientService(ServiceContext* serviceContext);
~TenantMigrationRecipientService() = default;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index fa223e55366..85a3dcdaccf 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_buffer_collection.h"
#include "mongo/db/repl/oplog_fetcher_mock.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/primary_only_service_op_observer.h"
@@ -257,9 +258,14 @@ protected:
return instance->_oplogFetcherClient.get();
}
- OplogFetcher* getDonorOplogFetcher(
+ OplogFetcherMock* getDonorOplogFetcher(
const TenantMigrationRecipientService::Instance* instance) const {
- return instance->_donorOplogFetcher.get();
+ return static_cast<OplogFetcherMock*>(instance->_donorOplogFetcher.get());
+ }
+
+ OplogBufferCollection* getDonorOplogBuffer(
+ const TenantMigrationRecipientService::Instance* instance) const {
+ return instance->_donorOplogBuffer.get();
}
const TenantMigrationRecipientDocument& getStateDoc(
@@ -270,6 +276,8 @@ protected:
private:
unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
+ unittest::MinimumLoggedSeverityGuard _tenantMigrationSeverityGuard{
+ logv2::LogComponent::kTenantMigration, logv2::LogSeverity::Debug(1)};
};
@@ -1024,5 +1032,109 @@ TEST_F(TenantMigrationRecipientServiceTest, StoppingApplierAllowsCompletion) {
ASSERT_NOT_OK(instance->getCompletionFuture().getNoThrow());
}
+TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTokenNoopsToBuffer) {
+ FailPointEnableBlock fp("fpAfterCollectionClonerDone",
+ BSON("action"
+ << "stop"));
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /*dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
+
+ // Hang the recipient service after starting the oplog fetcher.
+ auto oplogFetcherFP =
+ globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
+ auto initialTimesEntered = oplogFetcherFP->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ // Wait for the oplog fetcher to start.
+ oplogFetcherFP->waitForTimesEntered(initialTimesEntered + 1);
+
+ // Feed the oplog fetcher a resume token.
+ auto oplogFetcher = getDonorOplogFetcher(instance.get());
+ const auto resumeToken1 = topOfOplogOpTime.getTimestamp();
+ auto oplogEntry1 = makeOplogEntry(topOfOplogOpTime,
+ OpTypeEnum::kInsert,
+ NamespaceString("foo.bar") /* namespace */,
+ UUID::gen() /* uuid */,
+ BSON("doc" << 2) /* o */,
+ boost::none /* o2 */);
+ oplogFetcher->receiveBatch(17, {oplogEntry1.toBSON()}, resumeToken1);
+
+ const Timestamp oplogEntryTS2 = Timestamp(6, 2);
+ const Timestamp resumeToken2 = Timestamp(7, 3);
+ auto oplogEntry2 = makeOplogEntry(OpTime(oplogEntryTS2, topOfOplogOpTime.getTerm()),
+ OpTypeEnum::kInsert,
+ NamespaceString("foo.bar") /* namespace */,
+ UUID::gen() /* uuid */,
+ BSON("doc" << 3) /* o */,
+ boost::none /* o2 */);
+ oplogFetcher->receiveBatch(17, {oplogEntry2.toBSON()}, resumeToken2);
+
+ // Receive an empty batch.
+ oplogFetcher->receiveBatch(17, {}, resumeToken2);
+
+ auto oplogBuffer = getDonorOplogBuffer(instance.get());
+ ASSERT_EQUALS(oplogBuffer->getCount(), 3);
+
+ {
+ BSONObj insertDoc;
+ ASSERT_TRUE(oplogBuffer->tryPop(opCtx.get(), &insertDoc));
+ LOGV2(5124601, "Insert oplog entry", "entry"_attr = insertDoc);
+ ASSERT_BSONOBJ_EQ(insertDoc, oplogEntry1.toBSON());
+ }
+
+ {
+ BSONObj insertDoc;
+ ASSERT_TRUE(oplogBuffer->tryPop(opCtx.get(), &insertDoc));
+ LOGV2(5124602, "Insert oplog entry", "entry"_attr = insertDoc);
+ ASSERT_BSONOBJ_EQ(insertDoc, oplogEntry2.toBSON());
+ }
+
+ {
+ BSONObj noopDoc;
+ ASSERT_TRUE(oplogBuffer->tryPop(opCtx.get(), &noopDoc));
+ LOGV2(5124603, "Noop oplog entry", "entry"_attr = noopDoc);
+ OplogEntry noopEntry(noopDoc);
+ ASSERT_TRUE(noopEntry.getOpType() == OpTypeEnum::kNoop);
+ ASSERT_EQUALS(noopEntry.getTimestamp(), resumeToken2);
+ ASSERT_EQUALS(noopEntry.getTerm().get(), -1);
+ ASSERT_EQUALS(noopEntry.getNss(), NamespaceString(""));
+ }
+
+ ASSERT_TRUE(oplogBuffer->isEmpty());
+
+ // Let the recipient service complete.
+ oplogFetcherFP->setMode(FailPoint::off);
+
+ // Wait for task completion success.
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 72c5294867f..157961b3749 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/repl/insert_group.h"
#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_oplog_batcher.h"
#include "mongo/logv2/log.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -224,8 +225,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
"Tenant Oplog Applier finished applying batch",
"tenant"_attr = _tenantId,
"migrationUuid"_attr = _migrationUuid,
- "lastDonorOptime"_attr = lastBatchCompletedOpTimes.donorOpTime,
- "lastRecipientOptime"_attr = lastBatchCompletedOpTimes.recipientOpTime);
+ "lastBatchCompletedOpTimes"_attr = lastBatchCompletedOpTimes);
// Notify all the waiters on optimes before and including _lastBatchCompletedOpTimes.
auto firstUnexpiredIter =
@@ -285,6 +285,21 @@ void TenantOplogApplier::_checkNsAndUuidsBelongToTenant(OperationContext* opCtx,
}
}
+namespace {
+bool isResumeTokenNoop(const OplogEntry& entry) {
+ if (entry.getOpType() != OpTypeEnum::kNoop) {
+ return false;
+ }
+ if (!entry.getObject().hasField("msg")) {
+ return false;
+ }
+ if (entry.getObject().getStringField("msg") != TenantMigrationRecipientService::kNoopMsg) {
+ return false;
+ }
+ return true;
+}
+} // namespace
+
TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
OperationContext* opCtx, const TenantOplogBatch& batch) {
auto* opObserver = cc().getServiceContext()->getOpObserver();
@@ -292,8 +307,18 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
WriteUnitOfWork wuow(opCtx);
// Reserve oplog slots for all entries. This allows us to write them in parallel.
auto oplogSlots = repl::getNextOpTimes(opCtx, batch.ops.size());
+ // Keep track of the greatest oplog slot actually used, ignoring resume token noops. This is
+ // what we want to return from this function.
+ auto greatestOplogSlotUsed = OpTime();
auto slotIter = oplogSlots.begin();
for (const auto& op : batch.ops) {
+ if (isResumeTokenNoop(op.entry)) {
+ // We do not want to set the recipient optime for resume token noop oplog entries since
+ // we won't actually apply them.
+ slotIter++;
+ continue;
+ }
+ greatestOplogSlotUsed = *slotIter;
_setRecipientOpTime(op.entry.getOpTime(), *slotIter++);
}
const size_t numOplogThreads = _writerPool->getStats().numThreads;
@@ -326,7 +351,7 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
}
invariant(opsIter == batch.ops.end());
_writerPool->waitForIdle();
- return {batch.ops.back().entry.getOpTime(), oplogSlots.back()};
+ return {batch.ops.back().entry.getOpTime(), greatestOplogSlotUsed};
}
@@ -379,18 +404,24 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
WriteUnitOfWork wuow(opCtx.get());
auto slot = firstSlot;
for (auto iter = begin; iter != end; iter++, slot++) {
+ const auto& entry = iter->entry;
+ if (isResumeTokenNoop(entry)) {
+ // We don't want to write noops for resume token noop oplog entries. They would
+ // not be applied in a change stream anyways.
+ continue;
+ }
opObserver->onInternalOpMessage(
opCtx.get(),
- iter->entry.getNss(),
- iter->entry.getUuid(),
- iter->entry.toBSON(),
+ entry.getNss(),
+ entry.getUuid(),
+ entry.toBSON(),
BSONObj(),
// We link the no-ops together by recipient op time the same way the actual ops
// were linked together by donor op time. This is to allow retryable writes
// and changestreams to find the ops they need.
- _maybeGetRecipientOpTime(iter->entry.getPreImageOpTime()),
- _maybeGetRecipientOpTime(iter->entry.getPostImageOpTime()),
- _maybeGetRecipientOpTime(iter->entry.getPrevWriteOpTimeInTransaction()),
+ _maybeGetRecipientOpTime(entry.getPreImageOpTime()),
+ _maybeGetRecipientOpTime(entry.getPostImageOpTime()),
+ _maybeGetRecipientOpTime(entry.getPrevWriteOpTimeInTransaction()),
*slot);
}
});
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index 6cc1512f7d5..3512cac8a1d 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -62,6 +62,10 @@ public:
return recipientOpTime < other.recipientOpTime;
return donorOpTime < other.donorOpTime;
}
+ std::string toString() const {
+ return BSON("donorOpTime" << donorOpTime << "recipientOpTime" << recipientOpTime)
+ .toString();
+ }
OpTime donorOpTime;
OpTime recipientOpTime;
};
@@ -82,11 +86,6 @@ public:
*/
SemiFuture<OpTimePair> getNotificationForOpTime(OpTime donorOpTime);
- /**
- * Returns the last donor and recipient optimes of the last batch applied.
- */
- OpTimePair getLastBatchCompletedOpTimes();
-
void setBatchLimits_forTest(TenantOplogBatcher::BatchLimits limits) {
_limits = limits;
}
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index c9c0843fc44..d995e3b86d6 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_oplog_applier.h"
#include "mongo/db/repl/tenant_oplog_batcher.h"
#include "mongo/db/service_context_d_test_fixture.h"
@@ -184,6 +185,8 @@ protected:
private:
unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{
logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(1)};
+ unittest::MinimumLoggedSeverityGuard _tenantMigrationSeverityGuard{
+ logv2::LogComponent::kTenantMigration, logv2::LogSeverity::Debug(1)};
};
TEST_F(TenantOplogApplierTest, NoOpsForSingleBatch) {
@@ -726,5 +729,127 @@ TEST_F(TenantOplogApplierTest, ApplyCRUD_WrongUUID) {
applier.join();
}
+TEST_F(TenantOplogApplierTest, ApplyNoop_Success) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeNoopOplogEntry(1, "foo"));
+ pushOps(srcOps);
+ auto writerPool = makeTenantMigrationWriterPool();
+
+ TenantOplogApplier applier(
+ _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+ ASSERT_OK(applier.startup());
+ auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+ auto futureRes = opAppliedFuture.getNoThrow();
+
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(1, entries.size());
+
+ ASSERT_OK(futureRes.getStatus());
+ ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[0].getOpTime());
+ ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
+
+ applier.shutdown();
+ applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoop_Success) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg));
+ pushOps(srcOps);
+ auto writerPool = makeTenantMigrationWriterPool();
+
+ TenantOplogApplier applier(
+ _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+ ASSERT_OK(applier.startup());
+ auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[0].getOpTime());
+ auto futureRes = opAppliedFuture.getNoThrow();
+
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(0, entries.size());
+
+ ASSERT_OK(futureRes.getStatus());
+ ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[0].getOpTime());
+ ASSERT_EQUALS(futureRes.getValue().recipientOpTime, OpTime());
+
+ applier.shutdown();
+ applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, ApplyResumeTokenNoopThenInsert_Success) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg));
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "foo"), UUID::gen()));
+ pushOps(srcOps);
+ auto writerPool = makeTenantMigrationWriterPool();
+
+ TenantOplogApplier applier(
+ _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+ ASSERT_OK(applier.startup());
+ auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+ auto futureRes = opAppliedFuture.getNoThrow();
+
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(1, entries.size());
+ assertNoOpMatches(srcOps[1], entries[0]);
+
+ ASSERT_OK(futureRes.getStatus());
+ ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
+ ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
+
+ applier.shutdown();
+ applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoopSameTimestamp_Success) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"), UUID::gen()));
+ srcOps.push_back(makeNoopOplogEntry(1, TenantMigrationRecipientService::kNoopMsg));
+ pushOps(srcOps);
+ ASSERT_EQ(srcOps[0].getOpTime(), srcOps[1].getOpTime());
+ auto writerPool = makeTenantMigrationWriterPool();
+
+ TenantOplogApplier applier(
+ _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+ ASSERT_OK(applier.startup());
+ auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+ auto futureRes = opAppliedFuture.getNoThrow();
+
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(1, entries.size());
+ assertNoOpMatches(srcOps[0], entries[0]);
+
+ ASSERT_OK(futureRes.getStatus());
+ ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
+ ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
+
+ applier.shutdown();
+ applier.join();
+}
+
+TEST_F(TenantOplogApplierTest, ApplyResumeTokenInsertThenNoop_Success) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo"), UUID::gen()));
+ srcOps.push_back(makeNoopOplogEntry(2, TenantMigrationRecipientService::kNoopMsg));
+ pushOps(srcOps);
+ auto writerPool = makeTenantMigrationWriterPool();
+
+ TenantOplogApplier applier(
+ _migrationUuid, _tenantId, OpTime(), &_oplogBuffer, _executor, writerPool.get());
+ ASSERT_OK(applier.startup());
+ auto opAppliedFuture = applier.getNotificationForOpTime(srcOps[1].getOpTime());
+ auto futureRes = opAppliedFuture.getNoThrow();
+
+ auto entries = _opObserver->getEntries();
+ ASSERT_EQ(1, entries.size());
+ assertNoOpMatches(srcOps[0], entries[0]);
+
+ ASSERT_OK(futureRes.getStatus());
+ ASSERT_EQUALS(futureRes.getValue().donorOpTime, srcOps[1].getOpTime());
+ ASSERT_EQUALS(futureRes.getValue().recipientOpTime, entries[0].getOpTime());
+
+ applier.shutdown();
+ applier.join();
+}
+
} // namespace repl
} // namespace mongo