summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--src/mongo/db/repl/bgsync.cpp79
1 files changed, 54 insertions, 25 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 37ba22bf1a7..75a91db536d 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -117,7 +117,7 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat
} // namespace
-MONGO_FP_DECLARE(rsBgSyncProduce);
+MONGO_FP_DECLARE(stopReplProducer);
BackgroundSync* BackgroundSync::s_instance = 0;
stdx::mutex BackgroundSync::s_mutex;
@@ -133,6 +133,9 @@ static ServerStatusMetricField<Counter64> displayOpsRead("repl.network.ops", &op
static Counter64 networkByteStats;
static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", &networkByteStats);
+// Failpoint which causes rollback to hang before starting.
+MONGO_FP_DECLARE(rollbackHangBeforeStart);
+
// The count of items in the buffer
static Counter64 bufferCountGauge;
static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count",
@@ -262,6 +265,21 @@ void BackgroundSync::_producerThread() {
}
void BackgroundSync::_produce(OperationContext* txn) {
+ if (MONGO_FAIL_POINT(stopReplProducer)) {
+ // This log output is used in js tests so please leave it.
+ log() << "bgsync - stopReplProducer fail point "
+ "enabled. Blocking until fail point is disabled.";
+
+ // TODO(SERVER-27120): Remove the return statement and uncomment the while loop.
+ // Currently we cannot block here or we prevent primaries from being fully elected since
+ // we'll never call _signalNoNewDataForApplier.
+ // while (MONGO_FAIL_POINT(stopReplProducer) && !inShutdown()) {
+ // mongo::sleepsecs(1);
+ // }
+ mongo::sleepsecs(1);
+ return;
+ }
+
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
{
@@ -280,10 +298,6 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
- while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
- sleepmillis(0);
- }
-
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
@@ -478,26 +492,6 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
bool syncSourceHasSyncSource = false;
OpTime sourcesLastOp;
- // Forward metadata (containing liveness information) to replication coordinator.
- bool receivedMetadata =
- queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
- if (receivedMetadata) {
- auto metadataResult =
- rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
- if (!metadataResult.isOK()) {
- error() << "invalid replication metadata from sync source " << source << ": "
- << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
- return;
- }
- const auto& metadata = metadataResult.getValue();
- _replCoord->processReplSetMetadata(metadata);
- if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
- _replCoord->cancelAndRescheduleElectionTimeout();
- }
- syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
- sourcesLastOp = metadata.getLastOpVisible();
- }
-
const auto& documents = queryResponse.documents;
auto firstDocToApply = documents.cbegin();
auto lastDocToApply = documents.cend();
@@ -576,6 +570,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return;
}
+ if (MONGO_FAIL_POINT(stopReplProducer)) {
+ return;
+ }
+
+ // Process replset metadata. It is important that this happen after we've validated the
+ // first batch, so we don't progress our knowledge of the commit point from a
+ // response that triggers a rollback.
+ bool receivedMetadata =
+ queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
+ if (receivedMetadata) {
+ auto metadataResult =
+ rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
+ if (!metadataResult.isOK()) {
+ error() << "invalid replication metadata from sync source " << source << ": "
+ << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
+ return;
+ }
+ const auto& metadata = metadataResult.getValue();
+ _replCoord->processReplSetMetadata(metadata, true /*advance commit point*/);
+ if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
+ _replCoord->cancelAndRescheduleElectionTimeout();
+ }
+ syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
+ sourcesLastOp = metadata.getLastOpVisible();
+ }
+
// The count of the bytes of the documents read off the network.
int networkDocumentBytes = 0;
Timestamp lastTS;
@@ -741,6 +761,15 @@ void BackgroundSync::_rollback(OperationContext* txn,
const HostAndPort& source,
boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection) {
+ if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) {
+ // This log output is used in js tests so please leave it.
+ log() << "rollback - rollbackHangBeforeStart fail point "
+ "enabled. Blocking until fail point is disabled.";
+ while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) {
+ mongo::sleepsecs(1);
+ }
+ }
+
// Set state to ROLLBACK while we are in this function. This prevents serving reads, even from
// the oplog. This can fail if we are elected PRIMARY, in which case we better not do any
// rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or