summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/session_catalog_migration_source.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source.cpp')
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp60
1 files changed, 57 insertions, 3 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index d41d7481588..7d451b0b81e 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -163,7 +163,32 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
}
bool SessionCatalogMigrationSource::hasMoreOplog() {
- return _hasMoreOplogFromSessionCatalog() || _hasNewWrites();
+ if (_hasMoreOplogFromSessionCatalog()) {
+ return true;
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+ return _hasNewWrites(lk);
+}
+
+void SessionCatalogMigrationSource::onCommitCloneStarted() {
+ stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+
+ _state = State::kCommitStarted;
+ if (_newOplogNotification) {
+ _newOplogNotification->set(true);
+ _newOplogNotification.reset();
+ }
+}
+
+void SessionCatalogMigrationSource::onCloneCleanup() {
+ stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+
+ _state = State::kCleanup;
+ if (_newOplogNotification) {
+ _newOplogNotification->set(true);
+ _newOplogNotification.reset();
+ }
}
SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() {
@@ -188,6 +213,31 @@ bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) {
return _fetchNextNewWriteOplog(opCtx);
}
+std::shared_ptr<Notification<bool>> SessionCatalogMigrationSource::getNotificationForNewOplog() {
+ invariant(!_hasMoreOplogFromSessionCatalog());
+
+ stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+
+ if (_newOplogNotification) {
+ return _newOplogNotification;
+ }
+
+ auto notification = std::make_shared<Notification<bool>>();
+ if (_state == State::kCleanup) {
+ notification->set(true);
+ }
+ // Even if commit has started, we still need to drain the current buffer.
+ else if (_hasNewWrites(lk)) {
+ notification->set(false);
+ } else if (_state == State::kCommitStarted) {
+ notification->set(true);
+ } else {
+ _newOplogNotification = notification;
+ }
+
+ return notification;
+}
+
bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) {
if (_currentOplogIterator) {
if (_currentOplogIterator->hasNext()) {
@@ -252,8 +302,7 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
return false;
}
-bool SessionCatalogMigrationSource::_hasNewWrites() {
- stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+bool SessionCatalogMigrationSource::_hasNewWrites(WithLock) {
return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty();
}
@@ -294,6 +343,11 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) {
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
_newWriteOpTimeList.push_back(opTime);
+
+ if (_newOplogNotification) {
+ _newOplogNotification->set(false);
+ _newOplogNotification.reset();
+ }
}
SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator(