diff options
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 79 |
1 file changed, 54 insertions, 25 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 37ba22bf1a7..75a91db536d 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -117,7 +117,7 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat } // namespace -MONGO_FP_DECLARE(rsBgSyncProduce); +MONGO_FP_DECLARE(stopReplProducer); BackgroundSync* BackgroundSync::s_instance = 0; stdx::mutex BackgroundSync::s_mutex; @@ -133,6 +133,9 @@ static ServerStatusMetricField<Counter64> displayOpsRead("repl.network.ops", &op static Counter64 networkByteStats; static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", &networkByteStats); +// Failpoint which causes rollback to hang before starting. +MONGO_FP_DECLARE(rollbackHangBeforeStart); + // The count of items in the buffer static Counter64 bufferCountGauge; static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count", @@ -262,6 +265,21 @@ void BackgroundSync::_producerThread() { } void BackgroundSync::_produce(OperationContext* txn) { + if (MONGO_FAIL_POINT(stopReplProducer)) { + // This log output is used in js tests so please leave it. + log() << "bgsync - stopReplProducer fail point " + "enabled. Blocking until fail point is disabled."; + + // TODO(SERVER-27120): Remove the return statement and uncomment the while loop. + // Currently we cannot block here or we prevent primaries from being fully elected since + // we'll never call _signalNoNewDataForApplier. 
+ // while (MONGO_FAIL_POINT(stopReplProducer) && !inShutdown()) { + // mongo::sleepsecs(1); + // } + mongo::sleepsecs(1); + return; + } + // this oplog reader does not do a handshake because we don't want the server it's syncing // from to track how far it has synced { @@ -280,10 +298,6 @@ void BackgroundSync::_produce(OperationContext* txn) { } } - while (MONGO_FAIL_POINT(rsBgSyncProduce)) { - sleepmillis(0); - } - // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; @@ -478,26 +492,6 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& bool syncSourceHasSyncSource = false; OpTime sourcesLastOp; - // Forward metadata (containing liveness information) to replication coordinator. - bool receivedMetadata = - queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); - if (receivedMetadata) { - auto metadataResult = - rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata); - if (!metadataResult.isOK()) { - error() << "invalid replication metadata from sync source " << source << ": " - << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; - return; - } - const auto& metadata = metadataResult.getValue(); - _replCoord->processReplSetMetadata(metadata); - if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { - _replCoord->cancelAndRescheduleElectionTimeout(); - } - syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; - sourcesLastOp = metadata.getLastOpVisible(); - } - const auto& documents = queryResponse.documents; auto firstDocToApply = documents.cbegin(); auto lastDocToApply = documents.cend(); @@ -576,6 +570,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return; } + if (MONGO_FAIL_POINT(stopReplProducer)) { + return; + } + + // Process replset metadata. 
It is important that this happen after we've validated the + // first batch, so we don't progress our knowledge of the commit point from a + // response that triggers a rollback. + bool receivedMetadata = + queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); + if (receivedMetadata) { + auto metadataResult = + rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata); + if (!metadataResult.isOK()) { + error() << "invalid replication metadata from sync source " << source << ": " + << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; + return; + } + const auto& metadata = metadataResult.getValue(); + _replCoord->processReplSetMetadata(metadata, true /*advance commit point*/); + if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { + _replCoord->cancelAndRescheduleElectionTimeout(); + } + syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; + sourcesLastOp = metadata.getLastOpVisible(); + } + // The count of the bytes of the documents read off the network. int networkDocumentBytes = 0; Timestamp lastTS; @@ -741,6 +761,15 @@ void BackgroundSync::_rollback(OperationContext* txn, const HostAndPort& source, boost::optional<int> requiredRBID, stdx::function<DBClientBase*()> getConnection) { + if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) { + // This log output is used in js tests so please leave it. + log() << "rollback - rollbackHangBeforeStart fail point " + "enabled. Blocking until fail point is disabled."; + while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) { + mongo::sleepsecs(1); + } + } + // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from // the oplog. This can fail if we are elected PRIMARY, in which case we better not do any // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or |