diff options
Diffstat (limited to 'src/mongo/db/repl')
12 files changed, 168 insertions, 129 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 1f7334e5481..c5d580ceefe 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -862,15 +862,14 @@ void oplogCheckCloseDatabase(OperationContext* txn, Database* db) { _localOplogCollection = nullptr; } -SnapshotThread::SnapshotThread(SnapshotManager* manager, Callback&& onSnapshotCreate) - : _manager(manager), - _onSnapshotCreate(std::move(onSnapshotCreate)), - _thread([this] { run(); }) {} +SnapshotThread::SnapshotThread(SnapshotManager* manager) + : _manager(manager), _thread([this] { run(); }) {} void SnapshotThread::run() { Client::initThread("SnapshotThread"); auto& client = cc(); auto serviceContext = client.getServiceContext(); + auto replCoord = ReplicationCoordinator::get(serviceContext); Timestamp lastTimestamp = {}; while (true) { @@ -894,7 +893,7 @@ void SnapshotThread::run() { auto txn = client.makeOperationContext(); Lock::GlobalLock globalLock(txn->lockState(), MODE_IS, UINT_MAX); - if (!ReplicationCoordinator::get(serviceContext)->getMemberState().readable()) { + if (!replCoord->getMemberState().readable()) { // If our MemberState isn't readable, we may not be in a consistent state so don't // take snapshots. When we transition into a readable state from a non-readable // state, a snapshot is forced to ensure we don't miss the latest write. This must @@ -910,23 +909,23 @@ void SnapshotThread::run() { _manager->prepareForCreateSnapshot(txn.get()); } - Timestamp thisSnapshot = {}; + auto opTimeOfSnapshot = OpTime(); { AutoGetCollectionForRead oplog(txn.get(), rsOplogName); invariant(oplog.getCollection()); // Read the latest op from the oplog. - auto record = - oplog.getCollection()->getCursor(txn.get(), /*forward*/ false)->next(); + auto cursor = oplog.getCollection()->getCursor(txn.get(), /*forward*/ false); + auto record = cursor->next(); if (!record) continue; // oplog is completely empty. const auto op = record->data.releaseToBson(); - thisSnapshot = op["ts"].timestamp(); - invariant(!thisSnapshot.isNull()); + opTimeOfSnapshot = extractOpTime(op); + invariant(!opTimeOfSnapshot.isNull()); } - _manager->createSnapshot(txn.get(), SnapshotName(thisSnapshot)); - _onSnapshotCreate(SnapshotName(thisSnapshot)); + _manager->createSnapshot(txn.get(), SnapshotName(opTimeOfSnapshot.getTimestamp())); + replCoord->onSnapshotCreate(opTimeOfSnapshot); } catch (const WriteConflictException& wce) { log() << "skipping storage snapshot pass due to write conflict"; continue; @@ -951,13 +950,11 @@ void SnapshotThread::forceSnapshot() { newTimestampNotifier.notify_all(); } -std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service, - Callback onSnapshotCreate) { - auto manager = service->getGlobalStorageEngine()->getSnapshotManager(); - if (!manager) - return {}; - return std::unique_ptr<SnapshotThread>( - new SnapshotThread(manager, std::move(onSnapshotCreate))); +std::unique_ptr<SnapshotThread> SnapshotThread::start(ServiceContext* service) { + if (auto manager = service->getGlobalStorageEngine()->getSnapshotManager()) { + return std::unique_ptr<SnapshotThread>(new SnapshotThread(manager)); + } + return {}; } } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 3105d5fe5d0..8ff385b9c6e 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -630,6 +630,11 @@ public: */ virtual Status updateTerm(long long term) = 0; + /** + * Called when a new snapshot is created. + */ + virtual void onSnapshotCreate(OpTime timeOfSnapshot) = 0; + protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index c45a2d4a7fe..b444b73f0f3 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -182,11 +182,16 @@ public: virtual void dropAllTempCollections(OperationContext* txn) = 0; /** - * Updates the committed snapshot to the newest possible view before or on newCommitPoint. + * Drops all snapshots and clears the "committed" snapshot. + */ + virtual void dropAllSnapshots() = 0; + + /** + * Updates the committed snapshot to the newCommitPoint, and deletes older snapshots. * - * If this changes the snapshot, returns the Timestamp for the new snapshot. + * It is illegal to call with a newCommitPoint that does not name an existing snapshot. */ - virtual boost::optional<Timestamp> updateCommittedSnapshot(OpTime newCommitPoint) = 0; + virtual void updateCommittedSnapshot(OpTime newCommitPoint) = 0; /** * Signals the SnapshotThread, if running, to take a forced snapshot even if the global diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index e1e2dd1499d..6f4a8330f34 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -102,7 +102,9 @@ void ReplicationCoordinatorExternalStateImpl::startThreads() { _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); _syncSourceFeedbackThread.reset( new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback))); - startSnapshotThread(); + if (enableReplSnapshotThread) { + _snapshotThread = SnapshotThread::start(getGlobalServiceContext()); + } _startedThreads = true; } @@ -250,35 +252,6 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( } void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) { - // This is called to force-change the global timestamp following rollback. When this happens - // we need to drop all snapshots since we may need to create out-of-order snapshots. This - // would be necessary even if we used SnapshotName(term, timestamp) and RAFT because of the - // following situation: - // - // |--------|-------------|-------------| - // | OpTime | HasSnapshot | Committed | - // |--------|-------------|-------------| - // | (0, 1) | * | * | - // | (0, 2) | * | ROLLED BACK | - // | (1, 2) | | * | - // |--------|-------------|-------------| - // - // When we try to make (1,2) the commit point, we'd find (0,2) as the newest snapshot - // before the commit point, but it would be invalid to mark it as the committed snapshot - // since it was never committed. - // - // TODO SERVER-19209 We also need to clear snapshots after resync. - - { - stdx::lock_guard<stdx::mutex> lock(_snapshotsMutex); - _snapshots.clear(); - } - - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - if (auto snapshotManager = storageEngine->getSnapshotManager()) { - snapshotManager->dropAllSnapshots(); - } - setNewTimestamp(newTime); } @@ -362,51 +335,15 @@ void ReplicationCoordinatorExternalStateImpl::dropAllTempCollections(OperationCo } } -void ReplicationCoordinatorExternalStateImpl::startSnapshotThread() { - if (!enableReplSnapshotThread) - return; - - auto onSnapshotCreate = [this](SnapshotName name) { - stdx::lock_guard<stdx::mutex> lock(_snapshotsMutex); - if (!_snapshots.empty()) { - if (name == _snapshots.back()) { - // This is already in the set. Don't want to double add. - return; - } - invariant(name > _snapshots.back()); - } - _snapshots.push_back(name); - }; - - _snapshotThread = SnapshotThread::start(getGlobalServiceContext(), std::move(onSnapshotCreate)); +void ReplicationCoordinatorExternalStateImpl::dropAllSnapshots() { + if (auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager()) + manager->dropAllSnapshots(); } -boost::optional<Timestamp> ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot( - OpTime newCommitPoint) { - if (newCommitPoint.isNull()) - return {}; - - stdx::lock_guard<stdx::mutex> lock(_snapshotsMutex); - - if (_snapshots.empty()) - return {}; - - // Seek to the first entry > the commit point and go back one to land <=. - auto it = std::upper_bound( - _snapshots.begin(), _snapshots.end(), SnapshotName(newCommitPoint.getTimestamp())); - if (it == _snapshots.begin()) - return {}; // Nothing available is <=. - --it; - auto newSnapshot = *it; - +void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(OpTime newCommitPoint) { auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager(); - invariant(manager); // If there is no manager, _snapshots would be empty. - manager->setCommittedSnapshot(newSnapshot); - - // Forget about all snapshots <= the new commit point. - _snapshots.erase(_snapshots.begin(), ++it); - - return {newSnapshot.timestamp()}; + invariant(manager); // This should never be called if there is no SnapshotManager. + manager->setCommittedSnapshot(SnapshotName(newCommitPoint.getTimestamp())); } void ReplicationCoordinatorExternalStateImpl::forceSnapshotCreation() { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 62c9ef0cfbd..5eae32b70a9 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -69,14 +69,13 @@ public: virtual void signalApplierToChooseNewSyncSource(); virtual OperationContext* createOperationContext(const std::string& threadName); virtual void dropAllTempCollections(OperationContext* txn); - boost::optional<Timestamp> updateCommittedSnapshot(OpTime newCommitPoint) final; + void dropAllSnapshots() final; + void updateCommittedSnapshot(OpTime newCommitPoint) final; void forceSnapshotCreation() final; std::string getNextOpContextThreadName(); private: - void startSnapshotThread(); - // Guards starting threads and setting _startedThreads stdx::mutex _threadMutex; @@ -103,9 +102,6 @@ private: long long _nextThreadId; std::unique_ptr<SnapshotThread> _snapshotThread; - - stdx::mutex _snapshotsMutex; // guards _snapshots. - std::deque<SnapshotName> _snapshots; // kept in sorted order. }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 513c8d01a53..709101e6758 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -184,10 +184,9 @@ OperationContext* ReplicationCoordinatorExternalStateMock::createOperationContex void ReplicationCoordinatorExternalStateMock::dropAllTempCollections(OperationContext* txn) {} -boost::optional<Timestamp> ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot( - OpTime newCommitPoint) { - return {}; -} +void ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {} + +void ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot(OpTime newCommitPoint) {} void ReplicationCoordinatorExternalStateMock::forceSnapshotCreation() {} diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 1ae18cafab3..5b8f037481a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -72,7 +72,8 @@ public: virtual void signalApplierToChooseNewSyncSource(); virtual OperationContext* createOperationContext(const std::string& threadName); virtual void dropAllTempCollections(OperationContext* txn); - virtual boost::optional<Timestamp> updateCommittedSnapshot(OpTime newCommitPoint); + virtual void dropAllSnapshots(); + virtual void updateCommittedSnapshot(OpTime newCommitPoint); virtual void forceSnapshotCreation(); /** diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 9db82dec87f..b40be3f7515 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2060,6 +2060,34 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { _externalState->forceSnapshotCreation(); } + if (newState.rollback()) { + // When we start rollback, we need to drop all snapshots since we may need to create + // out-of-order snapshots. This would be necessary even if the SnapshotName was completely + // monotonically increasing because we don't necessarily have a snapshot of every write. + // If we didn't drop all snapshots on rollback it could lead to the following situation: + // + // |--------|-------------|-------------| + // | OpTime | HasSnapshot | Committed | + // |--------|-------------|-------------| + // | (0, 1) | * | * | + // | (0, 2) | * | ROLLED BACK | + // | (1, 2) | | * | + // |--------|-------------|-------------| + // + // When we try to make (1,2) the commit point, we'd find (0,2) as the newest snapshot + // before the commit point, but it would be invalid to mark it as the committed snapshot + // since it was never committed. + // + // TODO SERVER-19209 We also need to clear snapshots before a resync. + _dropAllSnapshots_inlock(); + } + + if (_memberState.rollback()) { + // Ensure that no snapshots were created while we were in rollback. + invariant(!_currentCommittedSnapshot); + invariant(_uncommittedSnapshots.empty()); + } + _memberState = newState; log() << "transition to " << newState.toString() << rsLog; return result; @@ -2490,25 +2518,32 @@ void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end()); // Use the index of the minimum quorum in the vector of nodes. - auto newCommittedOpTime = votingNodesOpTimes[(votingNodesOpTimes.size() - 1) / 2]; - if (newCommittedOpTime != _lastCommittedOpTime) { - _lastCommittedOpTime = newCommittedOpTime; - // TODO SERVER-19208 Also need to updateCommittedSnapshot on secondaries. - if (auto newSnapshotTs = _externalState->updateCommittedSnapshot(newCommittedOpTime)) { - // TODO use this Timestamp for the following things: - // * SERVER-19206 make w:majority writes block until they are in the committed snapshot. - // * SERVER-19211 make readCommitted + afterOptime block until the optime is in the - // committed view. - // * SERVER-19212 make new indexes not be used for any queries until the index is in the - // committed view. - } - } + _setLastCommittedOpTime_inlock(votingNodesOpTimes[(votingNodesOpTimes.size() - 1) / 2]); } void ReplicationCoordinatorImpl::_setLastCommittedOpTime(const OpTime& committedOpTime) { stdx::unique_lock<stdx::mutex> lk(_mutex); - if (committedOpTime > _lastCommittedOpTime) { - _lastCommittedOpTime = committedOpTime; + _setLastCommittedOpTime_inlock(committedOpTime); +} + +void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& committedOpTime) { + if (committedOpTime <= _lastCommittedOpTime) + return; // This may have come from an out-of-order heartbeat. Ignore it. + + _lastCommittedOpTime = committedOpTime; + + if (!_uncommittedSnapshots.empty() && _uncommittedSnapshots.front() <= committedOpTime) { + // At least one uncommitted snapshot is ready to be blessed as committed. + + // Seek to the first entry > the commit point. Previous element must be <=. + const auto onePastCommitPoint = std::upper_bound( + _uncommittedSnapshots.begin(), _uncommittedSnapshots.end(), committedOpTime); + const auto newSnapshot = *std::prev(onePastCommitPoint); + + // Forget about all snapshots <= the new commit point. + _uncommittedSnapshots.erase(_uncommittedSnapshots.begin(), onePastCommitPoint); + + _updateCommittedSnapshot_inlock(newSnapshot); } } @@ -2807,5 +2842,52 @@ bool ReplicationCoordinatorImpl::_updateTerm_incallback(long long term, Handle* return updated; } +void ReplicationCoordinatorImpl::onSnapshotCreate(OpTime timeOfSnapshot) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_memberState.readable()); // Snapshots can only be taken in a readable state. + + if (timeOfSnapshot <= _lastCommittedOpTime) { + // This snapshot is ready to be marked as committed. + invariant(_uncommittedSnapshots.empty()); + _updateCommittedSnapshot_inlock(timeOfSnapshot); + return; + } + + if (!_uncommittedSnapshots.empty()) { + if (timeOfSnapshot == _uncommittedSnapshots.back()) { + // This is already in the set. Don't want to double add. + return; + } + invariant(timeOfSnapshot > _uncommittedSnapshots.back()); + } + _uncommittedSnapshots.push_back(timeOfSnapshot); +} + +void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(OpTime newCommittedSnapshot) { + invariant(!newCommittedSnapshot.isNull()); + invariant(newCommittedSnapshot <= _lastCommittedOpTime); + if (_currentCommittedSnapshot) + invariant(newCommittedSnapshot > *_currentCommittedSnapshot); + if (!_uncommittedSnapshots.empty()) + invariant(newCommittedSnapshot < _uncommittedSnapshots.front()); + + _currentCommittedSnapshot = newCommittedSnapshot; + + _externalState->updateCommittedSnapshot(newCommittedSnapshot); + + // TODO use _currentCommittedSnapshot for the following things: + // * SERVER-19206 make w:majority writes block until they are in the committed snapshot. + // * SERVER-19211 make readCommitted + afterOptime block until the optime is in the + // committed view. + // * SERVER-19212 make new indexes not be used for any queries until the index is in the + // committed view. +} + +void ReplicationCoordinatorImpl::_dropAllSnapshots_inlock() { + _uncommittedSnapshots.clear(); + _currentCommittedSnapshot = {}; + _externalState->dropAllSnapshots(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index e6b6b64611a..0f0cccd057e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -267,6 +267,8 @@ public: virtual Status updateTerm(long long term) override; + virtual void onSnapshotCreate(OpTime timeOfSnapshot) override; + // ================== Test support API =================== /** @@ -410,6 +412,7 @@ private: * current last committed OpTime. */ void _setLastCommittedOpTime(const OpTime& committedOpTime); + void _setLastCommittedOpTime_inlock(const OpTime& committedOpTime); /** * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication. @@ -899,6 +902,16 @@ private: const ReplicationMetadata& replMetadata); void _processReplicationMetadata_incallback(const ReplicationMetadata& replMetadata); + /** + * Blesses a snapshot to be used for new committed reads. + */ + void _updateCommittedSnapshot_inlock(OpTime newCommittedSnapshot); + + /** + * Drops all snapshots and clears the "committed" snapshot. + */ + void _dropAllSnapshots_inlock(); + // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. @@ -1039,6 +1052,13 @@ private: // Data Replicator used to replicate data DataReplicator _dr; // (S) + + // The OpTimes for all snapshots newer than the current commit point, kept in sorted order. + std::deque<OpTime> _uncommittedSnapshots; // (M) + + // The non-null OpTime of the current snapshot used for committed reads, if there is one. When + // engaged, this must be <= _lastCommittedOpTime and < _uncommittedSnapshots.front(). + boost::optional<OpTime> _currentCommittedSnapshot; // (M) }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 47bc1e314be..51cff896269 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -351,5 +351,7 @@ Status ReplicationCoordinatorMock::updateTerm(long long term) { return Status::OK(); } +void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot) {} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 4169a8ee5f0..65c4a22a81a 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -202,6 +202,8 @@ public: virtual Status updateTerm(long long term); + virtual void onSnapshotCreate(OpTime timeOfSnapshot); + private: const ReplSettings _settings; MemberState _memberState; diff --git a/src/mongo/db/repl/snapshot_thread.h b/src/mongo/db/repl/snapshot_thread.h index c657ee0d554..81bf2212f52 100644 --- a/src/mongo/db/repl/snapshot_thread.h +++ b/src/mongo/db/repl/snapshot_thread.h @@ -47,18 +47,12 @@ class SnapshotThread { MONGO_DISALLOW_COPYING(SnapshotThread); public: - using Callback = stdx::function<void(SnapshotName)>; - /** * Starts a thread to take periodic snapshots if supported by the storageEngine. * * If the current storage engine doesn't support snapshots, a null pointer will be returned. - * - * The passed-in callback will be called every time a snapshot is created. It will be called - * with the global lock held in MODE_IS continuously since creating the snapshot. */ - static std::unique_ptr<SnapshotThread> start(ServiceContext* service, - Callback onSnapshotCreate); + static std::unique_ptr<SnapshotThread> start(ServiceContext* service); /** * Signals the thread to stop and waits for it to finish. @@ -74,13 +68,12 @@ public: void forceSnapshot(); private: - SnapshotThread(SnapshotManager* manager, Callback&& onSnapshotCreate); + explicit SnapshotThread(SnapshotManager* manager); void run(); SnapshotManager* const _manager; bool _inShutdown = false; // guarded by newOpMutex in oplog.cpp. bool _forcedSnapshotPending = false; // guarded by newOpMutex in oplog.cpp. - const Callback _onSnapshotCreate; stdx::thread _thread; }; |