diff options
author | Eric Milkie <milkie@10gen.com> | 2017-09-27 14:26:59 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2017-10-05 16:05:11 -0400 |
commit | 7ef892d32c0507c57eda86ffc591ba5555eb78c6 (patch) | |
tree | de988ac727c32e930efae3fd618a9dc1241f742c /src | |
parent | 08896eec457008f0f09e66bbbdc750ebb6dc6a43 (diff) | |
download | mongo-7ef892d32c0507c57eda86ffc591ba5555eb78c6.tar.gz |
SERVER-30638 change setReadFromMajorityCommittedSnapshot to use timestamps instead of named snapshots
Diffstat (limited to 'src')
26 files changed, 138 insertions, 637 deletions
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index e23acfb42aa..f4fe399674c 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -116,7 +116,6 @@ public: // Ban reading from this collection on committed reads on snapshots before now. auto replCoord = repl::ReplicationCoordinator::get(_opCtx); auto snapshotName = replCoord->reserveSnapshotName(_opCtx); - replCoord->forceSnapshotCreation(); // Ensures a newer snapshot gets created even if idle. it->second->setMinimumVisibleSnapshot(snapshotName); } diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp index 726dc2f5e28..7636cd6af6a 100644 --- a/src/mongo/db/catalog/index_catalog_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_impl.cpp @@ -459,7 +459,6 @@ void IndexCatalogImpl::IndexBuildBlock::success() { // and no one can try to read this index before we set the visibility. auto replCoord = repl::ReplicationCoordinator::get(opCtx); auto snapshotName = replCoord->reserveSnapshotName(opCtx); - replCoord->forceSnapshotCreation(); // Ensures a newer snapshot gets created even if idle. entry->setMinimumVisibleSnapshot(snapshotName); // TODO remove this once SERVER-20439 is implemented. It is a stopgap solution for @@ -956,7 +955,6 @@ public: // Ban reading from this collection on committed reads on snapshots before now. auto replCoord = repl::ReplicationCoordinator::get(_opCtx); auto snapshotName = replCoord->reserveSnapshotName(_opCtx); - replCoord->forceSnapshotCreation(); // Ensures a newer snapshot gets created even if idle. _collection->setMinimumVisibleSnapshot(snapshotName); delete _entry; diff --git a/src/mongo/db/commands/drop_indexes.cpp b/src/mongo/db/commands/drop_indexes.cpp index 1faedbb761c..cc234d24d80 100644 --- a/src/mongo/db/commands/drop_indexes.cpp +++ b/src/mongo/db/commands/drop_indexes.cpp @@ -218,7 +218,6 @@ public: // snapshot so are unable to be used. auto replCoord = repl::ReplicationCoordinator::get(opCtx); auto snapshotName = replCoord->reserveSnapshotName(opCtx); - replCoord->forceSnapshotCreation(); // Ensures a newer snapshot gets created even if idle. collection->setMinimumVisibleSnapshot(snapshotName); result.append("nIndexes", static_cast<int>(indexInfoObjs.getValue().size())); diff --git a/src/mongo/db/commands/snapshot_management.cpp b/src/mongo/db/commands/snapshot_management.cpp index 4f797cf0700..324871e01c2 100644 --- a/src/mongo/db/commands/snapshot_management.cpp +++ b/src/mongo/db/commands/snapshot_management.cpp @@ -78,10 +78,8 @@ public: auto status = snapshotManager->prepareForCreateSnapshot(opCtx); if (status.isOK()) { - const auto name = - repl::ReplicationCoordinator::get(opCtx)->reserveSnapshotName(nullptr); + const auto name = repl::ReplicationCoordinator::get(opCtx)->reserveSnapshotName(opCtx); result.append("name", static_cast<long long>(name.asU64())); - status = snapshotManager->createSnapshot(opCtx, name); } return appendCommandStatus(result, status); } diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp index 9b034fcb566..79d353f536e 100644 --- a/src/mongo/db/read_concern.cpp +++ b/src/mongo/db/read_concern.cpp @@ -54,14 +54,6 @@ namespace mongo { namespace { -// This is a special flag that allows for testing of snapshot behavior by skipping the replication -// related checks and isolating the storage/query side of snapshotting. -bool testingSnapshotBehaviorInIsolation = false; -ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation( - ServerParameterSet::getGlobal(), - "testingSnapshotBehaviorInIsolation", - &testingSnapshotBehaviorInIsolation); - /** * Schedule a write via appendOplogNote command to the primary of this replica set. */ @@ -187,8 +179,7 @@ Status waitForReadConcern(OperationContext* opCtx, const repl::ReadConcernArgs& 39345, opCtx->recoveryUnit()->selectSnapshot(SnapshotName(pointInTime->asTimestamp()))); } - // Skip waiting for the OpTime when testing snapshot behavior - if (!testingSnapshotBehaviorInIsolation && !readConcernArgs.isEmpty()) { + if (!readConcernArgs.isEmpty()) { if (replCoord->isReplEnabled() && afterClusterTime) { auto status = makeNoopWriteIfNeeded(opCtx, *afterClusterTime); if (!status.isOK()) { @@ -204,11 +195,10 @@ Status waitForReadConcern(OperationContext* opCtx, const repl::ReadConcernArgs& } } - if ((replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet || - testingSnapshotBehaviorInIsolation) && - readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) { + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern && + replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet) { // ReadConcern Majority is not supported in ProtocolVersion 0. - if (!testingSnapshotBehaviorInIsolation && !replCoord->isV1ElectionProtocol()) { + if (!replCoord->isV1ElectionProtocol()) { return {ErrorCodes::ReadConcernMajorityNotEnabled, str::stream() << "Replica sets running protocol version 0 do not support " "readConcern: majority"}; diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp index b8feba841c1..0c9d012070b 100644 --- a/src/mongo/db/repair_database.cpp +++ b/src/mongo/db/repair_database.cpp @@ -276,7 +276,6 @@ Status repairDatabase(OperationContext* opCtx, // versions are in the committed view. auto replCoord = repl::ReplicationCoordinator::get(opCtx); auto snapshotName = replCoord->reserveSnapshotName(opCtx); - replCoord->forceSnapshotCreation(); // Ensure a newer snapshot is created even if idle. for (auto&& collection : *db) { collection->setMinimumVisibleSnapshot(snapshotName); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index b2928a96b8e..99d2e0c3929 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -79,7 +79,6 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" @@ -113,8 +112,6 @@ using IndexVersion = IndexDescriptor::IndexVersion; namespace repl { std::string masterSlaveOplogName = "local.oplog.$main"; -MONGO_FP_DECLARE(disableSnapshotting); - namespace { /** * The `_localOplogCollection` pointer is always valid (or null) because an @@ -1513,158 +1510,5 @@ void signalOplogWaiters() { } } -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(replSnapshotThreadThrottleMicros, int, 1000); - -SnapshotThread::SnapshotThread(SnapshotManager* manager) - : _manager(manager), _thread([this] { run(); }) {} - -bool SnapshotThread::shouldSleepMore(int numSleepsDone, size_t numUncommittedSnapshots) { - const double kThrottleRatio = 1 / 20.0; - const size_t kUncommittedSnapshotLimit = 1000; - const size_t kUncommittedSnapshotRestartPoint = kUncommittedSnapshotLimit / 2; - - if (_inShutdown.load()) - return false; // Exit the thread quickly without sleeping. - - if (numSleepsDone == 0) - return true; // Always sleep at least once. - - { - // Enforce a limit on the number of snapshots. - if (numUncommittedSnapshots >= kUncommittedSnapshotLimit) - _hitSnapshotLimit = true; // Don't create new snapshots. - - if (numUncommittedSnapshots < kUncommittedSnapshotRestartPoint) - _hitSnapshotLimit = false; // Begin creating new snapshots again. - - if (_hitSnapshotLimit) - return true; - } - - // Spread out snapshots in time by sleeping as we collect more uncommitted snapshots. - const double numSleepsNeeded = numUncommittedSnapshots * kThrottleRatio; - return numSleepsNeeded > numSleepsDone; -} - -void SnapshotThread::run() { - Client::initThread("SnapshotThread"); - auto& client = cc(); - auto service = client.getServiceContext(); - auto replCoord = ReplicationCoordinator::get(service); - - Timestamp lastTimestamp(Timestamp::max()); // hack to trigger snapshot from startup. - while (true) { - // This block logically belongs at the end of the loop, but having it at the top - // simplifies handling of the "continue" cases. It is harmless to do these before the - // first run of the loop. - for (int numSleepsDone = 0; - shouldSleepMore(numSleepsDone, replCoord->getNumUncommittedSnapshots()); - numSleepsDone++) { - sleepmicros(replSnapshotThreadThrottleMicros); - _manager->cleanupUnneededSnapshots(); - } - - { - stdx::unique_lock<stdx::mutex> lock(newOpMutex); - while (true) { - if (_inShutdown.load()) - return; - - if (_forcedSnapshotPending.load() || lastTimestamp != lastSetTimestamp) { - _forcedSnapshotPending.store(false); - lastTimestamp = lastSetTimestamp; - break; - } - - MONGO_IDLE_THREAD_BLOCK; - newTimestampNotifier.wait(lock); - } - } - - while (MONGO_FAIL_POINT(disableSnapshotting)) { - sleepsecs(1); - if (_inShutdown.load()) { - return; - } - } - - try { - auto opCtx = client.makeOperationContext(); - Lock::GlobalLock globalLock(opCtx.get(), MODE_IS, UINT_MAX); - - if (!replCoord->getMemberState().readable()) { - // If our MemberState isn't readable, we may not be in a consistent state so don't - // take snapshots. When we transition into a readable state from a non-readable - // state, a snapshot is forced to ensure we don't miss the latest write. This must - // be checked each time we acquire the global IS lock since that prevents the node - // from transitioning to a !readable() state from a readable() one in the cases - // where we shouldn't be creating a snapshot. - continue; - } - - SnapshotName name(0); // assigned real value in block. - { - // Make sure there are no in-flight oplog inserts while we create our snapshot. - // This lock cannot be aquired until all writes holding the resource commit/abort. - Lock::ResourceLock cappedInsertLockForOplog( - opCtx->lockState(), resourceInFlightForOplog, MODE_X); - - // Reserve the name immediately before we take our snapshot. This ensures that all - // names that compare lower must be from points in time visible to this named - // snapshot. - name = replCoord->reserveSnapshotName(nullptr); - - // This establishes the view that we will name. - _manager->prepareForCreateSnapshot(opCtx.get()).transitional_ignore(); - } - - auto opTimeOfSnapshot = OpTime(); - { - AutoGetCollectionForReadCommand oplog(opCtx.get(), - NamespaceString::kRsOplogNamespace); - invariant(oplog.getCollection()); - // Read the latest op from the oplog. - auto cursor = oplog.getCollection()->getCursor(opCtx.get(), /*forward*/ false); - auto record = cursor->next(); - if (!record) - continue; // oplog is completely empty. - - const auto op = record->data.releaseToBson(); - opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromOplogEntry(op)); - invariant(!opTimeOfSnapshot.isNull()); - } - - replCoord->createSnapshot(opCtx.get(), opTimeOfSnapshot, name); - } catch (const WriteConflictException& wce) { - log() << "skipping storage snapshot pass due to write conflict"; - continue; - } - } -} - -void SnapshotThread::shutdown() { - invariant(_thread.joinable()); - { - stdx::lock_guard<stdx::mutex> lock(newOpMutex); - invariant(!_inShutdown.load()); - _inShutdown.store(true); - newTimestampNotifier.notify_all(); - } - _thread.join(); -} - -void SnapshotThread::forceSnapshot() { - stdx::lock_guard<stdx::mutex> lock(newOpMutex); - _forcedSnapshotPending.store(true); - newTimestampNotifier.notify_all(); -} - -std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service) { - if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) { - return std::unique_ptr<SnapshotThread>(new SnapshotThread(manager)); - } - return {}; -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index c9d478a4ec7..e02a4efffae 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -807,14 +807,6 @@ public: virtual SnapshotName reserveSnapshotName(OperationContext* opCtx) = 0; /** - * Signals the SnapshotThread, if running, to take a forced snapshot even if the global - * timestamp hasn't changed. - * - * Does not wait for the snapshot to be taken. - */ - virtual void forceSnapshotCreation() = 0; - - /** * Creates a new snapshot in the storage engine and registers it for use in the replication * coordinator. */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index c8550da8027..5503e5262a3 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -289,19 +289,6 @@ public: virtual void updateCommittedSnapshot(SnapshotInfo newCommitPoint) = 0; /** - * Creates a new snapshot. - */ - virtual void createSnapshot(OperationContext* opCtx, SnapshotName name) = 0; - - /** - * Signals the SnapshotThread, if running, to take a forced snapshot even if the global - * timestamp hasn't changed. - * - * Does not wait for the snapshot to be taken. - */ - virtual void forceSnapshotCreation() = 0; - - /** * Returns whether or not the SnapshotThread is active. */ virtual bool snapshotsEnabled() const = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 2e99b48c2e8..f1085c548f6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -67,7 +67,6 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/rs_sync.h" -#include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/s/balancer/balancer.h" #include "mongo/db/s/sharding_state.h" @@ -122,11 +121,6 @@ const char tsFieldName[] = "ts"; const char kCollectionOplogBufferName[] = "collection"; const char kBlockingQueueOplogBufferName[] = "inMemoryBlockingQueue"; -// Set this to true to force background creation of snapshots even if --enableMajorityReadConcern -// isn't specified. This can be used for A-B benchmarking to find how much overhead -// repl::SnapshotThread introduces. -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(enableReplSnapshotThread, bool, false); - // Set this to specify whether to use a collection to buffer the oplog on the destination server // during initial sync to prevent rolling over the oplog. MONGO_EXPORT_STARTUP_SERVER_PARAMETER(initialSyncOplogBuffer, @@ -293,11 +287,6 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s return; } - if (settings.isMajorityReadConcernEnabled() || enableReplSnapshotThread) { - log() << "Starting replication snapshot thread"; - _snapshotThread = SnapshotThread::start(_service); - } - log() << "Starting replication storage threads"; _service->getGlobalStorageEngine()->setJournalListener(this); @@ -325,11 +314,6 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) _stopDataReplication_inlock(opCtx, &lk); - if (_snapshotThread) { - log() << "Stopping replication snapshot thread"; - _snapshotThread->shutdown(); - } - if (_noopWriter) { LOG(1) << "Stopping noop writer"; _noopWriter->stopWritingPeriodicNoops(); @@ -802,25 +786,15 @@ void ReplicationCoordinatorExternalStateImpl::dropAllSnapshots() { void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotInfo newCommitPoint) { auto manager = _service->getGlobalStorageEngine()->getSnapshotManager(); - invariant(manager); // This should never be called if there is no SnapshotManager. - manager->setCommittedSnapshot(newCommitPoint.name, newCommitPoint.opTime.getTimestamp()); + if (manager) { + manager->setCommittedSnapshot(SnapshotName(newCommitPoint.opTime.getTimestamp()), + newCommitPoint.opTime.getTimestamp()); + } notifyOplogMetadataWaiters(newCommitPoint.opTime); } -void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* opCtx, - SnapshotName name) { - auto manager = _service->getGlobalStorageEngine()->getSnapshotManager(); - invariant(manager); // This should never be called if there is no SnapshotManager. - manager->createSnapshot(opCtx, name).transitional_ignore(); -} - -void ReplicationCoordinatorExternalStateImpl::forceSnapshotCreation() { - if (_snapshotThread) - _snapshotThread->forceSnapshot(); -} - bool ReplicationCoordinatorExternalStateImpl::snapshotsEnabled() const { - return _snapshotThread != nullptr; + return _service->getGlobalStorageEngine()->getSnapshotManager() != nullptr; } void ReplicationCoordinatorExternalStateImpl::notifyOplogMetadataWaiters( diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index a570ac111c5..145803adf64 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -53,7 +53,6 @@ using UniqueLock = stdx::unique_lock<stdx::mutex>; class DropPendingCollectionReaper; class ReplicationProcess; -class SnapshotThread; class StorageInterface; class NoopWriter; @@ -102,8 +101,6 @@ public: virtual void startProducerIfStopped(); void dropAllSnapshots() final; void updateCommittedSnapshot(SnapshotInfo newCommitPoint) final; - void createSnapshot(OperationContext* opCtx, SnapshotName name) final; - void forceSnapshotCreation() final; virtual bool snapshotsEnabled() const; virtual void notifyOplogMetadataWaiters(const OpTime& committedOpTime); virtual double getElectionTimeoutOffsetLimitFraction() const; @@ -194,8 +191,6 @@ private: // Number used to uniquely name threads. long long _nextThreadId = 0; - std::unique_ptr<SnapshotThread> _snapshotThread; - // Task executor used to run replication tasks. std::unique_ptr<executor::TaskExecutor> _taskExecutor; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 4bdfee93c5b..9af8aec2341 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -221,11 +221,6 @@ void ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {} void ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot(SnapshotInfo newCommitPoint) { } -void ReplicationCoordinatorExternalStateMock::createSnapshot(OperationContext* opCtx, - SnapshotName name) {} - -void ReplicationCoordinatorExternalStateMock::forceSnapshotCreation() {} - bool ReplicationCoordinatorExternalStateMock::snapshotsEnabled() const { return _areSnapshotsEnabled; } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 81eb5d634c3..a05186710e3 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -89,8 +89,6 @@ public: virtual void startProducerIfStopped(); virtual void dropAllSnapshots(); virtual void updateCommittedSnapshot(SnapshotInfo newCommitPoint); - virtual void createSnapshot(OperationContext* opCtx, SnapshotName name); - virtual void forceSnapshotCreation(); virtual bool snapshotsEnabled() const; virtual void notifyOplogMetadataWaiters(const OpTime& committedOpTime); virtual double getElectionTimeoutOffsetLimitFraction() const; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 4bf51d9c9e8..c342cff5562 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -178,6 +178,15 @@ BSONObj incrementConfigVersionByRandom(BSONObj config) { return builder.obj(); } +// This is a special flag that allows for testing of snapshot behavior by skipping the replication +// related checks and isolating the storage/query side of snapshotting. +// SERVER-31304 rename this parameter to something more appropriate. +bool testingSnapshotBehaviorInIsolation = false; +ExportedServerParameter<bool, ServerParameterType::kStartupOnly> TestingSnapshotBehaviorInIsolation( + ServerParameterSet::getGlobal(), + "testingSnapshotBehaviorInIsolation", + &testingSnapshotBehaviorInIsolation); + } // namespace ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern) @@ -1056,6 +1065,7 @@ void ReplicationCoordinatorImpl::_resetMyLastOpTimes_inlock() { // Reset to uninitialized OpTime _setMyLastAppliedOpTime_inlock(OpTime(), true); _setMyLastDurableOpTime_inlock(OpTime(), true); + _stableTimestampCandidates.clear(); } void ReplicationCoordinatorImpl::_reportUpstream_inlock(stdx::unique_lock<stdx::mutex> lock) { @@ -1089,7 +1099,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op // set of stable timestamp candidates can only get cleaned up when the commit point advances, we // should refrain from updating stable timestamp candidates in master-slave mode, to avoid the // candidates list from growing unbounded. - if (getReplicationMode() == Mode::modeReplSet) { + if (!opTime.isNull() && getReplicationMode() == Mode::modeReplSet) { _stableTimestampCandidates.insert(opTime.getTimestamp()); _setStableTimestampForStorage_inlock(); } @@ -1451,15 +1461,16 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock( StringData patternName; if (writeConcern.wMode == WriteConcernOptions::kMajority) { - if (_externalState->snapshotsEnabled()) { + if (_externalState->snapshotsEnabled() && !testingSnapshotBehaviorInIsolation) { // Make sure we have a valid "committed" snapshot up to the needed optime. if (!_currentCommittedSnapshot) { return false; } // Wait for the "current" snapshot to advance to/past the opTime. - const auto haveSnapshot = (_currentCommittedSnapshot->opTime >= opTime && - _currentCommittedSnapshot->name >= minSnapshot); + const auto haveSnapshot = + (_currentCommittedSnapshot->opTime >= opTime && + _currentCommittedSnapshot->opTime.getTimestamp().asULL() >= minSnapshot.asU64()); if (!haveSnapshot) { LOG(1) << "Required snapshot optime: " << opTime << " is not yet part of the " << "current 'committed' snapshot: " << *_currentCommittedSnapshot; @@ -2554,13 +2565,6 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock( } } - if (newState.readable() && !_memberState.readable()) { - // When we transition to a readable state from a non-readable one, force the SnapshotThread - // to take a snapshot, if it is running. This is because it never takes snapshots when not - // in readable states. - _externalState->forceSnapshotCreation(); - } - if (newState.rollback()) { // When we start rollback, we need to drop all snapshots since we may need to create // out-of-order snapshots. This would be necessary even if the SnapshotName was completely @@ -2586,7 +2590,6 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock( if (_memberState.rollback()) { // Ensure that no snapshots were created while we were in rollback. invariant(!_currentCommittedSnapshot); - invariant(_uncommittedSnapshots.empty()); } // If we are transitioning from secondary, cancel any scheduled takeovers. @@ -3118,7 +3121,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource( void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { if (_topCoord->updateLastCommittedOpTime()) { - _updateCommitPoint_inlock(); + _setStableTimestampForStorage_inlock(); } // Wake up any threads waiting for replication that now have their replication // check satisfied. We must do this regardless of whether we updated the lastCommittedOpTime, @@ -3190,8 +3193,13 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() { if (stableTimestamp) { LOG(2) << "Setting replication's stable timestamp to " << stableTimestamp.value(); - _storage->setStableTimestamp(getServiceContext(), SnapshotName(stableTimestamp.get())); - + if (!testingSnapshotBehaviorInIsolation) { + // Update committed snapshot and wake up any threads waiting on read concern or + // write concern. + _updateCommittedSnapshot_inlock(SnapshotInfo{ + OpTime(stableTimestamp.get(), _topCoord->getTerm()), SnapshotName::min()}); + _storage->setStableTimestamp(getServiceContext(), SnapshotName(stableTimestamp.get())); + } _cleanupStableTimestampCandidates(&_stableTimestampCandidates, stableTimestamp.get()); } } @@ -3207,38 +3215,19 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& commit _setMyLastAppliedOpTime_inlock(committedOpTime, false); } - _updateCommitPoint_inlock(); + _setStableTimestampForStorage_inlock(); + // Even if we have no new snapshot, we need to notify waiters that the commit point moved. + _externalState->notifyOplogMetadataWaiters(committedOpTime); } } void ReplicationCoordinatorImpl::_updateCommitPoint_inlock() { - auto committedOpTime = _topCoord->getLastCommittedOpTime(); - - // Update the stable timestamp. + // Update the stable timestamp _setStableTimestampForStorage_inlock(); - auto maxSnapshotForOpTime = SnapshotInfo{committedOpTime, SnapshotName::max()}; - - if (!_uncommittedSnapshots.empty() && _uncommittedSnapshots.front() <= maxSnapshotForOpTime) { - // At least one uncommitted snapshot is ready to be blessed as committed. - - // Seek to the first entry > the commit point. Previous element must be <=. - const auto onePastCommitPoint = std::upper_bound( - _uncommittedSnapshots.begin(), _uncommittedSnapshots.end(), maxSnapshotForOpTime); - const auto newSnapshot = *std::prev(onePastCommitPoint); - - // Forget about all snapshots <= the new commit point. - _uncommittedSnapshots.erase(_uncommittedSnapshots.begin(), onePastCommitPoint); - _uncommittedSnapshotsSize.store(_uncommittedSnapshots.size()); - - // Update committed snapshot and wake up any threads waiting on read concern or - // write concern. - _updateCommittedSnapshot_inlock(newSnapshot); - } else { - // Even if we have no new snapshot, we need to notify waiters that the commit point - // moved. - _externalState->notifyOplogMetadataWaiters(committedOpTime); - } + auto committedOpTime = _topCoord->getLastCommittedOpTime(); + // Notify waiters that the commit point moved. + _externalState->notifyOplogMetadataWaiters(committedOpTime); } OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { @@ -3471,24 +3460,32 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock( } SnapshotName ReplicationCoordinatorImpl::reserveSnapshotName(OperationContext* opCtx) { - auto reservedName = SnapshotName(_snapshotNameGenerator.addAndFetch(1)); - dassert(reservedName > SnapshotName::min()); - dassert(reservedName < SnapshotName::max()); - if (opCtx) { - ReplClientInfo::forClient(opCtx->getClient()).setLastSnapshot(reservedName); + SnapshotName reservedName; + if (getReplicationMode() == Mode::modeReplSet) { + invariant(opCtx->lockState()->isLocked()); + if (getMemberState().primary()) { + // Use the current optime on the node, for primary nodes. + reservedName = SnapshotName( + LogicalClock::get(getServiceContext())->getClusterTime().asTimestamp()); + } else { + // Use lastApplied time, for secondary nodes. + reservedName = SnapshotName(getMyLastAppliedOpTime().getTimestamp()); + } + } else { + // All snapshots are the same for a standalone node. + reservedName = SnapshotName(0); } + // This was just in case the snapshot name was different from the lastOp in the client. + ReplClientInfo::forClient(opCtx->getClient()).setLastSnapshot(reservedName); return reservedName; } -void ReplicationCoordinatorImpl::forceSnapshotCreation() { - _externalState->forceSnapshotCreation(); -} - void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx, const SnapshotName& untilSnapshot) { stdx::unique_lock<stdx::mutex> lock(_mutex); - while (!_currentCommittedSnapshot || _currentCommittedSnapshot->name < untilSnapshot) { + while (!_currentCommittedSnapshot || + _currentCommittedSnapshot->opTime.getTimestamp().asULL() < untilSnapshot.asU64()) { opCtx->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock); } } @@ -3500,40 +3497,32 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { void ReplicationCoordinatorImpl::createSnapshot(OperationContext* opCtx, OpTime timeOfSnapshot, SnapshotName name) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _externalState->createSnapshot(opCtx, name); - auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name}; + // SERVER-31304: Delete this function. + return; +} - if (timeOfSnapshot <= _topCoord->getLastCommittedOpTime()) { - // This snapshot is ready to be marked as committed. - invariant(_uncommittedSnapshots.empty()); - _updateCommittedSnapshot_inlock(snapshotInfo); +MONGO_FP_DECLARE(disableSnapshotting); + +void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock( + SnapshotInfo newCommittedSnapshot) { + if (testingSnapshotBehaviorInIsolation) { return; } - if (!_uncommittedSnapshots.empty()) { - invariant(snapshotInfo > _uncommittedSnapshots.back()); - // The name must independently be newer. - invariant(snapshotInfo.name > _uncommittedSnapshots.back().name); - // Technically, we could delete older snapshots from the same optime since we will only ever - // want the newest. However, multiple snapshots on the same optime will be very rare so it - // isn't worth the effort and potential bugs that would introduce. + // If we are in ROLLBACK state, do not set any new _currentCommittedSnapshot, as it will be + // cleared at the end of rollback anyway. + if (_memberState.rollback()) { + log() << "not updating committed snapshot because we are in rollback"; + return; } - _uncommittedSnapshots.push_back(snapshotInfo); - _uncommittedSnapshotsSize.store(_uncommittedSnapshots.size()); -} - -void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock( - SnapshotInfo newCommittedSnapshot) { invariant(!newCommittedSnapshot.opTime.isNull()); - invariant(newCommittedSnapshot.opTime <= _topCoord->getLastCommittedOpTime()); + invariant(newCommittedSnapshot.opTime.getTimestamp() <= + _topCoord->getLastCommittedOpTime().getTimestamp()); if (_currentCommittedSnapshot) { invariant(newCommittedSnapshot.opTime >= _currentCommittedSnapshot->opTime); - invariant(newCommittedSnapshot.name > _currentCommittedSnapshot->name); } - if (!_uncommittedSnapshots.empty()) - invariant(newCommittedSnapshot < _uncommittedSnapshots.front()); - + if (MONGO_FAIL_POINT(disableSnapshotting)) + return; _currentCommittedSnapshot = newCommittedSnapshot; _currentCommittedSnapshotCond.notify_all(); @@ -3549,8 +3538,6 @@ void ReplicationCoordinatorImpl::dropAllSnapshots() { } void ReplicationCoordinatorImpl::_dropAllSnapshots_inlock() { - _uncommittedSnapshots.clear(); - _uncommittedSnapshotsSize.store(_uncommittedSnapshots.size()); _currentCommittedSnapshot = boost::none; _externalState->dropAllSnapshots(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 7d3b8e7c7af..7865f3a7750 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -303,8 +303,6 @@ public: virtual SnapshotName reserveSnapshotName(OperationContext* opCtx) override; - virtual void forceSnapshotCreation() override; - virtual void createSnapshot(OperationContext* opCtx, OpTime timeOfSnapshot, SnapshotName name) override; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index ce55579abd9..0ac9df2b61a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -945,7 +945,6 @@ TEST_F( OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); - // Set up valid write concerns for the rest of the test WriteConcernOptions majorityWriteConcern; majorityWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -977,7 +976,6 @@ TEST_F( getReplCoord()->setLastDurableOptime_forTest(2, 1, time1).transitional_ignore(); getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1).transitional_ignore(); getReplCoord()->setLastDurableOptime_forTest(2, 2, time1).transitional_ignore(); - getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, majorityWriteConcern); ASSERT_OK(statusAndDur.status); @@ -997,39 +995,6 @@ TEST_F( statusAndDur = getReplCoord()->awaitReplication(opCtx.get(), time1, multiRackWriteConcern); ASSERT_OK(statusAndDur.status); - // Majority also waits for the committed snapshot to be newer than all snapshots reserved by - // this operation. Custom modes not affected by this. - while (getReplCoord()->reserveSnapshotName(opCtx.get()) <= SnapshotName(1)) { - // These unittests "cheat" and use SnapshotName(1) without advancing the counter. Reserve - // another name if we didn't get a high enough one. - } - - auto zeroOpTimeInCurrentTerm = OpTime(Timestamp(0, 0), 1); - ReplClientInfo::forClient(opCtx.get()->getClient()).setLastOp(zeroOpTimeInCurrentTerm); - statusAndDur = - getReplCoord()->awaitReplicationOfLastOpForClient(opCtx.get(), majorityWriteConcern); - ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); - statusAndDur = - getReplCoord()->awaitReplicationOfLastOpForClient(opCtx.get(), multiDCWriteConcern); - ASSERT_OK(statusAndDur.status); - statusAndDur = - getReplCoord()->awaitReplicationOfLastOpForClient(opCtx.get(), multiRackWriteConcern); - ASSERT_OK(statusAndDur.status); - - // All modes satisfied - getReplCoord()->createSnapshot( - opCtx.get(), time1, getReplCoord()->reserveSnapshotName(nullptr)); - - statusAndDur = - getReplCoord()->awaitReplicationOfLastOpForClient(opCtx.get(), majorityWriteConcern); - ASSERT_OK(statusAndDur.status); - statusAndDur = - getReplCoord()->awaitReplicationOfLastOpForClient(opCtx.get(), multiDCWriteConcern); - ASSERT_OK(statusAndDur.status); - statusAndDur = - getReplCoord()->awaitReplicationOfLastOpForClient(opCtx.get(), multiRackWriteConcern); - ASSERT_OK(statusAndDur.status); - // multiDC satisfied but not majority or multiRack getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); @@ -2898,12 +2863,11 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) { time_t lastWriteDate = 101; OpTime opTime = OpTime(Timestamp(lastWriteDate, 2), 1); - time_t majorityWriteDate = 100; - OpTime majorityOpTime = OpTime(Timestamp(majorityWriteDate, 1), 1); + time_t majorityWriteDate = lastWriteDate; + OpTime majorityOpTime = opTime; getReplCoord()->setMyLastAppliedOpTime(opTime); getReplCoord()->setMyLastDurableOpTime(opTime); - getReplCoord()->createSnapshot(opCtx.get(), majorityOpTime, SnapshotName(1)); ASSERT_EQUALS(majorityOpTime, getReplCoord()->getCurrentCommittedSnapshotOpTime()); IsMasterResponse response; @@ -3656,10 +3620,6 @@ TEST_F(ReplCoordTest, ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 2, time)); - ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(opCtx.get(), time, majorityWriteConcern).status); - - getReplCoord()->createSnapshot(opCtx.get(), time, SnapshotName(1)); ASSERT_OK(getReplCoord()->awaitReplication(opCtx.get(), time, majorityWriteConcern).status); } @@ -3911,21 +3871,21 @@ TEST_F(StableTimestampTest, SetMyLastAppliedSetsStableTimestampForStorage) { Timestamp stableTimestamp; // There should be no stable timestamp candidates until setMyLastAppliedOpTime is called. - repl->advanceCommitPoint(OpTime({0, 2}, 0)); + repl->advanceCommitPoint(OpTime({1, 2}, 0)); ASSERT_EQUALS(SnapshotName::min(), getStorageInterface()->getStableTimestamp()); // Check that the stable timestamp is updated when we set the applied optime. - repl->setMyLastAppliedOpTime(OpTime({0, 1}, 0)); + repl->setMyLastAppliedOpTime(OpTime({1, 1}, 0)); stableTimestamp = Timestamp(getStorageInterface()->getStableTimestamp().asU64()); - ASSERT_EQUALS(Timestamp(0, 1), stableTimestamp); + ASSERT_EQUALS(Timestamp(1, 1), stableTimestamp); // Check that timestamp cleanup occurs. - repl->setMyLastAppliedOpTime(OpTime({0, 2}, 0)); + repl->setMyLastAppliedOpTime(OpTime({1, 2}, 0)); stableTimestamp = Timestamp(getStorageInterface()->getStableTimestamp().asU64()); - ASSERT_EQUALS(Timestamp(0, 2), stableTimestamp); + ASSERT_EQUALS(Timestamp(1, 2), stableTimestamp); auto timestampCandidates = repl->getStableTimestampCandidates_forTest(); - std::set<Timestamp> expectedTimestampCandidates = {{0, 2}}; + std::set<Timestamp> expectedTimestampCandidates = {{1, 2}}; ASSERT_TIMESTAMP_SET_EQ(expectedTimestampCandidates, timestampCandidates); } @@ -3942,22 +3902,22 @@ TEST_F(StableTimestampTest, AdvanceCommitPointSetsStableTimestampForStorage) { Timestamp stableTimestamp; // Add two stable timestamp candidates. - repl->setMyLastAppliedOpTime(OpTime({0, 1}, 0)); - repl->setMyLastAppliedOpTime(OpTime({0, 2}, 0)); + repl->setMyLastAppliedOpTime(OpTime({1, 1}, 0)); + repl->setMyLastAppliedOpTime(OpTime({1, 2}, 0)); // Set a commit point and check the stable timestamp. - repl->advanceCommitPoint(OpTime({0, 1}, 0)); + repl->advanceCommitPoint(OpTime({1, 1}, 0)); stableTimestamp = Timestamp(getStorageInterface()->getStableTimestamp().asU64()); - ASSERT_EQUALS(Timestamp(0, 1), stableTimestamp); + ASSERT_EQUALS(Timestamp(1, 1), stableTimestamp); // Check that the stable timestamp is updated when we advance the commit point. - repl->advanceCommitPoint(OpTime({0, 2}, 0)); + repl->advanceCommitPoint(OpTime({1, 2}, 0)); stableTimestamp = Timestamp(getStorageInterface()->getStableTimestamp().asU64()); - ASSERT_EQUALS(Timestamp(0, 2), stableTimestamp); + ASSERT_EQUALS(Timestamp(1, 2), stableTimestamp); // Check that timestamp candidate cleanup occurs. auto timestampCandidates = getReplCoord()->getStableTimestampCandidates_forTest(); - std::set<Timestamp> expectedTimestampCandidates = {{0, 2}}; + std::set<Timestamp> expectedTimestampCandidates = {{1, 2}}; ASSERT_TIMESTAMP_SET_EQ(expectedTimestampCandidates, timestampCandidates); } @@ -4358,7 +4318,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeIsNewer OpTime time(Timestamp(10, 0), 1); OpTime oldTime(Timestamp(9, 0), 1); - getReplCoord()->createSnapshot(opCtx.get(), time, SnapshotName(1)); + getReplCoord()->setMyLastAppliedOpTime(time); // higher OpTime, should change getReplCoord()->advanceCommitPoint(time); @@ -5030,84 +4990,18 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); - getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); - getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2)); - getReplCoord()->createSnapshot(opCtx.get(), time5, SnapshotName(3)); + getReplCoord()->setMyLastAppliedOpTime(time1); + getReplCoord()->setMyLastAppliedOpTime(time2); + getReplCoord()->setMyLastAppliedOpTime(time5); // ensure current snapshot follows price is right rules (closest but not greater than) - getReplCoord()->setMyLastAppliedOpTime(time3); + getReplCoord()->setMyLastDurableOpTime(time3); ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime()); - getReplCoord()->setMyLastAppliedOpTime(time4); getReplCoord()->setMyLastDurableOpTime(time4); ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime()); -} - -TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLatestSnapshot) { - init("mySet"); - - assertStartSuccess(BSON("_id" - << "mySet" - << "version" - << 1 - << "members" - << BSON_ARRAY(BSON("_id" << 0 << "host" - << "test1:1234"))), - HostAndPort("test1", 1234)); - auto opCtx = makeOperationContext(); - runSingleNodeElection(opCtx.get()); - - OpTime time1(Timestamp(100, 1), 1); - OpTime time2(Timestamp(100, 2), 1); - OpTime time3(Timestamp(100, 3), 1); - OpTime time4(Timestamp(100, 4), 1); - OpTime time5(Timestamp(100, 5), 1); - OpTime time6(Timestamp(100, 6), 1); - - getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); - getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2)); - getReplCoord()->createSnapshot(opCtx.get(), time5, SnapshotName(3)); - - // ensure current snapshot will not advance beyond existing snapshots - getReplCoord()->setMyLastAppliedOpTime(time6); - getReplCoord()->setMyLastDurableOpTime(time6); - ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime()); -} - -TEST_F(ReplCoordTest, - AdvanceCommittedSnapshotWhenASnapshotAtNewestAsOldAsOurNewestOpTimeIsCreated) { - init("mySet"); - - assertStartSuccess(BSON("_id" - << "mySet" - << "version" - << 1 - << "members" - << BSON_ARRAY(BSON("_id" << 0 << "host" - << "test1:1234"))), - HostAndPort("test1", 1234)); - - auto opCtx = makeOperationContext(); - runSingleNodeElection(opCtx.get()); - - OpTime time1(Timestamp(100, 1), 1); - OpTime time2(Timestamp(100, 2), 1); - OpTime time3(Timestamp(100, 3), 1); - OpTime time4(Timestamp(100, 4), 1); - OpTime time5(Timestamp(100, 5), 1); - OpTime time6(Timestamp(100, 6), 1); - - getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); - getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2)); - getReplCoord()->createSnapshot(opCtx.get(), time5, SnapshotName(3)); - - getReplCoord()->setMyLastAppliedOpTime(time6); - getReplCoord()->setMyLastDurableOpTime(time6); + getReplCoord()->setMyLastDurableOpTime(time5); ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime()); - - // ensure current snapshot updates on new snapshot if we are that far - getReplCoord()->createSnapshot(opCtx.get(), time6, SnapshotName(4)); - ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshotOpTime()); } TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) { @@ -5132,9 +5026,10 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) { OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); - getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); - getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2)); - getReplCoord()->createSnapshot(opCtx.get(), time5, SnapshotName(3)); + getReplCoord()->setMyLastAppliedOpTime(time1); + getReplCoord()->setMyLastAppliedOpTime(time2); + getReplCoord()->setMyLastAppliedOpTime(time5); + getReplCoord()->setMyLastDurableOpTime(time5); // ensure dropping all snapshots should reset the current committed snapshot getReplCoord()->dropAllSnapshots(); @@ -5159,15 +5054,12 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) { OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); - getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); - getReplCoord()->setMyLastAppliedOpTime(time1); ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime()); getReplCoord()->setMyLastAppliedOpTime(time2); ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime()); - getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); - ASSERT_EQUALS(time1, getReplCoord()->getCurrentCommittedSnapshotOpTime()); + ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime()); } TEST_F(ReplCoordTest, diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 02097d19815..68cdbe6f009 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -461,8 +461,6 @@ SnapshotName ReplicationCoordinatorMock::reserveSnapshotName(OperationContext* o return SnapshotName(_snapshotNameGenerator.addAndFetch(1)); } -void ReplicationCoordinatorMock::forceSnapshotCreation() {} - void ReplicationCoordinatorMock::createSnapshot(OperationContext* opCtx, OpTime timeOfSnapshot, SnapshotName name){}; diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 3ae99bb0dc4..4472247de44 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -249,8 +249,6 @@ public: virtual SnapshotName reserveSnapshotName(OperationContext* opCtx); - virtual void forceSnapshotCreation() override; - virtual void createSnapshot(OperationContext* opCtx, OpTime timeOfSnapshot, SnapshotName name) override; diff --git a/src/mongo/db/repl/snapshot_thread.h b/src/mongo/db/repl/snapshot_thread.h deleted file mode 100644 index 4016bf81b30..00000000000 --- a/src/mongo/db/repl/snapshot_thread.h +++ /dev/null @@ -1,85 +0,0 @@ -/* -* Copyright (C) 2015 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/base/disallow_copying.h" -#include "mongo/db/service_context.h" -#include "mongo/db/storage/snapshot_manager.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/thread.h" - -namespace mongo { -namespace repl { - -/** - * The thread that makes storage snapshots periodically to enable majority committed reads. - * - * Currently the implementation must live in oplog.cpp because it uses newOpMutex. - * TODO find a better home for this. - */ -class SnapshotThread { - MONGO_DISALLOW_COPYING(SnapshotThread); - -public: - /** - * Starts a thread to take periodic snapshots if supported by the storageEngine. - * - * If the current storage engine doesn't support snapshots, a null pointer will be returned. - */ - static std::unique_ptr<SnapshotThread> start(ServiceContext* service); - - /** - * Signals the thread to stop and waits for it to finish. - * This must be called exactly once before exitCleanly() takes the global X lock. - */ - void shutdown(); - - /** - * Forces a new snapshot to be taken even if the global timestamp hasn't changed. - * - * Does not wait for the snapshot to be taken. - */ - void forceSnapshot(); - -private: - explicit SnapshotThread(SnapshotManager* manager); - void run(); - bool shouldSleepMore(int numSleepsDone, size_t numUncommittedSnapshots); - - SnapshotManager* const _manager; - bool _hitSnapshotLimit = false; - - AtomicWord<bool> _inShutdown{false}; // writes guarded by newOpMutex in oplog.cpp. - AtomicWord<bool> _forcedSnapshotPending{false}; // writes guarded by newOpMutex in oplog.cpp. - stdx::thread _thread; -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/storage/kv/SConscript b/src/mongo/db/storage/kv/SConscript index 2d67b4b5bf3..f9074a9a9f3 100644 --- a/src/mongo/db/storage/kv/SConscript +++ b/src/mongo/db/storage/kv/SConscript @@ -87,7 +87,7 @@ env.Library( source=[ 'kv_catalog_feature_tracker_test.cpp', 'kv_engine_test_harness.cpp', - 'kv_engine_test_snapshots.cpp', + 'kv_engine_test_timestamps.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/service_context', diff --git a/src/mongo/db/storage/kv/kv_engine_test_snapshots.cpp b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp index d9d14c2e758..a45da7bf28b 100644 --- a/src/mongo/db/storage/kv/kv_engine_test_snapshots.cpp +++ b/src/mongo/db/storage/kv/kv_engine_test_timestamps.cpp @@ -89,25 +89,14 @@ public: return Operation(service.makeClient(""), helper->getEngine()->newRecoveryUnit()); } - void prepareSnapshot() { - snapshotOperation = makeOperation(); // each prepare gets a new operation. - snapshotManager->prepareForCreateSnapshot(snapshotOperation).transitional_ignore(); - } - - SnapshotName createSnapshot() { - auto name = SnapshotName(++_counter); - ASSERT_OK(snapshotManager->createSnapshot(snapshotOperation, name)); + SnapshotName incrementTimestamp() { + auto name = SnapshotName(_counter); + _counter = Timestamp(_counter.getSecs() + 1, _counter.getInc()); return name; } - SnapshotName prepareAndCreateSnapshot() { - prepareSnapshot(); - return createSnapshot(); - } - RecordId insertRecord(OperationContext* opCtx, std::string contents = "abcd") { - auto id = - rs->insertRecord(opCtx, contents.c_str(), contents.length() + 1, Timestamp(), false); + auto id = rs->insertRecord(opCtx, contents.c_str(), contents.length() + 1, _counter, false); ASSERT_OK(id); return id.getValue(); } @@ -123,6 +112,7 @@ public: void updateRecordAndCommit(RecordId id, std::string contents) { auto op = makeOperation(); WriteUnitOfWork wuow(op); + ASSERT_OK(op->recoveryUnit()->setTimestamp(SnapshotName(_counter))); ASSERT_OK( rs->updateRecord(op, id, contents.c_str(), contents.length() + 1, false, nullptr)); wuow.commit(); @@ -131,6 +121,7 @@ public: void deleteRecordAndCommit(RecordId id) { auto op = makeOperation(); WriteUnitOfWork wuow(op); + ASSERT_OK(op->recoveryUnit()->setTimestamp(SnapshotName(_counter))); rs->deleteRecord(op, id); wuow.commit(); } @@ -190,7 +181,7 @@ public: Operation snapshotOperation; private: - uint64_t _counter = 0; + Timestamp _counter = Timestamp(1, 0); }; } // namespace @@ -217,7 +208,7 @@ TEST_F(SnapshotManagerTests, FailsWithNoCommittedSnapshot) { ErrorCodes::ReadConcernMajorityNotAvailableYet); // There is a snapshot but it isn't committed. - auto name = prepareAndCreateSnapshot(); + auto name = incrementTimestamp(); ASSERT_EQ(ru->setReadFromMajorityCommittedSnapshot(), ErrorCodes::ReadConcernMajorityNotAvailableYet); @@ -238,7 +229,7 @@ TEST_F(SnapshotManagerTests, FailsAfterDropAllSnapshotsWhileYielded) { auto op = makeOperation(); // Start an operation using a committed snapshot. - auto name = prepareAndCreateSnapshot(); + auto name = incrementTimestamp(); snapshotManager->setCommittedSnapshot(name, Timestamp(name.asU64())); ASSERT_OK(op->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); ASSERT_EQ(itCountOn(op), 0); // acquires a snapshot. @@ -259,27 +250,20 @@ TEST_F(SnapshotManagerTests, BasicFunctionality) { // Snapshot variables are named according to the size of the RecordStore at the time of the // snapshot. - auto snap0 = prepareAndCreateSnapshot(); + auto snap0 = incrementTimestamp(); + snapshotManager->setCommittedSnapshot(snap0, Timestamp(snap0.asU64())); + ASSERT_EQ(itCountCommitted(), 0); insertRecordAndCommit(); - auto snap1 = prepareAndCreateSnapshot(); - insertRecordAndCommit(); - prepareSnapshot(); - insertRecordAndCommit(); - auto snap2 = createSnapshot(); + ASSERT_EQ(itCountCommitted(), 0); - { - auto op = makeOperation(); - WriteUnitOfWork wuow(op); - insertRecord(op); - prepareSnapshot(); // insert should still be invisible. - ASSERT_EQ(itCountOn(snapshotOperation), 3); + auto snap1 = incrementTimestamp(); - wuow.commit(); - } - auto snap3 = createSnapshot(); + insertRecordAndCommit(); + insertRecordAndCommit(); + auto snap3 = incrementTimestamp(); { auto op = makeOperation(); @@ -287,19 +271,14 @@ TEST_F(SnapshotManagerTests, BasicFunctionality) { insertRecord(op); // rolling back wuow } - auto snap4 = prepareAndCreateSnapshot(); + + insertRecordAndCommit(); + auto snap4 = incrementTimestamp(); // If these fail, everything is busted. - snapshotManager->setCommittedSnapshot(snap0, Timestamp(snap0.asU64())); ASSERT_EQ(itCountCommitted(), 0); snapshotManager->setCommittedSnapshot(snap1, Timestamp(snap1.asU64())); ASSERT_EQ(itCountCommitted(), 1); - - // If this fails, the snapshot is from the 'create' time rather than the 'prepare' time. - snapshotManager->setCommittedSnapshot(snap2, Timestamp(snap2.asU64())); - ASSERT_EQ(itCountCommitted(), 2); - - // If this fails, the snapshot contains writes that weren't yet committed. snapshotManager->setCommittedSnapshot(snap3, Timestamp(snap3.asU64())); ASSERT_EQ(itCountCommitted(), 3); @@ -315,10 +294,6 @@ TEST_F(SnapshotManagerTests, BasicFunctionality) { // If this fails, longOp changed snapshots at an illegal time. ASSERT_EQ(itCountOn(longOp), 3); - // If this fails, snapshots aren't preserved while in use. - snapshotManager->cleanupUnneededSnapshots(); - ASSERT_EQ(itCountOn(longOp), 3); - // If this fails, longOp didn't get a new snapshot when it should have. longOp->recoveryUnit()->abandonSnapshot(); ASSERT_EQ(itCountOn(longOp), 4); @@ -329,16 +304,16 @@ TEST_F(SnapshotManagerTests, UpdateAndDelete) { return; // This test is only for engines that DO support SnapshotMangers. // Snapshot variables are named according to the state of the record. - auto snapBeforeInsert = prepareAndCreateSnapshot(); + auto snapBeforeInsert = incrementTimestamp(); auto id = insertRecordAndCommit("Dog"); - auto snapDog = prepareAndCreateSnapshot(); + auto snapDog = incrementTimestamp(); updateRecordAndCommit(id, "Cat"); - auto snapCat = prepareAndCreateSnapshot(); + auto snapCat = incrementTimestamp(); deleteRecordAndCommit(id); - auto snapAfterDelete = prepareAndCreateSnapshot(); + auto snapAfterDelete = incrementTimestamp(); snapshotManager->setCommittedSnapshot(snapBeforeInsert, Timestamp(snapBeforeInsert.asU64())); ASSERT_EQ(itCountCommitted(), 0); diff --git a/src/mongo/db/storage/snapshot_manager.h b/src/mongo/db/storage/snapshot_manager.h index b0dfc15c013..4ca7be39a7c 100644 --- a/src/mongo/db/storage/snapshot_manager.h +++ b/src/mongo/db/storage/snapshot_manager.h @@ -61,16 +61,6 @@ public: virtual Status prepareForCreateSnapshot(OperationContext* opCtx) = 0; /** - * Creates a new named snapshot representing the same point-in-time captured in - * prepareForCreateSnapshot(). - * - * Must be called in the same ScopedTransaction as prepareForCreateSnapshot. - * - * Caller guarantees that this name must compare greater than all existing snapshots. - */ - virtual Status createSnapshot(OperationContext* opCtx, const SnapshotName& name) = 0; - - /** * Sets the snapshot to be used for committed reads. * * Implementations are allowed to assume that all older snapshots have names that compare diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 8e700112d0d..2a606f80565 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1166,8 +1166,8 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, } else { ts = timestamps[i]; } - LOG(4) << "inserting record with timestamp " << ts.asULL(); if (!ts.isNull()) { + LOG(4) << "inserting record with timestamp " << ts.asULL(); fassertStatusOK(39001, opCtx->recoveryUnit()->setTimestamp(SnapshotName(ts))); } setKey(c, record.id); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 3b0e7cdb018..2828d8d89ff 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -235,7 +235,8 @@ void WiredTigerRecoveryUnit::_txnOpen() { WT_SESSION* session = _session->getSession(); if (_readAtTimestamp != SnapshotName::min()) { - _sessionCache->snapshotManager().beginTransactionAtTimestamp(_readAtTimestamp, session); + uassertStatusOK(_sessionCache->snapshotManager().beginTransactionAtTimestamp( + _readAtTimestamp, session)); } else if (_readFromMajorityCommittedSnapshot) { _majorityCommittedSnapshot = _sessionCache->snapshotManager().beginTransactionOnCommittedSnapshot(session); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp index 5c0acc7224e..8dd7dcfd73b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp @@ -49,13 +49,6 @@ Status WiredTigerSnapshotManager::prepareForCreateSnapshot(OperationContext* opC return Status::OK(); } -Status WiredTigerSnapshotManager::createSnapshot(OperationContext* opCtx, - const SnapshotName& name) { - auto session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession(); - const std::string config = str::stream() << "name=" << name.asU64(); - return wtRCToStatus(session->snapshot(session, config.c_str())); -} - void WiredTigerSnapshotManager::setCommittedSnapshot(const SnapshotName& name, Timestamp ts) { stdx::lock_guard<stdx::mutex> lock(_mutex); @@ -63,22 +56,11 @@ void WiredTigerSnapshotManager::setCommittedSnapshot(const SnapshotName& name, T _committedSnapshot = name; } -void WiredTigerSnapshotManager::cleanupUnneededSnapshots() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - - if (!_committedSnapshot) - return; - - const std::string config = str::stream() << "drop=(before=" << _committedSnapshot->asU64() - << ')'; - invariantWTOK(_session->snapshot(_session, config.c_str())); -} +void WiredTigerSnapshotManager::cleanupUnneededSnapshots() {} void WiredTigerSnapshotManager::dropAllSnapshots() { stdx::lock_guard<stdx::mutex> lock(_mutex); _committedSnapshot = boost::none; - - invariantWTOK(_session->snapshot(_session, "drop=(all)")); } void WiredTigerSnapshotManager::shutdown() { @@ -95,8 +77,8 @@ boost::optional<SnapshotName> WiredTigerSnapshotManager::getMinSnapshotForNextCo return _committedSnapshot; } -void WiredTigerSnapshotManager::beginTransactionAtTimestamp(SnapshotName pointInTime, - WT_SESSION* session) const { +Status WiredTigerSnapshotManager::beginTransactionAtTimestamp(SnapshotName pointInTime, + WT_SESSION* session) const { char readTSConfigString[15 /* read_timestamp= */ + (8 * 2) /* 8 hexadecimal characters */ + 1 /* trailing null */]; auto size = std::snprintf(readTSConfigString, @@ -104,7 +86,8 @@ void WiredTigerSnapshotManager::beginTransactionAtTimestamp(SnapshotName pointIn "read_timestamp=%llx", static_cast<unsigned long long>(pointInTime.asU64())); invariant(static_cast<std::size_t>(size) < sizeof(readTSConfigString)); - invariantWTOK(session->begin_transaction(session, readTSConfigString)); + + return wtRCToStatus(session->begin_transaction(session, readTSConfigString)); } SnapshotName WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot( @@ -114,17 +97,14 @@ SnapshotName WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot( uassert(ErrorCodes::ReadConcernMajorityNotAvailableYet, "Committed view disappeared while running operation", _committedSnapshot); - - StringBuilder config; - config << "snapshot=" << _committedSnapshot->asU64(); - invariantWTOK(session->begin_transaction(session, config.str().c_str())); - + auto status = beginTransactionAtTimestamp(_committedSnapshot.get(), session); + fassertStatusOK(30635, status); return *_committedSnapshot; } void WiredTigerSnapshotManager::beginTransactionOnOplog(WiredTigerOplogManager* oplogManager, WT_SESSION* session) const { - size_t retries = 1000; + std::size_t retries = 1000; int status; do { stdx::lock_guard<stdx::mutex> lock(_mutex); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h index 3d1187806a6..2609b396e24 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h @@ -55,7 +55,6 @@ public: } Status prepareForCreateSnapshot(OperationContext* opCtx) final; - Status createSnapshot(OperationContext* opCtx, const SnapshotName& name) final; void setCommittedSnapshot(const SnapshotName& name, Timestamp ts) final; void cleanupUnneededSnapshots() final; void dropAllSnapshots() final; @@ -69,7 +68,7 @@ public: */ void shutdown(); - void beginTransactionAtTimestamp(SnapshotName pointInTime, WT_SESSION* session) const; + Status beginTransactionAtTimestamp(SnapshotName pointInTime, WT_SESSION* session) const; /** * Starts a transaction and returns the SnapshotName used. |