summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog.cpp
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2017-09-27 14:26:59 -0400
committerEric Milkie <milkie@10gen.com>2017-10-05 16:05:11 -0400
commit7ef892d32c0507c57eda86ffc591ba5555eb78c6 (patch)
treede988ac727c32e930efae3fd618a9dc1241f742c /src/mongo/db/repl/oplog.cpp
parent08896eec457008f0f09e66bbbdc750ebb6dc6a43 (diff)
downloadmongo-7ef892d32c0507c57eda86ffc591ba5555eb78c6.tar.gz
SERVER-30638 change setReadFromMajorityCommittedSnapshot to use timestamps instead of named snapshots
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r--src/mongo/db/repl/oplog.cpp156
1 files changed, 0 insertions, 156 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index b2928a96b8e..99d2e0c3929 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -79,7 +79,6 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/repl/snapshot_thread.h"
#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
@@ -113,8 +112,6 @@ using IndexVersion = IndexDescriptor::IndexVersion;
namespace repl {
std::string masterSlaveOplogName = "local.oplog.$main";
-MONGO_FP_DECLARE(disableSnapshotting);
-
namespace {
/**
* The `_localOplogCollection` pointer is always valid (or null) because an
@@ -1513,158 +1510,5 @@ void signalOplogWaiters() {
}
}
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000);
-
-SnapshotThread::SnapshotThread(SnapshotManager* manager)
- : _manager(manager), _thread([this] { run(); }) {}
-
-bool SnapshotThread::shouldSleepMore(int numSleepsDone, size_t numUncommittedSnapshots) {
- const double kThrottleRatio = 1 / 20.0;
- const size_t kUncommittedSnapshotLimit = 1000;
- const size_t kUncommittedSnapshotRestartPoint = kUncommittedSnapshotLimit / 2;
-
- if (_inShutdown.load())
- return false; // Exit the thread quickly without sleeping.
-
- if (numSleepsDone == 0)
- return true; // Always sleep at least once.
-
- {
- // Enforce a limit on the number of snapshots.
- if (numUncommittedSnapshots >= kUncommittedSnapshotLimit)
- _hitSnapshotLimit = true; // Don't create new snapshots.
-
- if (numUncommittedSnapshots < kUncommittedSnapshotRestartPoint)
- _hitSnapshotLimit = false; // Begin creating new snapshots again.
-
- if (_hitSnapshotLimit)
- return true;
- }
-
- // Spread out snapshots in time by sleeping as we collect more uncommitted snapshots.
- const double numSleepsNeeded = numUncommittedSnapshots * kThrottleRatio;
- return numSleepsNeeded > numSleepsDone;
-}
-
-void SnapshotThread::run() {
- Client::initThread("SnapshotThread");
- auto& client = cc();
- auto service = client.getServiceContext();
- auto replCoord = ReplicationCoordinator::get(service);
-
- Timestamp lastTimestamp(Timestamp::max()); // hack to trigger snapshot from startup.
- while (true) {
- // This block logically belongs at the end of the loop, but having it at the top
- // simplifies handling of the "continue" cases. It is harmless to do these before the
- // first run of the loop.
- for (int numSleepsDone = 0;
- shouldSleepMore(numSleepsDone, replCoord->getNumUncommittedSnapshots());
- numSleepsDone++) {
- sleepmicros(replSnapshotThreadThrottleMicros);
- _manager->cleanupUnneededSnapshots();
- }
-
- {
- stdx::unique_lock<stdx::mutex> lock(newOpMutex);
- while (true) {
- if (_inShutdown.load())
- return;
-
- if (_forcedSnapshotPending.load() || lastTimestamp != lastSetTimestamp) {
- _forcedSnapshotPending.store(false);
- lastTimestamp = lastSetTimestamp;
- break;
- }
-
- MONGO_IDLE_THREAD_BLOCK;
- newTimestampNotifier.wait(lock);
- }
- }
-
- while (MONGO_FAIL_POINT(disableSnapshotting)) {
- sleepsecs(1);
- if (_inShutdown.load()) {
- return;
- }
- }
-
- try {
- auto opCtx = client.makeOperationContext();
- Lock::GlobalLock globalLock(opCtx.get(), MODE_IS, UINT_MAX);
-
- if (!replCoord->getMemberState().readable()) {
- // If our MemberState isn't readable, we may not be in a consistent state so don't
- // take snapshots. When we transition into a readable state from a non-readable
- // state, a snapshot is forced to ensure we don't miss the latest write. This must
- // be checked each time we acquire the global IS lock since that prevents the node
- // from transitioning to a !readable() state from a readable() one in the cases
- // where we shouldn't be creating a snapshot.
- continue;
- }
-
- SnapshotName name(0); // assigned real value in block.
- {
- // Make sure there are no in-flight oplog inserts while we create our snapshot.
- // This lock cannot be aquired until all writes holding the resource commit/abort.
- Lock::ResourceLock cappedInsertLockForOplog(
- opCtx->lockState(), resourceInFlightForOplog, MODE_X);
-
- // Reserve the name immediately before we take our snapshot. This ensures that all
- // names that compare lower must be from points in time visible to this named
- // snapshot.
- name = replCoord->reserveSnapshotName(nullptr);
-
- // This establishes the view that we will name.
- _manager->prepareForCreateSnapshot(opCtx.get()).transitional_ignore();
- }
-
- auto opTimeOfSnapshot = OpTime();
- {
- AutoGetCollectionForReadCommand oplog(opCtx.get(),
- NamespaceString::kRsOplogNamespace);
- invariant(oplog.getCollection());
- // Read the latest op from the oplog.
- auto cursor = oplog.getCollection()->getCursor(opCtx.get(), /*forward*/ false);
- auto record = cursor->next();
- if (!record)
- continue; // oplog is completely empty.
-
- const auto op = record->data.releaseToBson();
- opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op));
- invariant(!opTimeOfSnapshot.isNull());
- }
-
- replCoord->createSnapshot(opCtx.get(), opTimeOfSnapshot, name);
- } catch (const WriteConflictException& wce) {
- log() << "skipping storage snapshot pass due to write conflict";
- continue;
- }
- }
-}
-
-void SnapshotThread::shutdown() {
- invariant(_thread.joinable());
- {
- stdx::lock_guard<stdx::mutex> lock(newOpMutex);
- invariant(!_inShutdown.load());
- _inShutdown.store(true);
- newTimestampNotifier.notify_all();
- }
- _thread.join();
-}
-
-void SnapshotThread::forceSnapshot() {
- stdx::lock_guard<stdx::mutex> lock(newOpMutex);
- _forcedSnapshotPending.store(true);
- newTimestampNotifier.notify_all();
-}
-
-std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service) {
- if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) {
- return std::unique_ptr<SnapshotThread>(new SnapshotThread(manager));
- }
- return {};
-}
-
} // namespace repl
} // namespace mongo