summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Benety Goh <benety@mongodb.com> 2015-08-01 19:19:30 -0400
committer: Benety Goh <benety@mongodb.com> 2015-08-03 17:16:54 -0400
commit: a55dcfaa206c410778f6919dd4407bb940849443 (patch)
tree: 725e7af37ddfb1c0109fa72e873755035a337b65
parent: e69d00d7949e5373d0b58115e1b3583b245e06b4 (diff)
download: mongo-a55dcfaa206c410778f6919dd4407bb940849443.tar.gz
SERVER-19307 background fetcher callback should not proceed if background sync is paused
-rw-r--r-- src/mongo/db/repl/bgsync.cpp | 99
-rw-r--r-- src/mongo/db/repl/bgsync.h | 13
2 files changed, 70 insertions, 42 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 01ca3cfdc14..ede21de7eda 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -74,6 +74,35 @@ const char hashFieldName[] = "h";
int SleepToAllowBatchingMillis = 2;
const int BatchIsSmallish = 40000; // bytes
const Milliseconds fetcherMaxTimeMS(2000);
+
+/**
+ * Checks the criteria for rolling back.
+ * 'getNextOperation' returns the first result of the oplog tailing query. It is invoked
+ * once; a non-OK result is interpreted as "the query produced no results".
+ * 'lastOpTimeFetched' should be consistent with the predicate in the query.
+ * 'lastHashFetched' is the hash expected in the "h" field of that first result.
+ * Both values are taken by value, so the check does not read any BackgroundSync state.
+ * Returns RemoteOplogStale if the oplog query has no results.
+ * Returns OplogStartMissing if we cannot find the timestamp of the last fetched operation in
+ * the remote oplog, i.e. the first result's op time or hash differs from the expected values.
+ * Returns OK when both the op time and the hash of the first result match.
+ */
+Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperation,
+ OpTime lastOpTimeFetched,
+ long long lastHashFetched) {
+ auto result = getNextOperation();
+ if (!result.isOK()) {
+ // Any non-OK result is taken to mean the GTE query from upstream returned nothing, so we're ahead of the upstream.
+ return Status(ErrorCodes::RemoteOplogStale,
+ "we are ahead of the sync source, will try to roll back");
+ }
+ BSONObj o = result.getValue();
+ OpTime opTime = extractOpTime(o);
+ long long hash = o["h"].numberLong();  // "h" is the oplog entry's hash field
+ if (opTime != lastOpTimeFetched || hash != lastHashFetched) {  // either mismatch means our last fetched op is not at the start of the remote result
+ return Status(ErrorCodes::OplogStartMissing,
+ str::stream() << "our last op time fetched: " << lastOpTimeFetched.toString()
+ << ". source's GTE: " << opTime.toString());  // NOTE(review): message reports op times only; a hash-only mismatch is not distinguishable from it
+ }
+ return Status::OK();
+}
+
} // namespace
MONGO_FP_DECLARE(rsBgSyncProduce);
@@ -195,7 +224,7 @@ void BackgroundSync::_producerThread(executor::TaskExecutor* taskExecutor) {
const MemberState state = _replCoord->getMemberState();
// we want to pause when the state changes to primary
if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
- if (!_pause) {
+ if (!isPaused()) {
stop();
}
sleepsecs(1);
@@ -217,7 +246,7 @@ void BackgroundSync::_producerThread(executor::TaskExecutor* taskExecutor) {
// we want to unpause when we're no longer primary
// start() also loads _lastOpTimeFetched, which we know is set from the "if"
OperationContextImpl txn;
- if (_pause) {
+ if (isPaused()) {
start(&txn);
}
@@ -258,16 +287,21 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas
OplogReader syncSourceReader;
syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord);
+ // no server found
+ if (syncSourceReader.getHost().empty()) {
+ sleepsecs(1);
+ // if there is no one to sync from
+ return;
+ }
+
+ long long lastHashFetched;
{
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- // no server found
- if (syncSourceReader.getHost().empty()) {
- lock.unlock();
- sleepsecs(1);
- // if there is no one to sync from
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_pause) {
return;
}
lastOpTimeFetched = _lastOpTimeFetched;
+ lastHashFetched = _lastFetchedHash;
_syncSourceHost = syncSourceReader.getHost();
_replCoord->signalUpstreamUpdater();
}
@@ -289,6 +323,8 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas
stdx::placeholders::_1,
stdx::placeholders::_3,
stdx::cref(source),
+ lastOpTimeFetched,
+ lastHashFetched,
&remoteOplogStartStatus);
auto cmdObj = BSON("find" << nsToCollectionSubstring(rsOplogName) << "filter"
@@ -309,6 +345,12 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas
}
fetcher.wait();
+ // If the background sync is paused after the fetcher is started, we need to
+ // re-evaluate our sync source and oplog common point.
+ if (isPaused()) {
+ return;
+ }
+
// Execute rollback if necessary.
// Rollback is a synchronous operation that uses the task executor and may not be
// executed inside the fetcher callback.
@@ -334,6 +376,8 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas
void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
BSONObjBuilder* bob,
const HostAndPort& source,
+ OpTime lastOpTimeFetched,
+ long long lastFetchedHash,
Status* remoteOplogStartStatus) {
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
@@ -345,6 +389,11 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return;
}
+ // Check if we have been paused.
+ if (isPaused()) {
+ return;
+ }
+
const auto& queryResponse = result.getValue();
const auto& documents = queryResponse.documents;
auto documentBegin = documents.cbegin();
@@ -359,7 +408,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return *(documentBegin++);
};
- *remoteOplogStartStatus = _checkRemoteOplogStart(getNextOperation);
+ *remoteOplogStartStatus =
+ checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
if (!remoteOplogStartStatus->isOK()) {
// Stop fetcher and execute rollback.
return;
@@ -447,11 +497,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
}
// Check if we have been paused.
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_pause) {
- return;
- }
+ if (isPaused()) {
+ return;
}
// We fill in 'bob' to signal the fetcher to process with another getMore.
@@ -492,25 +539,6 @@ void BackgroundSync::consume() {
bufferSizeGauge.decrement(getSize(op));
}
-Status BackgroundSync::_checkRemoteOplogStart(
- stdx::function<StatusWith<BSONObj>()> getNextOperation) {
- auto result = getNextOperation();
- if (!result.isOK()) {
- // The GTE query from upstream returns nothing, so we're ahead of the upstream.
- return Status(ErrorCodes::RemoteOplogStale,
- "we are ahead of the sync source, will try to roll back");
- }
- BSONObj o = result.getValue();
- OpTime opTime = extractOpTime(o);
- long long hash = o["h"].numberLong();
- if (opTime != _lastOpTimeFetched || hash != _lastFetchedHash) {
- return Status(ErrorCodes::OplogStartMissing,
- str::stream() << "our last op time fetched: " << _lastOpTimeFetched.toString()
- << ". source's GTE: " << opTime.toString());
- }
- return Status::OK();
-}
-
void BackgroundSync::_rollback(OperationContext* txn,
const HostAndPort& source,
stdx::function<DBClientBase*()> getConnection) {
@@ -565,6 +593,11 @@ void BackgroundSync::start(OperationContext* txn) {
LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
+bool BackgroundSync::isPaused() const {  // Returns the current value of _pause, read while holding _mutex.
+ stdx::lock_guard<stdx::mutex> lock(_mutex);  // NOTE(review): locking in a const method requires _mutex to be declared mutable -- confirm in bgsync.h
+ return _pause;  // snapshot only: the flag may change as soon as the lock is released
+}
+
void BackgroundSync::waitUntilPaused() {
stdx::unique_lock<stdx::mutex> lock(_mutex);
while (!_pause) {
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 0754a111f19..757ef3c0627 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -94,6 +94,8 @@ public:
void shutdown();
void notify(OperationContext* txn);
+ bool isPaused() const;  // Returns true if _pause is currently set; reads it under _mutex.
+
// Blocks until _pause becomes true from a call to stop() or shutdown()
void waitUntilPaused();
@@ -173,18 +175,11 @@ private:
void _fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
BSONObjBuilder* bob,
const HostAndPort& source,
+ OpTime lastOpTimeFetched,
+ long long lastFetchedHash,
Status* remoteOplogStartStatus);
/**
- * Checks the criteria for rolling back.
- * 'getNextOperation' returns the first result of the oplog tailing query.
- * Returns RemoteOplogStale if the oplog query has no results.
- * Returns OplogStartMissing if we cannot find the timestamp of the last fetched operation in
- * the remote oplog.
- */
- Status _checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperation);
-
- /**
* Executes a rollback.
* 'getConnection' returns a connection to the sync source.
*/