-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp          | 19
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h            | 10
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp | 75
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.cpp         | 10
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source.cpp              | 60
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source.h                | 38
6 files changed, 188 insertions(+), 24 deletions(-)
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index f9018ac6cfc..393a99c00fa 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -292,8 +292,14 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
invariant(_state == kCloning);
invariant(!opCtx->lockState()->isLocked());
+
+ if (_sessionCatalogSource) {
+ _sessionCatalogSource->onCommitCloneStarted();
+ }
+
auto responseStatus =
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
+
if (responseStatus.isOK()) {
_cleanup(opCtx);
@@ -313,6 +319,10 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) {
invariant(!opCtx->lockState()->isLocked());
+ if (_sessionCatalogSource) {
+ _sessionCatalogSource->onCloneCleanup();
+ }
+
switch (_state) {
case kDone:
break;
@@ -788,4 +798,13 @@ boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigra
return boost::make_optional(opTimeToWaitIfWaitingForMajority);
}
+std::shared_ptr<Notification<bool>>
+MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch() {
+ if (!_sessionCatalogSource) {
+ return nullptr;
+ }
+
+ return _sessionCatalogSource->getNotificationForNewOplog();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 3440f7d79ea..aa8fda0530c 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -164,6 +164,16 @@ public:
boost::optional<repl::OpTime> nextSessionMigrationBatch(OperationContext* opCtx,
BSONArrayBuilder* arrBuilder);
+ /**
+ * Returns a notification that can be used to wait for new oplog entries that need to be
+ * migrated. If the notification's value is true, there are no more new batches to fetch:
+ * the migration has already entered the critical section or has aborted.
+ *
+ * Returns nullptr if there is no session migration associated with this migration.
+ */
+ std::shared_ptr<Notification<bool>> getNotificationForNextSessionMigrationBatch();
+
private:
friend class LogOpForShardingHandler;
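The new accessor pairs with nextSessionMigrationBatch() to form a poll-then-wait protocol: callers attempt a fetch first and only block on the notification when the batch comes back empty. A minimal caller sketch (illustrative only; the cloner and opCtx variables and the enclosing retry loop are assumed, but the member functions are the ones declared in this header):

    // Hypothetical caller draining session oplog batches from the cloner.
    BSONArrayBuilder arrBuilder;
    cloner->nextSessionMigrationBatch(opCtx, &arrBuilder);

    if (arrBuilder.arrSize() == 0) {
        // Nothing buffered right now; wait instead of spinning.
        if (auto notification = cloner->getNotificationForNextSessionMigrationBatch()) {
            // get() blocks until set() is called; true means the migration has
            // entered the critical section or aborted, so no batch will come.
            if (notification->get(opCtx)) {
                return;
            }
            // false: new oplog entries arrived, fetch again.
        }
        // nullptr: no session migration is associated with this migration.
    }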
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 6f243ebdd68..76367397e50 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -259,25 +259,37 @@ public:
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
- bool run(OperationContext* opCtx,
- const std::string&,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- const MigrationSessionId migrationSessionId(
- uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+ /**
+ * Fetches the next batch of oplog entries that need to be transferred and appends them to
+ * the given array builder. If nothing could be fetched, returns a non-null notification
+ * that gets signalled when new batches come in or when the migration is over. If the
+ * notification's boolean value is true, the migration has entered the critical section or
+ * has aborted, and there are no more new batches to fetch.
+ */
+ std::shared_ptr<Notification<bool>> fetchNextSessionMigrationBatch(
+ OperationContext* opCtx,
+ const MigrationSessionId& migrationSessionId,
+ BSONArrayBuilder* arrBuilder) {
+ boost::optional<repl::OpTime> opTime;
+ std::shared_ptr<Notification<bool>> newOplogNotification;
- BSONArrayBuilder arrBuilder;
+ writeConflictRetry(
+ opCtx,
+ "Fetching session related oplogs for migration",
+ NamespaceString::kRsOplogNamespace.ns(),
+ [&]() {
+ AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
+ opTime = autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, arrBuilder);
- boost::optional<repl::OpTime> opTime;
+ if (arrBuilder->arrSize() == 0) {
+ newOplogNotification =
+ autoCloner.getCloner()->getNotificationForNextSessionMigrationBatch();
+ }
+ });
- writeConflictRetry(opCtx,
- "Fetching session related oplogs for migration",
- NamespaceString::kRsOplogNamespace.ns(),
- [&]() {
- AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
- opTime = autoCloner.getCloner()->nextSessionMigrationBatch(
- opCtx, &arrBuilder);
- });
+ if (newOplogNotification) {
+ return newOplogNotification;
+ }
// If the batch returns something, we wait for write concern to ensure that all the entries
// in the batch have been majority committed. We then need to check that the rollback id
@@ -305,7 +317,38 @@ public:
rollbackId == rollbackIdAtMigrationInit);
}
+ return nullptr;
+ }
+
+ bool run(OperationContext* opCtx,
+ const std::string&,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+ const MigrationSessionId migrationSessionId(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+ BSONArrayBuilder arrBuilder;
+ bool hasMigrationCompleted = false;
+
+ do {
+ if (auto newOplogNotification =
+ fetchNextSessionMigrationBatch(opCtx, migrationSessionId, &arrBuilder)) {
+ hasMigrationCompleted = newOplogNotification->get(opCtx);
+ } else if (arrBuilder.arrSize() == 0) {
+ // If we didn't get a notification and the arrBuilder is empty, the session
+ // migration is not active for this migration (most likely because this node
+ // is not running as a replica set).
+ hasMigrationCompleted = true;
+ }
+ } while (arrBuilder.arrSize() == 0 && !hasMigrationCompleted);
+
result.appendArray("oplog", arrBuilder.arr());
+
+ // TODO: SERVER-40187 remove after v4.2. This indicates to the caller that this server
+ // waits for a notification of new oplog entries before responding, so the caller
+ // doesn't need to throttle its polling.
+ result.append("waitsForNewOplog", true);
+
return true;
}
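The restructured run() turns this command into a long poll: rather than immediately returning an empty batch, the donor blocks on the notification until new entries arrive (set to false) or the migration is over (set to true). For readers unfamiliar with mongo's Notification, a rough standalone analogue in plain C++ (a sketch only; the real class in util/concurrency/notification.h additionally supports interruptible waits through an OperationContext):

    #include <condition_variable>
    #include <mutex>
    #include <optional>

    // Single-shot notification carrying a value: set() is called at most once,
    // and get() blocks every waiter until the value becomes available.
    template <typename T>
    class SimpleNotification {
    public:
        void set(T value) {
            std::lock_guard<std::mutex> lk(_mutex);
            _value = std::move(value);
            _cv.notify_all();
        }

        T& get() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait(lk, [&] { return _value.has_value(); });
            return *_value;
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        std::optional<T> _value;
    };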
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index ea33491350d..a057b9e7aae 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -54,6 +54,7 @@ namespace mongo {
namespace {
const auto kOplogField = "oplog";
+const auto kWaitsForNewOplogField = "waitsForNewOplog";
const WriteConcernOptions kMajorityWC(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
Milliseconds(0));
@@ -419,6 +420,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
oplogArray = BSONArray{nextBatch[kOplogField].Obj()};
+ const auto donorWaitsForNewOplog = nextBatch[kWaitsForNewOplogField].trueValue();
if (oplogArray.isEmpty()) {
{
@@ -451,10 +453,12 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
- if (lastOpTimeWaited == lastResult.oplogTime) {
+ // TODO: SERVER-40187 Completely remove after v4.2. donorWaitsForNewOplog is a
+ // 'feature flag' indicating that the donor blocks until there are new oplog
+ // entries to return, so we don't need to sleep here.
+ if (!donorWaitsForNewOplog && lastOpTimeWaited == lastResult.oplogTime) {
// We got an empty result at least twice in a row from the source shard so space
- // it
- // out a little bit so we don't hammer the shard
+ // it out a little bit so we don't hammer the shard
opCtx->sleepFor(Milliseconds(200));
}
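Both halves of the handshake are now in place: the donor appends waitsForNewOplog: true to every response, and the recipient checks it before throttling. Condensed, the recipient's decision reduces to the following (variable names as in the hunk above):

    // Recipient-side backoff, condensed from the change above.
    const auto donorWaitsForNewOplog = nextBatch[kWaitsForNewOplogField].trueValue();

    if (oplogArray.isEmpty() && !donorWaitsForNewOplog &&
        lastOpTimeWaited == lastResult.oplogTime) {
        // Pre-4.2 donor: it answers empty polls immediately, so space the
        // polling out. A 4.2+ donor blocks server-side, making this unneeded.
        opCtx->sleepFor(Milliseconds(200));
    }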
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 8eddb20b931..3d37920e59c 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -165,7 +165,32 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
}
bool SessionCatalogMigrationSource::hasMoreOplog() {
- return _hasMoreOplogFromSessionCatalog() || _hasNewWrites();
+ if (_hasMoreOplogFromSessionCatalog()) {
+ return true;
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+ return _hasNewWrites(lk);
+}
+
+void SessionCatalogMigrationSource::onCommitCloneStarted() {
+ stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+
+ _state = State::kCommitStarted;
+ if (_newOplogNotification) {
+ _newOplogNotification->set(true);
+ _newOplogNotification.reset();
+ }
+}
+
+void SessionCatalogMigrationSource::onCloneCleanup() {
+ stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+
+ _state = State::kCleanup;
+ if (_newOplogNotification) {
+ _newOplogNotification->set(true);
+ _newOplogNotification.reset();
+ }
}
SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() {
@@ -190,6 +215,31 @@ bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) {
return _fetchNextNewWriteOplog(opCtx);
}
+std::shared_ptr<Notification<bool>> SessionCatalogMigrationSource::getNotificationForNewOplog() {
+ invariant(!_hasMoreOplogFromSessionCatalog());
+
+ stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+
+ if (_newOplogNotification) {
+ return _newOplogNotification;
+ }
+
+ auto notification = std::make_shared<Notification<bool>>();
+ if (_state == State::kCleanup) {
+ notification->set(true);
+ }
+ // Even if commit has started, we still need to drain the current buffer.
+ else if (_hasNewWrites(lk)) {
+ notification->set(false);
+ } else if (_state == State::kCommitStarted) {
+ notification->set(true);
+ } else {
+ _newOplogNotification = notification;
+ }
+
+ return notification;
+}
+
bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) {
if (_currentOplogIterator) {
if (_currentOplogIterator->hasNext()) {
@@ -254,8 +304,7 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
return false;
}
-bool SessionCatalogMigrationSource::_hasNewWrites() {
- stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+bool SessionCatalogMigrationSource::_hasNewWrites(WithLock) {
return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty();
}
@@ -296,6 +345,11 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) {
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
_newWriteOpTimeList.push_back(opTime);
+
+ if (_newOplogNotification) {
+ _newOplogNotification->set(false);
+ _newOplogNotification.reset();
+ }
}
SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator(
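getNotificationForNewOplog() resolves the notification eagerly whenever the answer is already known; only an idle, fully drained source actually parks the caller. Summarizing the branch order above:

    // _state == kCleanup                      -> set(true): migration is cleaning up, stop.
    // buffer non-empty (any state)            -> set(false): drain remaining entries first.
    // _state == kCommitStarted, buffer empty  -> set(true): critical section, nothing left.
    // _state == kActive, buffer empty         -> parked in _newOplogNotification, later
    //                                            completed by notifyNewWriteOpTime (false),
    //                                            onCommitCloneStarted (true), or
    //                                            onCloneCleanup (true).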
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 04e1fc00132..b866e89e84a 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -39,6 +39,7 @@
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/concurrency/with_lock.h"
namespace mongo {
@@ -100,6 +101,17 @@ public:
bool fetchNextOplog(OperationContext* opCtx);
/**
+ * Returns a notification that can be used to wait for new oplog entries to fetch. Note
+ * that this should only be called if hasMoreOplog/fetchNextOplog returned false at
+ * least once.
+ *
+ * If the notification is set to true, there is no longer a need to fetch more oplog
+ * entries: either the data migration has entered the critical section and the buffer of
+ * oplog entries to fetch is empty, or the data migration has aborted.
+ */
+ std::shared_ptr<Notification<bool>> getNotificationForNewOplog();
+
+ /**
* Returns the oplog document that was last fetched by the fetchNextOplog call.
* Returns an empty object if there are no oplog to fetch.
*/
@@ -117,6 +129,18 @@ public:
return _rollbackIdAtInit;
}
+ /**
+ * Informs this session migration machinery that the data migration has just entered the
+ * critical section.
+ */
+ void onCommitCloneStarted();
+
+ /**
+ * Informs this session migration machinery that the data migration has just terminated
+ * and is entering the cleanup phase (whether it aborted or committed).
+ */
+ void onCloneCleanup();
+
private:
/**
* An iterator for extracting session write oplogs that need to be cloned during migration.
@@ -148,6 +172,8 @@ private:
std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
};
+ enum class State { kActive, kCommitStarted, kCleanup };
+
///////////////////////////////////////////////////////////////////////////
// Methods for extracting the oplog entries from session information.
@@ -178,7 +204,7 @@ private:
/**
* Returns true if there are oplog generated by new writes that needs to be fetched.
*/
- bool _hasNewWrites();
+ bool _hasNewWrites(WithLock);
/**
* Attempts to fetch the next oplog entry from the new writes that was saved by saveNewWriteTS.
@@ -211,7 +237,7 @@ private:
// Used to store the last fetched oplog. This enables calling get multiple times.
boost::optional<repl::OplogEntry> _lastFetchedOplog;
- // Protects _newWriteTsList, _lastFetchedNewWriteOplog
+ // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification
stdx::mutex _newOplogMutex;
// Stores oplog opTime of new writes that are coming in.
@@ -219,6 +245,14 @@ private:
// Used to store the last fetched oplog from _newWriteTsList.
boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog;
+
+ // Stores the current state.
+ State _state{State::kActive};
+
+ // Holds the latest request for notification of new oplog entries that need to be fetched.
+ // Set to true when there is no need to fetch more oplog entries (for example, because the
+ // migration has aborted).
+ std::shared_ptr<Notification<bool>> _newOplogNotification;
};
} // namespace mongo
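Taken together, the additions to this header describe a small lifecycle. A hedged end-to-end sketch of a donor-side consumer (construction and error handling elided; this mirrors the documented contract rather than any specific call site):

    while (true) {
        if (source->fetchNextOplog(opCtx)) {
            auto result = source->getLastFetchedOplog();
            // ... transfer result to the recipient ...
            continue;
        }

        // Per the comment above, only valid once fetchNextOplog/hasMoreOplog
        // has returned false.
        auto notification = source->getNotificationForNewOplog();
        if (notification->get(opCtx)) {
            break;  // buffer drained and critical section entered, or aborted
        }
        // false: new writes arrived; loop around and fetch them.
    }

    // Driven elsewhere by the migration state machine:
    //   source->onCommitCloneStarted();  // entering the critical section
    //   source->onCloneCleanup();        // migration committed or aborted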