diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 10 |
2 files changed, 97 insertions, 4 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 671d56ed68d..98d86f0c7c0 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -52,11 +52,12 @@ #include "mongo/db/repl/rollback_source_impl.h" #include "mongo/db/repl/rs_rollback.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/executor/network_interface_factory.h" -#include "mongo/db/repl/storage_interface.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" @@ -136,6 +137,9 @@ static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", // Failpoint which causes rollback to hang before starting. MONGO_FP_DECLARE(rollbackHangBeforeStart); +// Failpoint which causes the oplog fetcher to hang before the first fetch. 
+MONGO_FP_DECLARE(fetcherHangBeforeStart); + // The count of items in the buffer static Counter64 bufferCountGauge; static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count", @@ -159,6 +163,8 @@ size_t getSize(const BSONObj& o) { } } // namespace +/* Namespace local.oplog.rs; its db() and coll() parts are used to build the find command that reads the sync source oplog in reverse natural order. */ const NamespaceString BackgroundSync::kLocalOplogNss("local.oplog.rs"); + BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize), _threadPoolTaskExecutor(makeThreadPool(), @@ -298,12 +304,13 @@ void BackgroundSync::_produce(OperationContext* txn) { } } - + HostAndPort oldSyncSource; // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; { stdx::unique_lock<stdx::mutex> lock(_mutex); lastOpTimeFetched = _lastOpTimeFetched; + /* Remember the previous source before it is cleared, so the newly chosen host can be compared against it below. */ oldSyncSource = _syncSourceHost; _syncSourceHost = HostAndPort(); } OplogReader syncSourceReader; @@ -325,6 +332,17 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } + // If our sync source has not changed, it is likely caused by our heartbeat data map being + // out of date. In that case we sleep for 1 second to reduce the amount we spin waiting + // for our map to update. + if (syncSourceReader.getHost() == oldSyncSource) { + log() << "Chose same sync source candidate as last time, " << oldSyncSource + << ". Sleeping for 1 second to avoid immediately choosing a new sync source for " + "the same reason as last time."; + + sleepsecs(1); + } + long long lastHashFetched; { stdx::lock_guard<stdx::mutex> lock(_mutex); @@ -386,6 +404,20 @@ void BackgroundSync::_produce(OperationContext* txn) { metadataBob.append(rpc::kReplSetMetadataFieldName, 1); } + if (MONGO_FAIL_POINT(fetcherHangBeforeStart)) { + // This log output is used in js tests so please leave it. + log() << "BackgroundSync - fetcherHangBeforeStart fail point " + "enabled. 
Blocking until fail point is disabled."; + /* Re-check the fail point once per second; the inShutdown() test lets the loop end during shutdown even if the fail point stays on. */ while (MONGO_FAIL_POINT(fetcherHangBeforeStart) && !inShutdown()) { + mongo::sleepsecs(1); + } + + // If the sync source candidate rolls back while in this fail point, it will close all + // connections and the next request will fail. + // We manually drop all connections here so that the following Fetcher request succeeds. + _threadPoolTaskExecutor.dropConnections(source); + } + auto dbName = nsToDatabase(rsOplogName); auto cmdObj = cmdBob.obj(); auto metadataObj = metadataBob.obj(); @@ -467,6 +499,39 @@ void BackgroundSync::_produce(OperationContext* txn) { } } +/* Fetcher callback that reports through *returnStatus why the sync source cannot be used: the fetch error itself, an empty result, an unparsable OpTime in the first document, or a remote OpTime that is not greater than lastOpTimeFetched. When every check passes, *returnStatus is not written. */ void BackgroundSync::_lastAppliedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, + OpTime lastOpTimeFetched, + Status* returnStatus) { + if (!result.isOK()) { + *returnStatus = result.getStatus(); + return; + } + + const auto& queryResponse = result.getValue(); + if (queryResponse.documents.empty()) { + *returnStatus = Status(ErrorCodes::InvalidSyncSource, "Upstream node had an empty oplog."); + return; + } + + /* Only the first returned document is examined. */ const auto& remoteLastAppliedDocument = queryResponse.documents.front(); + const auto remoteLastAppliedOpTime = OpTime::parseFromOplogEntry(remoteLastAppliedDocument); + if (!remoteLastAppliedOpTime.isOK()) { + *returnStatus = Status(ErrorCodes::InvalidBSON, + str::stream() << "Received invalid oplog entry from upstream node: " + << remoteLastAppliedDocument.toString() << ". 
Error: " + << remoteLastAppliedOpTime.getStatus().toString()); + return; + } + /* Reject a source whose last applied OpTime is not strictly greater than our last fetched OpTime. */ if (remoteLastAppliedOpTime.getValue() <= lastOpTimeFetched) { + *returnStatus = Status(ErrorCodes::InvalidSyncSource, + str::stream() << "Upstream node's last applied OpTime " + << remoteLastAppliedOpTime.getValue().toString() + << " is not greater than our last fetched OpTime " + << lastOpTimeFetched.toString()); + return; + } +} + void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, BSONObjBuilder* bob, const HostAndPort& source, @@ -528,13 +593,13 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& const auto rbidElem = rbidReplyObj["rbid"]; if (rbidElem.type() != NumberInt) { *returnStatus = - Status(ErrorCodes::BadValue, + Status(ErrorCodes::InvalidSyncSource, str::stream() << "Upstream node returned an " << "rbid with invalid type " << rbidElem.type()); return; } if (rbidElem.Int() != rbid) { - *returnStatus = Status(ErrorCodes::BadValue, + *returnStatus = Status(ErrorCodes::InvalidSyncSource, "Upstream node rolled back after verifying " "that it had our MinValid point. Retrying."); } @@ -548,6 +613,24 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& if (!returnStatus->isOK()) return; + // Check that the upstream last applied OpTime is newer than our last fetched OpTime. 
+ /* Ask the sync source for a single oplog document in reverse $natural order (limit 1); the bound callback records any failure in *returnStatus. */ Fetcher lastAppliedFetcher(&_threadPoolTaskExecutor, + source, + kLocalOplogNss.db().toString(), + BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" + << BSON("$natural" << -1)), + stdx::bind(&BackgroundSync::_lastAppliedFetcherCallback, + this, + stdx::placeholders::_1, + lastOpTimeFetched, + returnStatus), + rpc::ServerSelectionMetadata(true, boost::none).toBSON(), + Seconds(30)); + /* NOTE(review): any result of schedule() is discarded here. If scheduling can fail, the callback would never run and *returnStatus would stay OK, silently skipping this check -- confirm against the Fetcher contract. */ lastAppliedFetcher.schedule(); + lastAppliedFetcher.wait(); + if (!returnStatus->isOK()) + return; + auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> { if (firstDocToApply == lastDocToApply) { return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing"); diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 0c1a1ea99f5..20ea6629ba0 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -77,6 +77,8 @@ public: */ class BackgroundSync : public BackgroundSyncInterface { public: + /* Namespace used for the last-applied lookup; defined in bgsync.cpp as local.oplog.rs. */ static const NamespaceString kLocalOplogNss; + // Allow index prefetching to be turned on/off enum IndexPrefetchConfig { UNINITIALIZED = 0, @@ -191,6 +193,14 @@ private: int rbid); /** + * A callback to a Fetcher that checks that the remote last applied OpTime is newer than the + * local last fetched OpTime. + * + * Writes to *returnStatus only on failure: the fetch error if result is not OK, + * InvalidSyncSource if no documents came back or the remote OpTime is not greater than + * lastOpTimeFetched, and InvalidBSON if OpTime::parseFromOplogEntry fails on the first + * document. On success *returnStatus is left as the caller set it. + */ + void _lastAppliedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, + OpTime lastOpTimeFetched, + Status* returnStatus); + + /** * Executes a rollback. * 'getConnection' returns a connection to the sync source. */ |