diff options
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source.cpp')
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.cpp | 60 |
1 files changed, 57 insertions, 3 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index d41d7481588..7d451b0b81e 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -163,7 +163,32 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o } bool SessionCatalogMigrationSource::hasMoreOplog() { - return _hasMoreOplogFromSessionCatalog() || _hasNewWrites(); + if (_hasMoreOplogFromSessionCatalog()) { + return true; + } + + stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + return _hasNewWrites(lk); +} + +void SessionCatalogMigrationSource::onCommitCloneStarted() { + stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); + + _state = State::kCommitStarted; + if (_newOplogNotification) { + _newOplogNotification->set(true); + _newOplogNotification.reset(); + } +} + +void SessionCatalogMigrationSource::onCloneCleanup() { + stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); + + _state = State::kCleanup; + if (_newOplogNotification) { + _newOplogNotification->set(true); + _newOplogNotification.reset(); + } } SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() { @@ -188,6 +213,31 @@ bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) { return _fetchNextNewWriteOplog(opCtx); } +std::shared_ptr<Notification<bool>> SessionCatalogMigrationSource::getNotificationForNewOplog() { + invariant(!_hasMoreOplogFromSessionCatalog()); + + stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); + + if (_newOplogNotification) { + return _newOplogNotification; + } + + auto notification = std::make_shared<Notification<bool>>(); + if (_state == State::kCleanup) { + notification->set(true); + } + // Even if commit has started, we still need to drain the current buffer. + else if (_hasNewWrites(lk)) { + notification->set(false); + } else if (_state == State::kCommitStarted) { + notification->set(true); + } else { + _newOplogNotification = notification; + } + + return notification; +} + bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) { if (_currentOplogIterator) { if (_currentOplogIterator->hasNext()) { @@ -252,8 +302,7 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC return false; } -bool SessionCatalogMigrationSource::_hasNewWrites() { - stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); +bool SessionCatalogMigrationSource::_hasNewWrites(WithLock) { return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty(); } @@ -294,6 +343,11 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) { stdx::lock_guard<stdx::mutex> lk(_newOplogMutex); _newWriteOpTimeList.push_back(opTime); + + if (_newOplogNotification) { + _newOplogNotification->set(false); + _newOplogNotification.reset(); + } } SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator( |