diff options
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 94 |
1 files changed, 40 insertions, 54 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 153bb71cb9c..3eb5a13583a 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -176,39 +176,23 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( } OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, - OpTime lastFetched, - HostAndPort source, - ReplSetConfig config, std::unique_ptr<OplogFetcherRestartDecision> oplogFetcherRestartDecision, - int requiredRBID, - bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn, - const int batchSize, - StartingPoint startingPoint, - BSONObj filter, - ReadConcernArgs readConcern, - bool requestResumeToken, - StringData name) - : AbstractAsyncComponent(executor, name.toString()), - _source(source), - _requiredRBID(requiredRBID), + Config config) + : AbstractAsyncComponent(executor, config.name), + _receivedRBID(config.requiredRBID), _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)), _onShutdownCallbackFn(onShutdownCallbackFn), - _lastFetched(lastFetched), + _lastFetched(config.initialLastFetched), _createClientFn( [] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }), - _requireFresherSyncSource(requireFresherSyncSource), _dataReplicatorExternalState(dataReplicatorExternalState), _enqueueDocumentsFn(enqueueDocumentsFn), - _awaitDataTimeout(calculateAwaitDataTimeout(config)), - _batchSize(batchSize), - _startingPoint(startingPoint), - _queryFilter(filter), - _queryReadConcern(readConcern), - _requestResumeToken(requestResumeToken) { - invariant(config.isInitialized()); + _awaitDataTimeout(calculateAwaitDataTimeout(config.replSetConfig)), + _config(std::move(config)) { + invariant(_config.replSetConfig.isInitialized()); invariant(!_lastFetched.isNull()); invariant(onShutdownCallbackFn); invariant(enqueueDocumentsFn); @@ -257,7 +241,7 @@ std::string OplogFetcher::toString() { str::stream output; output << "OplogFetcher -"; output << " last optime fetched: " << _lastFetched.toString(); - output << " source: " << _source.toString(); + output << " source: " << _config.source.toString(); output << " namespace: " << _nss.toString(); output << " active: " << _isActive_inlock(); output << " shutting down?:" << _isShuttingDown_inlock(); @@ -328,7 +312,7 @@ void OplogFetcher::_finishCallback(Status status) { str::stream() << "Got error: \"" << status.toString() << "\" while oplog fetcher is shutting down"); } - _onShutdownCallbackFn(status, _requiredRBID); + _onShutdownCallbackFn(status, _receivedRBID); decltype(_onShutdownCallbackFn) onShutdownCallbackFn; decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision; @@ -452,12 +436,12 @@ Status OplogFetcher::_connect() { "error"_attr = connectStatus); _conn->checkConnection(); } else { - uassertStatusOK(_conn->connect(_source, "OplogFetcher")); + uassertStatusOK(_conn->connect(_config.source, "OplogFetcher")); } uassertStatusOK(replAuthenticate(_conn.get()) .withContext(str::stream() << "OplogFetcher failed to authenticate to " - << _source)); + << _config.source)); // Reset any state needed to track restarts on successful connection. _oplogFetcherRestartDecision->fetchSuccessful(this); return Status::OK(); @@ -505,14 +489,15 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { BSONObjBuilder filterBob(queryBob.subobjStart("query")); filterBob.append("ts", BSON("$gte" << lastOpTimeFetched.getTimestamp())); // Handle caller-provided filter. - if (!_queryFilter.isEmpty()) { + if (!_config.queryFilter.isEmpty()) { filterBob.append( - "$or", BSON_ARRAY(_queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp()))); + "$or", + BSON_ARRAY(_config.queryFilter << BSON("ts" << lastOpTimeFetched.getTimestamp()))); } filterBob.done(); queryBob.append("$maxTimeMS", findTimeout); - if (_requestResumeToken) { + if (_config.requestResumeToken) { queryBob.append("$hint", BSON("$natural" << 1)); queryBob.append("$_requestResumeToken", true); } @@ -524,7 +509,7 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { queryBob.append("term", term); } - if (_queryReadConcern.isEmpty()) { + if (_config.queryReadConcern.isEmpty()) { // This ensures that the sync source waits for all earlier oplog writes to be visible. // Since Timestamp(0, 0) isn't allowed, Timestamp(0, 1) is the minimal we can use. queryBob.append("readConcern", @@ -533,7 +518,7 @@ BSONObj OplogFetcher::_makeFindQuery(long long findTimeout) const { << "afterClusterTime" << Timestamp(0, 1))); } else { // Caller-provided read concern. - queryBob.appendElements(_queryReadConcern.toBSON()); + queryBob.appendElements(_config.queryReadConcern.toBSON()); } return queryBob.obj(); @@ -556,7 +541,7 @@ void OplogFetcher::_createNewCursor(bool initialFind) { nullptr /* fieldsToReturn */, QueryOption_CursorTailable | QueryOption_AwaitData | (oplogFetcherUsesExhaust ? QueryOption_Exhaust : 0), - _batchSize); + _config.batchSize); _firstBatch = true; @@ -576,8 +561,8 @@ StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() { if (!_cursor->init()) { _cursor.reset(); return {ErrorCodes::InvalidSyncSource, - str::stream() - << "Oplog fetcher could not create cursor on source: " << _source}; + str::stream() << "Oplog fetcher could not create cursor on source: " + << _config.source}; } // This will also set maxTimeMS on the generated getMore command. @@ -591,8 +576,8 @@ StatusWith<OplogFetcher::Documents> OplogFetcher::_getNextBatch() { // Due to a bug in DBClientCursor, it actually uses batchSize 2 if the given batchSize // is 1 for the find command. So if the given batchSize is 1, we need to set it // explicitly for getMores. - if (_batchSize == 1) { - _cursor->setBatchSize(_batchSize); + if (_config.batchSize == 1) { + _cursor->setBatchSize(_config.batchSize); } } else { auto lastCommittedWithCurrentTerm = @@ -690,7 +675,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { "invalid oplog query metadata from sync source {syncSource}: " "{error}: {metadata}", "Invalid oplog query metadata from sync source", - "syncSource"_attr = _source, + "syncSource"_attr = _config.source, "error"_attr = oqMetadataResult.getStatus(), "metadata"_attr = _metadataObj); return oqMetadataResult.getStatus(); @@ -709,7 +694,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { 1, "oplog fetcher successfully fetched from {syncSource}", "Oplog fetcher successfully fetched from sync source", - "syncSource"_attr = _source); + "syncSource"_attr = _config.source); // We do not always enqueue the first document. We elect to skip it for the following // reasons: @@ -724,13 +709,13 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { // happens on the first batch when we always accept a document with the previous // fetched timestamp. - if (_startingPoint == StartingPoint::kSkipFirstDoc) { + if (_config.startingPoint == StartingPoint::kSkipFirstDoc) { firstDocToApply++; - } else if (!_queryFilter.isEmpty()) { + } else if (!_config.queryFilter.isEmpty()) { auto opCtx = cc().makeOperationContext(); auto expCtx = make_intrusive<ExpressionContext>(opCtx.get(), nullptr /* collator */, _nss); - Matcher m(_queryFilter, expCtx); + Matcher m(_config.queryFilter, expCtx); if (!m.matches(*firstDocToApply)) firstDocToApply++; } @@ -740,7 +725,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { auto previousOpTimeFetched = _getLastOpTimeFetched(); auto validateResult = OplogFetcher::validateDocuments( - documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _startingPoint); + documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _config.startingPoint); if (!validateResult.isOK()) { return validateResult.getStatus(); } @@ -757,7 +742,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { "invalid replication metadata from sync source {syncSource}: " "{error}: {metadata}", "Invalid replication metadata from sync source", - "syncSource"_attr = _source, + "syncSource"_attr = _config.source, "error"_attr = metadataResult.getStatus(), "metadata"_attr = _metadataObj); return metadataResult.getStatus(); @@ -766,9 +751,9 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { // Determine if we should stop syncing from our current sync source. auto changeSyncSourceAction = _dataReplicatorExternalState->shouldStopFetching( - _source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime); + _config.source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime); str::stream errMsg; - errMsg << "sync source " << _source.toString(); + errMsg << "sync source " << _config.source.toString(); errMsg << " (config version: " << replSetMetadata.getConfigVersion(); errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString(); errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex(); @@ -815,7 +800,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { // Start skipping the first doc after at least one doc has been enqueued in the lifetime // of this fetcher. - _startingPoint = StartingPoint::kSkipFirstDoc; + _config.startingPoint = StartingPoint::kSkipFirstDoc; // We have now processed the batch. We should only move forward our view of _lastFetched if the // batch was not empty. @@ -843,15 +828,15 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum // upstream node hasn't rolled back since that could cause it to not have our required minValid // point. The cursor will be killed if the upstream node rolls back so we don't need to keep // checking once the cursor is established. If we do not use rollback-via-refetch, this check is - // not necessary, and _requiredRBID will be set to kUninitializedRollbackId in that case. - if (_requiredRBID != ReplicationProcess::kUninitializedRollbackId && - remoteRBID != _requiredRBID) { + // not necessary, and _config.requiredRBID will be set to kUninitializedRollbackId in that case. + if (_config.requiredRBID != ReplicationProcess::kUninitializedRollbackId && + remoteRBID != _config.requiredRBID) { return Status(ErrorCodes::InvalidSyncSource, "Upstream node rolled back after choosing it as a sync source. Choosing " "new sync source."); } - // Set _requiredRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down. - _requiredRBID = remoteRBID; + // Set _receivedRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down. + _receivedRBID = remoteRBID; // Sometimes our remoteLastOpApplied may be stale; if we received a document with an // opTime later than remoteLastApplied, we can assume the remote is at least up to that @@ -867,7 +852,7 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum // The sync source could be behind us if it rolled back after we selected it. We could have // failed to detect the rollback if it occurred between sync source selection (when we check the - // candidate is ahead of us) and sync source resolution (when we got '_requiredRBID'). If the + // candidate is ahead of us) and sync source resolution (when we got '_receivedRBID'). If the // sync source is now behind us, choose a new sync source to prevent going into rollback. if (remoteLastOpApplied < lastFetched) { return Status(ErrorCodes::InvalidSyncSource, @@ -885,7 +870,8 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum // problematic to check this condition for initial sync, since the 'lastFetched' OpTime will // almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied // OpTime to determine where to start our OplogFetcher. - if (_requireFresherSyncSource && remoteLastOpApplied <= lastFetched) { + if (_config.requireFresherSyncSource == RequireFresherSyncSource::kRequireFresherSyncSource && + remoteLastOpApplied <= lastFetched) { return Status(ErrorCodes::InvalidSyncSource, "Sync source must be ahead of me. My last fetched oplog optime: {}, latest " "oplog optime of sync source: {}"_format(lastFetched.toString(), |