diff options
author | Eric Milkie <milkie@10gen.com> | 2017-09-27 14:26:59 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2017-10-05 16:05:11 -0400 |
commit | 7ef892d32c0507c57eda86ffc591ba5555eb78c6 (patch) | |
tree | de988ac727c32e930efae3fd618a9dc1241f742c /src/mongo/db/repl/oplog.cpp | |
parent | 08896eec457008f0f09e66bbbdc750ebb6dc6a43 (diff) | |
download | mongo-7ef892d32c0507c57eda86ffc591ba5555eb78c6.tar.gz |
SERVER-30638 change setReadFromMajorityCommittedSnapshot to use timestamps instead of named snapshots
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 156 |
1 files changed, 0 insertions, 156 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index b2928a96b8e..99d2e0c3929 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -79,7 +79,6 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" @@ -113,8 +112,6 @@ using IndexVersion = IndexDescriptor::IndexVersion; namespace repl { std::string masterSlaveOplogName = "local.oplog.$main"; -MONGO_FP_DECLARE(disableSnapshotting); - namespace { /** * The `_localOplogCollection` pointer is always valid (or null) because an @@ -1513,158 +1510,5 @@ void signalOplogWaiters() { } } -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000); - -SnapshotThread::SnapshotThread(SnapshotManager* manager) - : _manager(manager), _thread([this] { run(); }) {} - -bool SnapshotThread::shouldSleepMore(int numSleepsDone, size_t numUncommittedSnapshots) { - const double kThrottleRatio = 1 / 20.0; - const size_t kUncommittedSnapshotLimit = 1000; - const size_t kUncommittedSnapshotRestartPoint = kUncommittedSnapshotLimit / 2; - - if (_inShutdown.load()) - return false; // Exit the thread quickly without sleeping. - - if (numSleepsDone == 0) - return true; // Always sleep at least once. - - { - // Enforce a limit on the number of snapshots. - if (numUncommittedSnapshots >= kUncommittedSnapshotLimit) - _hitSnapshotLimit = true; // Don't create new snapshots. - - if (numUncommittedSnapshots < kUncommittedSnapshotRestartPoint) - _hitSnapshotLimit = false; // Begin creating new snapshots again. - - if (_hitSnapshotLimit) - return true; - } - - // Spread out snapshots in time by sleeping as we collect more uncommitted snapshots. - const double numSleepsNeeded = numUncommittedSnapshots * kThrottleRatio; - return numSleepsNeeded > numSleepsDone; -} - -void SnapshotThread::run() { - Client::initThread("SnapshotThread"); - auto& client = cc(); - auto service = client.getServiceContext(); - auto replCoord = ReplicationCoordinator::get(service); - - Timestamp lastTimestamp(Timestamp::max()); // hack to trigger snapshot from startup. - while (true) { - // This block logically belongs at the end of the loop, but having it at the top - // simplifies handling of the "continue" cases. It is harmless to do these before the - // first run of the loop. - for (int numSleepsDone = 0; - shouldSleepMore(numSleepsDone, replCoord->getNumUncommittedSnapshots()); - numSleepsDone++) { - sleepmicros(replSnapshotThreadThrottleMicros); - _manager->cleanupUnneededSnapshots(); - } - - { - stdx::unique_lock<stdx::mutex> lock(newOpMutex); - while (true) { - if (_inShutdown.load()) - return; - - if (_forcedSnapshotPending.load() || lastTimestamp != lastSetTimestamp) { - _forcedSnapshotPending.store(false); - lastTimestamp = lastSetTimestamp; - break; - } - - MONGO_IDLE_THREAD_BLOCK; - newTimestampNotifier.wait(lock); - } - } - - while (MONGO_FAIL_POINT(disableSnapshotting)) { - sleepsecs(1); - if (_inShutdown.load()) { - return; - } - } - - try { - auto opCtx = client.makeOperationContext(); - Lock::GlobalLock globalLock(opCtx.get(), MODE_IS, UINT_MAX); - - if (!replCoord->getMemberState().readable()) { - // If our MemberState isn't readable, we may not be in a consistent state so don't - // take snapshots. When we transition into a readable state from a non-readable - // state, a snapshot is forced to ensure we don't miss the latest write. This must - // be checked each time we acquire the global IS lock since that prevents the node - // from transitioning to a !readable() state from a readable() one in the cases - // where we shouldn't be creating a snapshot. - continue; - } - - SnapshotName name(0); // assigned real value in block. - { - // Make sure there are no in-flight oplog inserts while we create our snapshot. - // This lock cannot be aquired until all writes holding the resource commit/abort. - Lock::ResourceLock cappedInsertLockForOplog( - opCtx->lockState(), resourceInFlightForOplog, MODE_X); - - // Reserve the name immediately before we take our snapshot. This ensures that all - // names that compare lower must be from points in time visible to this named - // snapshot. - name = replCoord->reserveSnapshotName(nullptr); - - // This establishes the view that we will name. - _manager->prepareForCreateSnapshot(opCtx.get()).transitional_ignore(); - } - - auto opTimeOfSnapshot = OpTime(); - { - AutoGetCollectionForReadCommand oplog(opCtx.get(), - NamespaceString::kRsOplogNamespace); - invariant(oplog.getCollection()); - // Read the latest op from the oplog. - auto cursor = oplog.getCollection()->getCursor(opCtx.get(), /*forward*/ false); - auto record = cursor->next(); - if (!record) - continue; // oplog is completely empty. - - const auto op = record->data.releaseToBson(); - opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op)); - invariant(!opTimeOfSnapshot.isNull()); - } - - replCoord->createSnapshot(opCtx.get(), opTimeOfSnapshot, name); - } catch (const WriteConflictException& wce) { - log() << "skipping storage snapshot pass due to write conflict"; - continue; - } - } -} - -void SnapshotThread::shutdown() { - invariant(_thread.joinable()); - { - stdx::lock_guard<stdx::mutex> lock(newOpMutex); - invariant(!_inShutdown.load()); - _inShutdown.store(true); - newTimestampNotifier.notify_all(); - } - _thread.join(); -} - -void SnapshotThread::forceSnapshot() { - stdx::lock_guard<stdx::mutex> lock(newOpMutex); - _forcedSnapshotPending.store(true); - newTimestampNotifier.notify_all(); -} - -std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service) { - if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) { - return std::unique_ptr<SnapshotThread>(new SnapshotThread(manager)); - } - return {}; -} - } // namespace repl } // namespace mongo |