Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.cpp')
-rw-r--r--   src/mongo/db/repl/oplog_fetcher.cpp   94
1 file changed, 40 insertions(+), 54 deletions(-)
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 153bb71cb9c..3eb5a13583a 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -176,39 +176,23 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
}
OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
- OpTime lastFetched,
- HostAndPort source,
- ReplSetConfig config,
std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision,
- int requiredRBID,
- bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn,
- const int batchSize,
- StartingPoint startingPoint,
- BSONObj filter,
- ReadConcernArgs readConcern,
- bool requestResumeToken,
- StringData name)
- : AbstractAsyncComponent(executor, name.toString()),
- _source(source),
- _requiredRBID(requiredRBID),
+ Config config)
+ : AbstractAsyncComponent(executor, config.name),
+ _receivedRBID(config.requiredRBID),
_oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
_onShutdownCallbackFn(onShutdownCallbackFn),
- _lastFetched(lastFetched),
+ _lastFetched(config.initialLastFetched),
_createClientFn(
[] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }),
- _requireFresherSyncSource(requireFresherSyncSource),
_dataReplicatorExternalState(dataReplicatorExternalState),
_enqueueDocumentsFn(enqueueDocumentsFn),
- _awaitDataTimeout(calculateAwaitDataTimeout(config)),
- _batchSize(batchSize),
- _startingPoint(startingPoint),
- _queryFilter(filter),
- _queryReadConcern(readConcern),
- _requestResumeToken(requestResumeToken) {
- invariant(config.isInitialized());
+ _awaitDataTimeout(calculateAwaitDataTimeout(config.replSetConfig)),
+ _config(std::move(config)) {
+ invariant(_config.replSetConfig.isInitialized());
invariant(!_lastFetched.isNull());
invariant(onShutdownCallbackFn);
invariant(enqueueDocumentsFn);
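
Note: the hunk above folds the OplogFetcher constructor's long parameter list into a single Config argument. The Config type itself is defined in oplog_fetcher.h, which this diff does not touch; the sketch below is only an inferred shape, reconstructed from the _config.* members referenced elsewhere in the patch and from the removed constructor parameters, so field order, defaults, and any additional members are assumptions.

// Inferred sketch only -- the authoritative definition lives in oplog_fetcher.h (not part of this diff).
struct OplogFetcher::Config {
    OpTime initialLastFetched;          // previously the 'lastFetched' parameter
    HostAndPort source;                 // previously 'source'
    ReplSetConfig replSetConfig;        // previously 'config'
    int requiredRBID;                   // previously 'requiredRBID'
    RequireFresherSyncSource requireFresherSyncSource;  // previously the bool 'requireFresherSyncSource'
    int batchSize;                      // previously 'batchSize'
    StartingPoint startingPoint;        // previously 'startingPoint'
    BSONObj queryFilter;                // previously 'filter'
    ReadConcernArgs queryReadConcern;   // previously 'readConcern'
    bool requestResumeToken;            // previously 'requestResumeToken'
    std::string name;                   // previously the StringData 'name'; passed to AbstractAsyncComponent
};
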
@@ -257,7 +241,7 @@ std::string OplogFetcher::toString() {
str::stream output;
output << "OplogFetcher -";
output << " last optime fetched: " << _lastFetched.toString();
- output << " source: " << _source.toString();
+ output << " source: " << _config.source.toString();
output << " namespace: " << _nss.toString();
output << " active: " << _isActive_inlock();
output << " shutting down?:" << _isShuttingDown_inlock();
@@ -328,7 +312,7 @@ void OplogFetcher::_finishCallback(Status status) {
str::stream() << "Got error: \"" << status.toString()
<< "\" while oplog fetcher is shutting down");
}
- _onShutdownCallbackFn(status, _requiredRBID);
+ _onShutdownCallbackFn(status, _receivedRBID);
decltype(_onShutdownCallbackFn) onShutdownCallbackFn;
decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision;
@@ -452,12 +436,12 @@ Status OplogFetcher::_connect() {
"error"_attr = connectStatus);
_conn->checkConnection();
} else {
- uassertStatusOK(_conn->connect(_source, "OplogFetcher"));
+ uassertStatusOK(_conn->connect(_config.source, "OplogFetcher"));
}
uassertStatusOK(replAuthenticate(_conn.get())
.withContext(str::stream()
<< "OplogFetcher failed to authenticate to "
- << _source));
+ << _config.source));
// Reset any state needed to track restarts on successful connection.
_oplogFetcherRestartDecision->fetchSuccessful(this);
return Status::OK();
@@ -505,14 +489,15 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
BSONObjBuilder filterBob(queryBob.subobjStart("query"));
filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp()));
// Handle caller-provided filter.
- if (!_queryFilter.isEmpty()) {
+ if (!_config.queryFilter.isEmpty()) {
filterBob.append(
- "$or", BSON_ARRAY(_queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp())));
+ "$or",
+ BSON_ARRAY(_config.queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp())));
}
filterBob.done();
queryBob.append("$maxTimeMS", findTimeout);
- if (_requestResumeToken) {
+ if (_config.requestResumeToken) {
queryBob.append("$hint", BSON("$natural" << 1));
queryBob.append("$_requestResumeToken", true);
}
@@ -524,7 +509,7 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
queryBob.append("term", term);
}
- if (_queryReadConcern.isEmpty()) {
+ if (_config.queryReadConcern.isEmpty()) {
// This ensures that the sync source waits for all earlier oplog writes to be visible.
// Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use.
queryBob.append("readConcern",
@@ -533,7 +518,7 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const {
<< "afterClusterTime" << Timestamp(0, 1)));
} else {
// Caller-provided read concern.
- queryBob.appendElements(_queryReadConcern.toBSON());
+ queryBob.appendElements(_config.queryReadConcern.toBSON());
}
return queryBob.obj();
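
Note: for reference, _makeFindQuery builds an OP_QUERY-style wrapped find object. Pieced together from the hunks above, the default-path result looks roughly like the commented shape below; the readConcern "level" value and the term-appending code fall outside the shown context, so treat this as an illustration rather than the exact output.

// Approximate shape of the generated find object (angle brackets are placeholders):
// {
//     query: { ts: { $gte: <last fetched timestamp> } },
//         // when a caller filter is set, 'query' instead contains
//         // { ts: {...}, $or: [ <queryFilter>, { ts: <last fetched timestamp> } ] }
//     $maxTimeMS: <findTimeout>,
//     $hint: { $natural: 1 },            // only when requestResumeToken is set
//     $_requestResumeToken: true,        // only when requestResumeToken is set
//     term: <current term>,              // appended by context not shown in this hunk
//     readConcern: { /* level assumed */ afterClusterTime: Timestamp(0, 1) }
//         // default; replaced by the caller-provided read concern when one is given
// }
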
@@ -556,7 +541,7 @@ void OplogFetcher::_createNewCursor(bool initialFind) {
nullptr /* fieldsToReturn */,
QueryOption_CursorTailable | QueryOption_AwaitData |
(oplogFetcherUsesExhaust ? QueryOption_Exhaust : 0),
- _batchSize);
+ _config.batchSize);
_firstBatch = true;
@@ -576,8 +561,8 @@ StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() {
if (!_cursor->init()) {
_cursor.reset();
return {ErrorCodes::InvalidSyncSource,
- str::stream()
- << "Oplog fetcher could not create cursor on source: " << _source};
+ str::stream() << "Oplog fetcher could not create cursor on source: "
+ << _config.source};
}
// This will also set maxTimeMS on the generated getMore command.
@@ -591,8 +576,8 @@ StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() {
// Due to a bug in DBClientCursor, it actually uses batchSize 2 if the given batchSize
// is 1 for the find command. So if the given batchSize is 1, we need to set it
// explicitly for getMores.
- if (_batchSize == 1) {
- _cursor->setBatchSize(_batchSize);
+ if (_config.batchSize == 1) {
+ _cursor->setBatchSize(_config.batchSize);
}
} else {
auto lastCommittedWithCurrentTerm =
@@ -690,7 +675,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
"invalid oplog query metadata from sync source {syncSource}: "
"{error}: {metadata}",
"Invalid oplog query metadata from sync source",
- "syncSource"_attr = _source,
+ "syncSource"_attr = _config.source,
"error"_attr = oqMetadataResult.getStatus(),
"metadata"_attr = _metadataObj);
return oqMetadataResult.getStatus();
@@ -709,7 +694,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
1,
"oplog fetcher successfully fetched from {syncSource}",
"Oplog fetcher successfully fetched from sync source",
- "syncSource"_attr = _source);
+ "syncSource"_attr = _config.source);
// We do not always enqueue the first document. We elect to skip it for the following
// reasons:
@@ -724,13 +709,13 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
// happens on the first batch when we always accept a document with the previous
// fetched timestamp.
- if (_startingPoint == StartingPoint::kSkipFirstDoc) {
+ if (_config.startingPoint == StartingPoint::kSkipFirstDoc) {
firstDocToApply++;
- } else if (!_queryFilter.isEmpty()) {
+ } else if (!_config.queryFilter.isEmpty()) {
auto opCtx = cc().makeOperationContext();
auto expCtx =
make_intrusive<ExpressionContext>(opCtx.get(), nullptr /* collator */, _nss);
- Matcher m(_queryFilter, expCtx);
+ Matcher m(_config.queryFilter, expCtx);
if (!m.matches(*firstDocToApply))
firstDocToApply++;
}
@@ -740,7 +725,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
auto previousOpTimeFetched = _getLastOpTimeFetched();
auto validateResult = OplogFetcher::validateDocuments(
- documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _startingPoint);
+ documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _config.startingPoint);
if (!validateResult.isOK()) {
return validateResult.getStatus();
}
@@ -757,7 +742,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
"invalid replication metadata from sync source {syncSource}: "
"{error}: {metadata}",
"Invalid replication metadata from sync source",
- "syncSource"_attr = _source,
+ "syncSource"_attr = _config.source,
"error"_attr = metadataResult.getStatus(),
"metadata"_attr = _metadataObj);
return metadataResult.getStatus();
@@ -766,9 +751,9 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
// Determine if we should stop syncing from our current sync source.
auto changeSyncSourceAction = _dataReplicatorExternalState->shouldStopFetching(
- _source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime);
+ _config.source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime);
str::stream errMsg;
- errMsg << "sync source " << _source.toString();
+ errMsg << "sync source " << _config.source.toString();
errMsg << " (config version: " << replSetMetadata.getConfigVersion();
errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString();
errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex();
@@ -815,7 +800,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
// Start skipping the first doc after at least one doc has been enqueued in the lifetime
// of this fetcher.
- _startingPoint = StartingPoint::kSkipFirstDoc;
+ _config.startingPoint = StartingPoint::kSkipFirstDoc;
// We have now processed the batch. We should only move forward our view of _lastFetched if the
// batch was not empty.
@@ -843,15 +828,15 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum
// upstream node hasn't rolled back since that could cause it to not have our required minValid
// point. The cursor will be killed if the upstream node rolls back so we don't need to keep
// checking once the cursor is established. If we do not use rollback-via-refetch, this check is
- // not necessary, and _requiredRBID will be set to kUninitializedRollbackId in that case.
- if (_requiredRBID != ReplicationProcess::kUninitializedRollbackId &&
- remoteRBID != _requiredRBID) {
+ // not necessary, and _config.requiredRBID will be set to kUninitializedRollbackId in that case.
+ if (_config.requiredRBID != ReplicationProcess::kUninitializedRollbackId &&
+ remoteRBID != _config.requiredRBID) {
return Status(ErrorCodes::InvalidSyncSource,
"Upstream node rolled back after choosing it as a sync source. Choosing "
"new sync source.");
}
- // Set _requiredRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down.
- _requiredRBID = remoteRBID;
+ // Set _receivedRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down.
+ _receivedRBID = remoteRBID;
// Sometimes our remoteLastOpApplied may be stale; if we received a document with an
// opTime later than remoteLastApplied, we can assume the remote is at least up to that
@@ -867,7 +852,7 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum
// The sync source could be behind us if it rolled back after we selected it. We could have
// failed to detect the rollback if it occurred between sync source selection (when we check the
- // candidate is ahead of us) and sync source resolution (when we got '_requiredRBID'). If the
+ // candidate is ahead of us) and sync source resolution (when we got '_receivedRBID'). If the
// sync source is now behind us, choose a new sync source to prevent going into rollback.
if (remoteLastOpApplied < lastFetched) {
return Status(ErrorCodes::InvalidSyncSource,
@@ -885,7 +870,8 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum
// problematic to check this condition for initial sync, since the 'lastFetched' OpTime will
// almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied
// OpTime to determine where to start our OplogFetcher.
- if (_requireFresherSyncSource && remoteLastOpApplied <= lastFetched) {
+ if (_config.requireFresherSyncSource == RequireFresherSyncSource::kRequireFresherSyncSource &&
+ remoteLastOpApplied <= lastFetched) {
return Status(ErrorCodes::InvalidSyncSource,
"Sync source must be ahead of me. My last fetched oplog optime: {}, latest "
"oplog optime of sync source: {}"_format(lastFetched.toString(),
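
Note: the final hunk also shows that the old bool 'requireFresherSyncSource' now travels on the Config object as an enum. The enum definition is outside this diff; below is a minimal two-state sketch consistent with the comparison above, where every enumerator name other than kRequireFresherSyncSource is an assumption.

// Sketch only -- the real definition lives in oplog_fetcher.h, outside this diff.
enum class RequireFresherSyncSource {
    kDontRequireFresherSyncSource,   // assumed name for the former 'false' case
    kRequireFresherSyncSource,       // referenced directly in the hunk above
};

Compared with a bare bool argument, the explicit enum makes call sites self-documenting, which is presumably the motivation for the change.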