author    Matthew Russotto <matthew.russotto@10gen.com>  2020-06-24 11:15:43 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-07-22 14:26:31 +0000
commit    803d63f2a918e2b002880992af229affcb08a176
tree      90a1dba812f186546143f4efad1da8c6f1a5a96e /src/mongo/db/repl
parent    86d94765b9030c9aa9adb3899277456e64bbd348
download  mongo-803d63f2a918e2b002880992af229affcb08a176.tar.gz
SERVER-48849 Add filtering to the OplogFetcher
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp        36
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.h          12
-rw-r--r--  src/mongo/db/repl/oplog_fetcher_test.cpp  134
3 files changed, 157 insertions(+), 25 deletions(-)
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 7bdd2af9b90..12c11418564 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -185,8 +185,11 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
const int batchSize,
- StartingPoint startingPoint)
- : AbstractAsyncComponent(executor, "oplog fetcher"),
+ StartingPoint startingPoint,
+ BSONObj filter,
+ ReadConcernArgs readConcern,
+ StringData name)
+ : AbstractAsyncComponent(executor, name.toString()),
_source(source),
_requiredRBID(requiredRBID),
_oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
@@ -199,7 +202,9 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
_enqueueDocumentsFn(enqueueDocumentsFn),
_awaitDataTimeout(calculateAwaitDataTimeout(config)),
_batchSize(batchSize),
- _startingPoint(startingPoint) {
+ _startingPoint(startingPoint),
+ _queryFilter(filter),
+ _queryReadConcern(readConcern) {
invariant(config.isInitialized());
invariant(!_lastFetched.isNull());
invariant(onShutdownCallbackFn);
@@ -471,7 +476,13 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
BSONObjBuilder queryBob;
auto lastOpTimeFetched = _getLastOpTimeFetched();
- queryBob.append("query", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
+ BSONObjBuilder filterBob(queryBob.subobjStart("query"));
+ filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp()));
+ // Handle caller-provided filter.
+ if (!_queryFilter.isEmpty()) {
+ filterBob.append("$and", _queryFilter);
+ }
+ filterBob.done();
queryBob.append("$maxTimeMS", findTimeout);
@@ -482,12 +493,17 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
queryBob.append("term", term);
}
- // This ensures that the sync source waits for all earlier oplog writes to be visible.
- // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use.
- queryBob.append("readConcern",
- BSON("level"
- << "local"
- << "afterClusterTime" << Timestamp(0, 1)));
+ if (_queryReadConcern.isEmpty()) {
+ // This ensures that the sync source waits for all earlier oplog writes to be visible.
+ // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use.
+ queryBob.append("readConcern",
+ BSON("level"
+ << "local"
+ << "afterClusterTime" << Timestamp(0, 1)));
+ } else {
+ // Caller-provided read concern.
+ queryBob.appendElements(_queryReadConcern.toBSON());
+ }
return queryBob.obj();
}
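For reference, a minimal sketch (not part of the commit) of how the pieces above combine. It mirrors the builder calls in _makeFindQuery with hypothetical values for the last-fetched timestamp, timeout, and caller-supplied filter, and assumes the default read-concern branch:

#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/timestamp.h"

mongo::BSONObj makeFindQuerySketch() {
    using namespace mongo;
    const Timestamp lastFetchedTs(1593000000, 1);             // hypothetical last-fetched timestamp
    const BSONObj callerFilter = BSON("ns" << "test.coll");   // hypothetical caller-supplied filter

    BSONObjBuilder queryBob;
    BSONObjBuilder filterBob(queryBob.subobjStart("query"));
    filterBob.append("ts", BSON("$gte" << lastFetchedTs));
    if (!callerFilter.isEmpty()) {
        // The caller filter is ANDed with the timestamp lower bound, as in the change above.
        filterBob.append("$and", callerFilter);
    }
    filterBob.done();
    queryBob.append("$maxTimeMS", 60000);  // hypothetical timeout
    // Default read-concern path: wait for all earlier oplog writes to be visible.
    queryBob.append("readConcern",
                    BSON("level"
                         << "local"
                         << "afterClusterTime" << Timestamp(0, 1)));
    return queryBob.obj();
    // Roughly: { query: { ts: { $gte: <ts> }, $and: { ns: "test.coll" } },
    //            $maxTimeMS: 60000,
    //            readConcern: { level: "local", afterClusterTime: Timestamp(0, 1) } }
}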
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 4c3a765a780..0ae2d472cc9 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -177,7 +177,10 @@ public:
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
const int batchSize,
- StartingPoint startingPoint = StartingPoint::kSkipFirstDoc);
+ StartingPoint startingPoint = StartingPoint::kSkipFirstDoc,
+ BSONObj filter = BSONObj(),
+ ReadConcernArgs readConcern = ReadConcernArgs(),
+ StringData name = "oplog fetcher"_sd);
virtual ~OplogFetcher();
@@ -431,6 +434,13 @@ private:
// Indicates if we want to skip the first document during oplog fetching or not.
StartingPoint _startingPoint;
+ // Predicate with additional filtering to be done on oplog entries.
+ BSONObj _queryFilter;
+
+ // Read concern to use for reading the oplog. Empty read concern means we use a default
+ // of "afterClusterTime: Timestamp(0,1)".
+ ReadConcernArgs _queryReadConcern;
+
// Handle to currently scheduled _runQuery task.
executor::TaskExecutor::CallbackHandle _runQueryHandle;
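As a standalone illustration (not part of the commit) of the empty-versus-caller-supplied read concern convention described above, using only ReadConcernArgs helpers that already appear in this diff (isEmpty(), toBSON()):

#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_level.h"
#include "mongo/util/assert_util.h"

void readConcernConventionSketch() {
    using namespace mongo::repl;

    // Default-constructed args are empty, so the fetcher falls back to the
    // long-standing { level: "local", afterClusterTime: Timestamp(0, 1) } default.
    ReadConcernArgs defaultedRC;
    invariant(defaultedRC.isEmpty());

    // A caller-supplied read concern is forwarded verbatim into the find command.
    ReadConcernArgs majorityRC(ReadConcernLevel::kMajorityReadConcern);
    invariant(!majorityRC.isEmpty());
    // majorityRC.toBSON() yields approximately { readConcern: { level: "majority" } }.
}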
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 2359c1ba250..e22aab2f798 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -153,19 +153,30 @@ void validateMetadataRequest(OpMsg msg) {
msg.body.getObjectField("$readPreference"));
}
-void validateFindCommand(Message m, OpTime lastFetched, int findTimeout) {
+void validateFindCommand(Message m,
+ OpTime lastFetched,
+ int findTimeout,
+ BSONObj filter = BSONObj(),
+ ReadConcernArgs readConcern = ReadConcernArgs::fromBSONThrows(
+ BSON("level"
+ << "local"
+ << "afterClusterTime" << Timestamp(0, 1)))) {
auto msg = mongo::OpMsg::parse(m);
ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find");
ASSERT_TRUE(msg.body.getBoolField("tailable"));
ASSERT_TRUE(msg.body.getBoolField("awaitData"));
ASSERT_EQUALS(findTimeout, msg.body.getIntField("maxTimeMS"));
- ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())),
- msg.body.getObjectField("filter"));
+ if (filter.isEmpty()) {
+        ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())),
+                          msg.body.getObjectField("filter"));
+ } else {
+ ASSERT_BSONOBJ_EQ(
+ BSON("ts" << BSON("$gte" << lastFetched.getTimestamp()) << "$and" << filter),
+ msg.body.getObjectField("filter"));
+ }
ASSERT_EQUALS(lastFetched.getTerm(), msg.body.getIntField("term"));
- ASSERT_BSONOBJ_EQ(BSON("level"
- << "local"
- << "afterClusterTime" << Timestamp(0, 1)),
- msg.body.getObjectField("readConcern"));
+ ASSERT_BSONOBJ_EQ(readConcern.toBSONInner(), msg.body.getObjectField("readConcern"));
// The find command should not specify the deprecated 'oplogReplay' flag.
ASSERT_FALSE(msg.body["oplogReplay"]);
@@ -304,13 +315,17 @@ protected:
int numRestarts = 0,
bool requireFresherSyncSource = true,
OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
- int requiredRBID = ReplicationProcess::kUninitializedRollbackId);
+ int requiredRBID = ReplicationProcess::kUninitializedRollbackId,
+ BSONObj filter = BSONObj(),
+ ReadConcernArgs readConcern = ReadConcernArgs());
std::unique_ptr<OplogFetcher> getOplogFetcherAfterConnectionCreated(
OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts = 0,
bool requireFresherSyncSource = true,
OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
- int requiredRBID = ReplicationProcess::kUninitializedRollbackId);
+ int requiredRBID = ReplicationProcess::kUninitializedRollbackId,
+ BSONObj filter = BSONObj(),
+ ReadConcernArgs args = ReadConcernArgs());
std::unique_ptr<ShutdownState> processSingleBatch(
const Message& response,
@@ -394,9 +409,17 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCr
int numRestarts,
bool requireFresherSyncSource,
OplogFetcher::StartingPoint startingPoint,
- int requiredRBID) {
- auto oplogFetcher = makeOplogFetcherWithDifferentExecutor(
- &getExecutor(), fn, numRestarts, requireFresherSyncSource, startingPoint, requiredRBID);
+ int requiredRBID,
+ BSONObj filter,
+ ReadConcernArgs readConcern) {
+ auto oplogFetcher = makeOplogFetcherWithDifferentExecutor(&getExecutor(),
+ fn,
+ numRestarts,
+ requireFresherSyncSource,
+ startingPoint,
+ requiredRBID,
+ filter,
+ readConcern);
auto waitForConnCreatedFailPoint =
globalFailPointRegistry().find("hangAfterOplogFetcherCallbackScheduled");
@@ -419,7 +442,9 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
int numRestarts,
bool requireFresherSyncSource,
OplogFetcher::StartingPoint startingPoint,
- int requiredRBID) {
+ int requiredRBID,
+ BSONObj filter,
+ ReadConcernArgs readConcern) {
auto oplogFetcher = std::make_unique<OplogFetcher>(
executor,
lastFetched,
@@ -432,7 +457,9 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
enqueueDocumentsFn,
fn,
defaultBatchSize,
- startingPoint);
+ startingPoint,
+ filter,
+ readConcern);
oplogFetcher->setCreateClientFn_forTest([this]() {
const auto autoReconnect = true;
return std::unique_ptr<DBClientConnection>(
@@ -2294,4 +2321,83 @@ TEST_F(OplogFetcherTest, HandleLogicalTimeMetaDataAndAdvanceClusterTime) {
ASSERT_EQ(currentClusterTime, logicalTime);
ASSERT_NE(oldClusterTime, logicalTime);
}
+
+TEST_F(OplogFetcherTest, CheckFindCommandIncludesFilter) {
+ ShutdownState shutdownState;
+
+    // Create an oplog fetcher without any retries but with a filter. Note that the filter is
+    // not actually applied, since our mock objects do not honor it; this unit test only checks
+    // that the find command is well-formed.
+ const BSONObj filter = BSON("ns" << BSON("$regexp"
+ << "/^tenant_.*/"));
+ auto oplogFetcher =
+ getOplogFetcherAfterConnectionCreated(std::ref(shutdownState),
+ 0 /* numRestarts */,
+                                              true /* requireFresherSyncSource */,
+ OplogFetcher::StartingPoint::kSkipFirstDoc,
+ ReplicationProcess::kUninitializedRollbackId,
+ filter);
+
+ CursorId cursorId = 22LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
+ auto firstBatch = {firstEntry, secondEntry};
+
+ // Update lastFetched before it is updated by getting the next batch.
+ lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
+
+ // Creating the cursor will succeed.
+ auto m = processSingleRequestResponse(oplogFetcher->getDBClientConnection_forTest(),
+ makeFirstBatch(cursorId, firstBatch, metadataObj),
+ true);
+
+ validateFindCommand(m,
+ lastFetched,
+ durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest()),
+ filter);
+
+ oplogFetcher->shutdown();
+ oplogFetcher->join();
+}
+
+TEST_F(OplogFetcherTest, CheckFindCommandIncludesCustomReadConcern) {
+ ShutdownState shutdownState;
+
+ // Create an oplog fetcher without any retries but with a custom read concern.
+ auto readConcern = ReadConcernArgs::fromBSONThrows(BSON("level"
+ << "majority"));
+ auto oplogFetcher =
+ getOplogFetcherAfterConnectionCreated(std::ref(shutdownState),
+ 0 /* numRestarts */,
+                                              true /* requireFresherSyncSource */,
+ OplogFetcher::StartingPoint::kSkipFirstDoc,
+ ReplicationProcess::kUninitializedRollbackId,
+ BSONObj() /* filter */,
+ readConcern);
+
+ CursorId cursorId = 22LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
+ auto firstBatch = {firstEntry, secondEntry};
+
+ // Update lastFetched before it is updated by getting the next batch.
+ lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
+
+ // Creating the cursor will succeed.
+ auto m = processSingleRequestResponse(oplogFetcher->getDBClientConnection_forTest(),
+ makeFirstBatch(cursorId, firstBatch, metadataObj),
+ true);
+
+ validateFindCommand(m,
+ lastFetched,
+ durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest()),
+ BSONObj() /* filter */,
+ readConcern);
+
+ oplogFetcher->shutdown();
+ oplogFetcher->join();
+}
+
} // namespace