diff options
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 134 |
3 files changed, 157 insertions, 25 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 7bdd2af9b90..12c11418564 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -185,8 +185,11 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, const int batchSize, - StartingPoint startingPoint) - : AbstractAsyncComponent(executor, "oplog fetcher"), + StartingPoint startingPoint, + BSONObj filter, + ReadConcernArgs readConcern, + StringData name) + : AbstractAsyncComponent(executor, name.toString()), _source(source), _requiredRBID(requiredRBID), _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)), @@ -199,7 +202,9 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, _enqueueDocumentsFn(enqueueDocumentsFn), _awaitDataTimeout(calculateAwaitDataTimeout(config)), _batchSize(batchSize), - _startingPoint(startingPoint) { + _startingPoint(startingPoint), + _queryFilter(filter), + _queryReadConcern(readConcern) { invariant(config.isInitialized()); invariant(!_lastFetched.isNull()); invariant(onShutdownCallbackFn); @@ -471,7 +476,13 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { BSONObjBuilder queryBob; auto lastOpTimeFetched = _getLastOpTimeFetched(); - queryBob.append("query", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))); + BSONObjBuilder filterBob(queryBob.subobjStart("query")); + filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp())); + // Handle caller-provided filter. + if (!_queryFilter.isEmpty()) { + filterBob.append("$and", _queryFilter); + } + filterBob.done(); queryBob.append("$maxTimeMS", findTimeout); @@ -482,12 +493,17 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { queryBob.append("term", term); } - // This ensures that the sync source waits for all earlier oplog writes to be visible. - // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use. - queryBob.append("readConcern", - BSON("level" - << "local" - << "afterClusterTime" << Timestamp(0, 1))); + if (_queryReadConcern.isEmpty()) { + // This ensures that the sync source waits for all earlier oplog writes to be visible. + // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use. + queryBob.append("readConcern", + BSON("level" + << "local" + << "afterClusterTime" << Timestamp(0, 1))); + } else { + // Caller-provided read concern. + queryBob.appendElements(_queryReadConcern.toBSON()); + } return queryBob.obj(); } diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 4c3a765a780..0ae2d472cc9 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -177,7 +177,10 @@ public: EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, const int batchSize, - StartingPoint startingPoint = StartingPoint::kSkipFirstDoc); + StartingPoint startingPoint = StartingPoint::kSkipFirstDoc, + BSONObj filter = BSONObj(), + ReadConcernArgs readConcern = ReadConcernArgs(), + StringData name = "oplog fetcher"_sd); virtual ~OplogFetcher(); @@ -431,6 +434,13 @@ private: // Indicates if we want to skip the first document during oplog fetching or not. StartingPoint _startingPoint; + // Predicate with additional filtering to be done on oplog entries. + BSONObj _queryFilter; + + // Read concern to use for reading the oplog. Empty read concern means we use a default + // of "afterClusterTime: Timestamp(0,1)". + ReadConcernArgs _queryReadConcern; + // Handle to currently scheduled _runQuery task. executor::TaskExecutor::CallbackHandle _runQueryHandle; diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 2359c1ba250..e22aab2f798 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -153,19 +153,30 @@ void validateMetadataRequest(OpMsg msg) { msg.body.getObjectField("$readPreference")); } -void validateFindCommand(Message m, OpTime lastFetched, int findTimeout) { +void validateFindCommand(Message m, + OpTime lastFetched, + int findTimeout, + BSONObj filter = BSONObj(), + ReadConcernArgs readConcern = ReadConcernArgs::fromBSONThrows( + BSON("level" + << "local" + << "afterClusterTime" << Timestamp(0, 1)))) { auto msg = mongo::OpMsg::parse(m); ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find"); ASSERT_TRUE(msg.body.getBoolField("tailable")); ASSERT_TRUE(msg.body.getBoolField("awaitData")); ASSERT_EQUALS(findTimeout, msg.body.getIntField("maxTimeMS")); - ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), - msg.body.getObjectField("filter")); + if (filter.isEmpty()) { + ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), + + msg.body.getObjectField("filter")); + } else { + ASSERT_BSONOBJ_EQ( + BSON("ts" << BSON("$gte" << lastFetched.getTimestamp()) << "$and" << filter), + msg.body.getObjectField("filter")); + } ASSERT_EQUALS(lastFetched.getTerm(), msg.body.getIntField("term")); - ASSERT_BSONOBJ_EQ(BSON("level" - << "local" - << "afterClusterTime" << Timestamp(0, 1)), - msg.body.getObjectField("readConcern")); + ASSERT_BSONOBJ_EQ(readConcern.toBSONInner(), msg.body.getObjectField("readConcern")); // The find command should not specify the deprecated 'oplogReplay' flag. ASSERT_FALSE(msg.body["oplogReplay"]); @@ -304,13 +315,17 @@ protected: int numRestarts = 0, bool requireFresherSyncSource = true, OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc, - int requiredRBID = ReplicationProcess::kUninitializedRollbackId); + int requiredRBID = ReplicationProcess::kUninitializedRollbackId, + BSONObj filter = BSONObj(), + ReadConcernArgs readConcern = ReadConcernArgs()); std::unique_ptr<OplogFetcher> getOplogFetcherAfterConnectionCreated( OplogFetcher::OnShutdownCallbackFn fn, int numRestarts = 0, bool requireFresherSyncSource = true, OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc, - int requiredRBID = ReplicationProcess::kUninitializedRollbackId); + int requiredRBID = ReplicationProcess::kUninitializedRollbackId, + BSONObj filter = BSONObj(), + ReadConcernArgs args = ReadConcernArgs()); std::unique_ptr<ShutdownState> processSingleBatch( const Message& response, @@ -394,9 +409,17 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCr int numRestarts, bool requireFresherSyncSource, OplogFetcher::StartingPoint startingPoint, - int requiredRBID) { - auto oplogFetcher = makeOplogFetcherWithDifferentExecutor( - &getExecutor(), fn, numRestarts, requireFresherSyncSource, startingPoint, requiredRBID); + int requiredRBID, + BSONObj filter, + ReadConcernArgs readConcern) { + auto oplogFetcher = makeOplogFetcherWithDifferentExecutor(&getExecutor(), + fn, + numRestarts, + requireFresherSyncSource, + startingPoint, + requiredRBID, + filter, + readConcern); auto waitForConnCreatedFailPoint = globalFailPointRegistry().find("hangAfterOplogFetcherCallbackScheduled"); @@ -419,7 +442,9 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe int numRestarts, bool requireFresherSyncSource, OplogFetcher::StartingPoint startingPoint, - int requiredRBID) { + int requiredRBID, + BSONObj filter, + ReadConcernArgs readConcern) { auto oplogFetcher = std::make_unique<OplogFetcher>( executor, lastFetched, @@ -432,7 +457,9 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe enqueueDocumentsFn, fn, defaultBatchSize, - startingPoint); + startingPoint, + filter, + readConcern); oplogFetcher->setCreateClientFn_forTest([this]() { const auto autoReconnect = true; return std::unique_ptr<DBClientConnection>( @@ -2294,4 +2321,83 @@ TEST_F(OplogFetcherTest, HandleLogicalTimeMetaDataAndAdvanceClusterTime) { ASSERT_EQ(currentClusterTime, logicalTime); ASSERT_NE(oldClusterTime, logicalTime); } + +TEST_F(OplogFetcherTest, CheckFindCommandIncludesFilter) { + ShutdownState shutdownState; + + // Create an oplog fetcher without any retries but with a filter. Note the filter is not + // respected as our Mock objects do not respect them; this unit test only tests the command + // is well-formed. + const BSONObj filter = BSON("ns" << BSON("$regexp" + << "/^tenant_.*/")); + auto oplogFetcher = + getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), + 0 /* numRestarts */, + true /* requireFresherSyncSourc */, + OplogFetcher::StartingPoint::kSkipFirstDoc, + ReplicationProcess::kUninitializedRollbackId, + filter); + + CursorId cursorId = 22LL; + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); + auto firstBatch = {firstEntry, secondEntry}; + + // Update lastFetched before it is updated by getting the next batch. + lastFetched = oplogFetcher->getLastOpTimeFetched_forTest(); + + // Creating the cursor will succeed. + auto m = processSingleRequestResponse(oplogFetcher->getDBClientConnection_forTest(), + makeFirstBatch(cursorId, firstBatch, metadataObj), + true); + + validateFindCommand(m, + lastFetched, + durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest()), + filter); + + oplogFetcher->shutdown(); + oplogFetcher->join(); +} + +TEST_F(OplogFetcherTest, CheckFindCommandIncludesCustomReadConcern) { + ShutdownState shutdownState; + + // Create an oplog fetcher without any retries but with a custom read concern. + auto readConcern = ReadConcernArgs::fromBSONThrows(BSON("level" + << "majority")); + auto oplogFetcher = + getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), + 0 /* numRestarts */, + true /* requireFresherSyncSourc */, + OplogFetcher::StartingPoint::kSkipFirstDoc, + ReplicationProcess::kUninitializedRollbackId, + BSONObj() /* filter */, + readConcern); + + CursorId cursorId = 22LL; + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); + auto firstBatch = {firstEntry, secondEntry}; + + // Update lastFetched before it is updated by getting the next batch. + lastFetched = oplogFetcher->getLastOpTimeFetched_forTest(); + + // Creating the cursor will succeed. + auto m = processSingleRequestResponse(oplogFetcher->getDBClientConnection_forTest(), + makeFirstBatch(cursorId, firstBatch, metadataObj), + true); + + validateFindCommand(m, + lastFetched, + durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest()), + BSONObj() /* filter */, + readConcern); + + oplogFetcher->shutdown(); + oplogFetcher->join(); +} + } // namespace |