diff options
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 45 |
1 files changed, 14 insertions, 31 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index ca02b597752..626ccd9e6d0 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -70,7 +70,6 @@ using std::string; namespace repl { namespace { -const char kHashFieldName[] = "h"; const int kSleepToAllowBatchingMillis = 2; const int kSmallBatchLimitBytes = 40000; const Milliseconds kRollbackOplogSocketTimeout(10 * 60 * 1000); @@ -403,14 +402,12 @@ void BackgroundSync::_produce() { } } - long long lastHashFetched; { stdx::lock_guard<stdx::mutex> lock(_mutex); if (_state != ProducerState::Running) { return; } lastOpTimeFetched = _lastOpTimeFetched; - lastHashFetched = _lastFetchedHash; } if (!_replCoord->getMemberState().primary()) { @@ -441,7 +438,7 @@ void BackgroundSync::_produce() { // replication coordinator. auto oplogFetcherPtr = stdx::make_unique<OplogFetcher>( _replicationCoordinatorExternalState->getTaskExecutor(), - OpTimeWithHash(lastHashFetched, lastOpTimeFetched), + lastOpTimeFetched, source, NamespaceString::kRsOplogNamespace, _replCoord->getConfig(), @@ -539,8 +536,7 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi _oplogApplier->enqueue(opCtx.get(), begin, end); // Update last fetched info. - _lastFetchedHash = info.lastDocument.value; - _lastOpTimeFetched = info.lastDocument.opTime; + _lastOpTimeFetched = info.lastDocument; LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched; } @@ -713,7 +709,6 @@ void BackgroundSync::stop(bool resetLastFetchedOptime) { if (resetLastFetchedOptime) { invariant(_oplogApplier->getBuffer()->isEmpty()); _lastOpTimeFetched = OpTime(); - _lastFetchedHash = 0; log() << "Resetting last fetched optimes in bgsync"; } @@ -727,14 +722,14 @@ void BackgroundSync::stop(bool resetLastFetchedOptime) { } void BackgroundSync::start(OperationContext* opCtx) { - OpTimeWithHash lastAppliedOpTimeWithHash; + OpTime lastAppliedOpTime; ShouldNotConflictWithSecondaryBatchApplicationBlock noConflict(opCtx->lockState()); // Explicitly start future read transactions without a timestamp. opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); do { - lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(opCtx); + lastAppliedOpTime = _readLastAppliedOpTime(opCtx); stdx::lock_guard<stdx::mutex> lk(_mutex); // Double check the state after acquiring the mutex. if (_state != ProducerState::Starting) { @@ -749,33 +744,30 @@ void BackgroundSync::start(OperationContext* opCtx) { // When a node steps down during drain mode, the last fetched optime would be newer than // the last applied. - if (_lastOpTimeFetched <= lastAppliedOpTimeWithHash.opTime) { - LOG(1) << "Setting bgsync _lastOpTimeFetched=" << lastAppliedOpTimeWithHash.opTime - << " and _lastFetchedHash=" << lastAppliedOpTimeWithHash.value + if (_lastOpTimeFetched <= lastAppliedOpTime) { + LOG(1) << "Setting bgsync _lastOpTimeFetched=" << lastAppliedOpTime << ". Previous _lastOpTimeFetched: " << _lastOpTimeFetched; - _lastOpTimeFetched = lastAppliedOpTimeWithHash.opTime; - _lastFetchedHash = lastAppliedOpTimeWithHash.value; + _lastOpTimeFetched = lastAppliedOpTime; } // Reload the last applied optime from disk if it has been changed. - } while (lastAppliedOpTimeWithHash.opTime != _replCoord->getMyLastAppliedOpTime()); + } while (lastAppliedOpTime != _replCoord->getMyLastAppliedOpTime()); - LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; + LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched; } -OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* opCtx) { +OpTime BackgroundSync::_readLastAppliedOpTime(OperationContext* opCtx) { BSONObj oplogEntry; try { bool success = writeConflictRetry( - opCtx, "readLastAppliedHash", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "readLastAppliedOpTime", NamespaceString::kRsOplogNamespace.ns(), [&] { Lock::DBLock lk(opCtx, "local", MODE_X); return Helpers::getLast( opCtx, NamespaceString::kRsOplogNamespace.ns().c_str(), oplogEntry); }); if (!success) { - // This can happen when we are to do an initial sync. lastHash will be set - // after the initial sync is complete. - return OpTimeWithHash(0); + // This can happen when we are to do an initial sync. + return OpTime(); } } catch (const ExceptionForCat<ErrorCategory::ShutdownError>&) { throw; @@ -784,19 +776,10 @@ OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* << redact(ex); fassertFailed(18904); } - long long hash; - auto status = bsonExtractIntegerField(oplogEntry, kHashFieldName, &hash); - if (!status.isOK()) { - severe() << "Most recent entry in " << NamespaceString::kRsOplogNamespace.ns() - << " is missing or has invalid \"" << kHashFieldName - << "\" field. Oplog entry: " << redact(oplogEntry) << ": " << redact(status); - fassertFailed(18902); - } OplogEntry parsedEntry(oplogEntry); - auto lastOptime = OpTimeWithHash(hash, parsedEntry.getOpTime()); LOG(1) << "Successfully read last entry of oplog while starting bgsync: " << redact(oplogEntry); - return lastOptime; + return parsedEntry.getOpTime(); } bool BackgroundSync::shouldStopFetching() const { |