Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/bgsync.cpp  91
-rw-r--r--  src/mongo/db/repl/bgsync.h    10
2 files changed, 97 insertions, 4 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 671d56ed68d..98d86f0c7c0 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -52,11 +52,12 @@
#include "mongo/db/repl/rollback_source_impl.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/rs_sync.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/executor/network_interface_factory.h"
-#include "mongo/db/repl/storage_interface.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
@@ -136,6 +137,9 @@ static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes",
// Failpoint which causes rollback to hang before starting.
MONGO_FP_DECLARE(rollbackHangBeforeStart);
+// Failpoint which causes the oplog fetcher to hang before the first fetch.
+MONGO_FP_DECLARE(fetcherHangBeforeStart);
+
// The count of items in the buffer
static Counter64 bufferCountGauge;
static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count",
@@ -159,6 +163,8 @@ size_t getSize(const BSONObj& o) {
}
} // namespace
+const NamespaceString BackgroundSync::kLocalOplogNss("local.oplog.rs");
+
BackgroundSync::BackgroundSync()
: _buffer(bufferMaxSizeGauge, &getSize),
_threadPoolTaskExecutor(makeThreadPool(),
@@ -298,12 +304,13 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
-
+ HostAndPort oldSyncSource;
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
{
stdx::unique_lock<stdx::mutex> lock(_mutex);
lastOpTimeFetched = _lastOpTimeFetched;
+ oldSyncSource = _syncSourceHost;
_syncSourceHost = HostAndPort();
}
OplogReader syncSourceReader;
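A note on the locking in the hunk above: _lastOpTimeFetched and _syncSourceHost are copied out while _mutex is held, and everything after the closing brace, including the network round trips below, runs without the lock. A minimal sketch of this snapshot-under-lock pattern, using std::mutex in place of mongo's stdx wrappers and a stripped-down HostAndPort:

    #include <mutex>
    #include <string>

    struct HostAndPort {
        std::string host;  // stripped-down stand-in for mongo's HostAndPort
    };

    class SyncSourceState {
    public:
        // Copy and clear the current sync source under the lock, returning the
        // snapshot so the caller can use it without holding _mutex.
        HostAndPort takeSyncSourceSnapshot() {
            std::lock_guard<std::mutex> lk(_mutex);
            HostAndPort old = _syncSourceHost;
            _syncSourceHost = HostAndPort();  // cleared; a new source is chosen later
            return old;
        }

    private:
        std::mutex _mutex;
        HostAndPort _syncSourceHost;
    };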
@@ -325,6 +332,17 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
}
+ // If our sync source has not changed, it is likely because our heartbeat data map is
+ // out of date. In that case we sleep for 1 second to avoid spinning while we wait
+ // for the map to update.
+ if (syncSourceReader.getHost() == oldSyncSource) {
+ log() << "Chose same sync source candidate as last time, " << oldSyncSource
+ << ". Sleeping for 1 second to avoid immediately choosing a new sync source for "
+ "the same reason as last time.";
+
+ sleepsecs(1);
+ }
+
long long lastHashFetched;
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -386,6 +404,20 @@ void BackgroundSync::_produce(OperationContext* txn) {
metadataBob.append(rpc::kReplSetMetadataFieldName, 1);
}
+ if (MONGO_FAIL_POINT(fetcherHangBeforeStart)) {
+ // This log output is used in js tests, so please leave it.
+ log() << "BackgroundSync - fetcherHangBeforeStart fail point "
+ "enabled. Blocking until fail point is disabled.";
+ while (MONGO_FAIL_POINT(fetcherHangBeforeStart) && !inShutdown()) {
+ mongo::sleepsecs(1);
+ }
+
+ // If the sync source candidate rolls back while we are hung in this fail point, it will
+ // close all connections and the next request will fail.
+ // We manually drop all connections here so that the following Fetcher request succeeds.
+ _threadPoolTaskExecutor.dropConnections(source);
+ }
+
auto dbName = nsToDatabase(rsOplogName);
auto cmdObj = cmdBob.obj();
auto metadataObj = metadataBob.obj();
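The fail point block in the hunk above is the usual hang-until-disabled idiom: poll once per second, and also watch for shutdown so an operator can still stop the node while a test holds the fail point on. A reduced sketch of the idiom, with std::atomic flags standing in for MONGO_FAIL_POINT and inShutdown() (illustrative names, not mongo APIs):

    #include <atomic>
    #include <chrono>
    #include <thread>

    std::atomic<bool> failPointEnabled{false};  // stand-in for MONGO_FAIL_POINT(...)
    std::atomic<bool> shuttingDown{false};      // stand-in for inShutdown()

    void hangWhileFailPointEnabled() {
        // Poll at 1s granularity: cheap enough to leave on, responsive enough
        // that disabling the fail point (or shutting down) unblocks promptly.
        while (failPointEnabled.load() && !shuttingDown.load()) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }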
@@ -467,6 +499,39 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
+void BackgroundSync::_lastAppliedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
+ OpTime lastOpTimeFetched,
+ Status* returnStatus) {
+ if (!result.isOK()) {
+ *returnStatus = result.getStatus();
+ return;
+ }
+
+ const auto& queryResponse = result.getValue();
+ if (queryResponse.documents.empty()) {
+ *returnStatus = Status(ErrorCodes::InvalidSyncSource, "Upstream node had an empty oplog.");
+ return;
+ }
+
+ const auto& remoteLastAppliedDocument = queryResponse.documents.front();
+ const auto remoteLastAppliedOpTime = OpTime::parseFromOplogEntry(remoteLastAppliedDocument);
+ if (!remoteLastAppliedOpTime.isOK()) {
+ *returnStatus = Status(ErrorCodes::InvalidBSON,
+ str::stream() << "Received invalid oplog entry from upstream node: "
+ << remoteLastAppliedDocument.toString() << ". Error: "
+ << remoteLastAppliedOpTime.getStatus().toString());
+ return;
+ }
+ if (remoteLastAppliedOpTime.getValue() <= lastOpTimeFetched) {
+ *returnStatus = Status(ErrorCodes::InvalidSyncSource,
+ str::stream() << "Upstream node's last applied OpTime "
+ << remoteLastAppliedOpTime.getValue().toString()
+ << " is not greater than our last fetched OpTime "
+ << lastOpTimeFetched.toString());
+ return;
+ }
+}
+
void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
BSONObjBuilder* bob,
const HostAndPort& source,
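About _lastAppliedFetcherCallback, added in the hunk above: a candidate is rejected unless its newest oplog entry is strictly ahead of our last fetched OpTime, since a node that is merely caught up with us can serve us nothing. A self-contained sketch of that comparison, with OpTime reduced to (timestamp, term) ordered lexicographically (a simplification of the real repl::OpTime):

    #include <cstdint>
    #include <tuple>

    // Simplified stand-in for repl::OpTime: compare timestamp, then term.
    struct OpTime {
        uint64_t timestamp = 0;
        int64_t term = -1;
        bool operator<=(const OpTime& rhs) const {
            return std::tie(timestamp, term) <= std::tie(rhs.timestamp, rhs.term);
        }
    };

    // Mirrors the rejection above: the remote's last applied entry must be
    // strictly newer than what we have already fetched.
    bool remoteHasNewOperations(const OpTime& remoteLastApplied, const OpTime& lastFetched) {
        return !(remoteLastApplied <= lastFetched);
    }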
@@ -528,13 +593,13 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
const auto rbidElem = rbidReplyObj["rbid"];
if (rbidElem.type() != NumberInt) {
*returnStatus =
- Status(ErrorCodes::BadValue,
+ Status(ErrorCodes::InvalidSyncSource,
str::stream() << "Upstream node returned an "
<< "rbid with invalid type " << rbidElem.type());
return;
}
if (rbidElem.Int() != rbid) {
- *returnStatus = Status(ErrorCodes::BadValue,
+ *returnStatus = Status(ErrorCodes::InvalidSyncSource,
"Upstream node rolled back after verifying "
"that it had our MinValid point. Retrying.");
}
@@ -548,6 +613,24 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
if (!returnStatus->isOK())
return;
+ // Check that the upstream last applied OpTime is newer than our last fetched OpTime.
+ Fetcher lastAppliedFetcher(&_threadPoolTaskExecutor,
+ source,
+ kLocalOplogNss.db().toString(),
+ BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort"
+ << BSON("$natural" << -1)),
+ stdx::bind(&BackgroundSync::_lastAppliedFetcherCallback,
+ this,
+ stdx::placeholders::_1,
+ lastOpTimeFetched,
+ returnStatus),
+ rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ Seconds(30));
+ lastAppliedFetcher.schedule();
+ lastAppliedFetcher.wait();
+ if (!returnStatus->isOK())
+ return;
+
auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> {
if (firstDocToApply == lastDocToApply) {
return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing");
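Two things are worth noting about the lastAppliedFetcher block that closes the .cpp changes above. First, the query {find: <oplog>, limit: 1, sort: {$natural: -1}} returns the newest oplog entry, since $natural: -1 walks the collection in reverse insertion order. Second, Fetcher is asynchronous, so schedule() followed immediately by wait() runs the query synchronously inside _fetcherCallback, with the result handed back through the *returnStatus out-parameter. A rough sketch of that schedule/wait shape using std::async (mongo's executor machinery elided; names are illustrative):

    #include <future>
    #include <string>

    struct Status {
        bool ok = true;
        std::string reason;
    };

    // Hypothetical stand-in for the one-document query against the source.
    Status queryRemoteLastApplied() {
        return Status{};
    }

    Status runQuerySynchronously() {
        Status returnStatus;
        // "schedule": start the work asynchronously; the callback fills the out-param.
        auto pending = std::async(std::launch::async, [&returnStatus] {
            returnStatus = queryRemoteLastApplied();
        });
        pending.wait();  // "wait": block until the callback has run
        return returnStatus;
    }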
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 0c1a1ea99f5..20ea6629ba0 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -77,6 +77,8 @@ public:
*/
class BackgroundSync : public BackgroundSyncInterface {
public:
+ static const NamespaceString kLocalOplogNss;
+
// Allow index prefetching to be turned on/off
enum IndexPrefetchConfig {
UNINITIALIZED = 0,
@@ -191,6 +193,14 @@ private:
int rbid);
/**
+ * A callback to a Fetcher that checks that the remote last applied OpTime is newer than the
+ * local last fetched OpTime.
+ */
+ void _lastAppliedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
+ OpTime lastOpTimeFetched,
+ Status* returnStatus);
+
+ /**
* Executes a rollback.
* 'getConnection' returns a connection to the sync source.
*/