diff options
author | Benety Goh <benety@mongodb.com> | 2015-08-01 19:19:30 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-08-03 17:16:54 -0400 |
commit | a55dcfaa206c410778f6919dd4407bb940849443 (patch) | |
tree | 725e7af37ddfb1c0109fa72e873755035a337b65 | |
parent | e69d00d7949e5373d0b58115e1b3583b245e06b4 (diff) | |
download | mongo-a55dcfaa206c410778f6919dd4407bb940849443.tar.gz |
SERVER-19307 background fetcher callback should not proceed if background sync is paused
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 99 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 13 |
2 files changed, 70 insertions, 42 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 01ca3cfdc14..ede21de7eda 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -74,6 +74,35 @@ const char hashFieldName[] = "h"; int SleepToAllowBatchingMillis = 2; const int BatchIsSmallish = 40000; // bytes const Milliseconds fetcherMaxTimeMS(2000); + +/** + * Checks the criteria for rolling back. + * 'getNextOperation' returns the first result of the oplog tailing query. + * 'lastOpTimeFetched' should be consistent with the predicate in the query. + * Returns RemoteOplogStale if the oplog query has no results. + * Returns OplogStartMissing if we cannot find the timestamp of the last fetched operation in + * the remote oplog. + */ +Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperation, + OpTime lastOpTimeFetched, + long long lastHashFetched) { + auto result = getNextOperation(); + if (!result.isOK()) { + // The GTE query from upstream returns nothing, so we're ahead of the upstream. + return Status(ErrorCodes::RemoteOplogStale, + "we are ahead of the sync source, will try to roll back"); + } + BSONObj o = result.getValue(); + OpTime opTime = extractOpTime(o); + long long hash = o["h"].numberLong(); + if (opTime != lastOpTimeFetched || hash != lastHashFetched) { + return Status(ErrorCodes::OplogStartMissing, + str::stream() << "our last op time fetched: " << lastOpTimeFetched.toString() + << ". source's GTE: " << opTime.toString()); + } + return Status::OK(); +} + } // namespace MONGO_FP_DECLARE(rsBgSyncProduce); @@ -195,7 +224,7 @@ void BackgroundSync::_producerThread(executor::TaskExecutor* taskExecutor) { const MemberState state = _replCoord->getMemberState(); // we want to pause when the state changes to primary if (_replCoord->isWaitingForApplierToDrain() || state.primary()) { - if (!_pause) { + if (!isPaused()) { stop(); } sleepsecs(1); @@ -217,7 +246,7 @@ void BackgroundSync::_producerThread(executor::TaskExecutor* taskExecutor) { // we want to unpause when we're no longer primary // start() also loads _lastOpTimeFetched, which we know is set from the "if" OperationContextImpl txn; - if (_pause) { + if (isPaused()) { start(&txn); } @@ -258,16 +287,21 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas OplogReader syncSourceReader; syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord); + // no server found + if (syncSourceReader.getHost().empty()) { + sleepsecs(1); + // if there is no one to sync from + return; + } + + long long lastHashFetched; { - stdx::unique_lock<stdx::mutex> lock(_mutex); - // no server found - if (syncSourceReader.getHost().empty()) { - lock.unlock(); - sleepsecs(1); - // if there is no one to sync from + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_pause) { return; } lastOpTimeFetched = _lastOpTimeFetched; + lastHashFetched = _lastFetchedHash; _syncSourceHost = syncSourceReader.getHost(); _replCoord->signalUpstreamUpdater(); } @@ -289,6 +323,8 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas stdx::placeholders::_1, stdx::placeholders::_3, stdx::cref(source), + lastOpTimeFetched, + lastHashFetched, &remoteOplogStartStatus); auto cmdObj = BSON("find" << nsToCollectionSubstring(rsOplogName) << "filter" @@ -309,6 +345,12 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas } fetcher.wait(); + // If the background sync is paused after the fetcher is started, we need to + // re-evaluate our sync source and oplog common point. + if (isPaused()) { + return; + } + // Execute rollback if necessary. // Rollback is a synchronous operation that uses the task executor and may not be // executed inside the fetcher callback. @@ -334,6 +376,8 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, BSONObjBuilder* bob, const HostAndPort& source, + OpTime lastOpTimeFetched, + long long lastFetchedHash, Status* remoteOplogStartStatus) { // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor @@ -345,6 +389,11 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return; } + // Check if we have been paused. + if (isPaused()) { + return; + } + const auto& queryResponse = result.getValue(); const auto& documents = queryResponse.documents; auto documentBegin = documents.cbegin(); @@ -359,7 +408,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return *(documentBegin++); }; - *remoteOplogStartStatus = _checkRemoteOplogStart(getNextOperation); + *remoteOplogStartStatus = + checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash); if (!remoteOplogStartStatus->isOK()) { // Stop fetcher and execute rollback. return; @@ -447,11 +497,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& } // Check if we have been paused. - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_pause) { - return; - } + if (isPaused()) { + return; } // We fill in 'bob' to signal the fetcher to process with another getMore. @@ -492,25 +539,6 @@ void BackgroundSync::consume() { bufferSizeGauge.decrement(getSize(op)); } -Status BackgroundSync::_checkRemoteOplogStart( - stdx::function<StatusWith<BSONObj>()> getNextOperation) { - auto result = getNextOperation(); - if (!result.isOK()) { - // The GTE query from upstream returns nothing, so we're ahead of the upstream. - return Status(ErrorCodes::RemoteOplogStale, - "we are ahead of the sync source, will try to roll back"); - } - BSONObj o = result.getValue(); - OpTime opTime = extractOpTime(o); - long long hash = o["h"].numberLong(); - if (opTime != _lastOpTimeFetched || hash != _lastFetchedHash) { - return Status(ErrorCodes::OplogStartMissing, - str::stream() << "our last op time fetched: " << _lastOpTimeFetched.toString() - << ". source's GTE: " << opTime.toString()); - } - return Status::OK(); -} - void BackgroundSync::_rollback(OperationContext* txn, const HostAndPort& source, stdx::function<DBClientBase*()> getConnection) { @@ -565,6 +593,11 @@ void BackgroundSync::start(OperationContext* txn) { LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; } +bool BackgroundSync::isPaused() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _pause; +} + void BackgroundSync::waitUntilPaused() { stdx::unique_lock<stdx::mutex> lock(_mutex); while (!_pause) { diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 0754a111f19..757ef3c0627 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -94,6 +94,8 @@ public: void shutdown(); void notify(OperationContext* txn); + bool isPaused() const; + // Blocks until _pause becomes true from a call to stop() or shutdown() void waitUntilPaused(); @@ -173,18 +175,11 @@ private: void _fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, BSONObjBuilder* bob, const HostAndPort& source, + OpTime lastOpTimeFetched, + long long lastFetchedHash, Status* remoteOplogStartStatus); /** - * Checks the criteria for rolling back. - * 'getNextOperation' returns the first result of the oplog tailing query. - * Returns RemoteOplogStale if the oplog query has no results. - * Returns OplogStartMissing if we cannot find the timestamp of the last fetched operation in - * the remote oplog. - */ - Status _checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperation); - - /** * Executes a rollback. * 'getConnection' returns a connection to the sync source. */ |