summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp')
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp75
1 files changed, 59 insertions, 16 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index ff13c2dc9a1..0fca779006f 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -256,25 +256,37 @@ public:
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
- bool run(OperationContext* opCtx,
- const std::string&,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- const MigrationSessionId migrationSessionId(
- uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+ /**
+ * Fetches the next batch of oplog that needs to be transferred and appends it to the given
+ * array builder. If it was not able to fetch anything, it will return a non-null notification
+ * that will get signalled when new batches comes in or when migration is over. If the boolean
+ * value from the notification returns true, then the migration has entered the critical
+ * section or aborted and there's no more new batches to fetch.
+ */
+ std::shared_ptr<Notification<bool>> fetchNextSessionMigrationBatch(
+ OperationContext* opCtx,
+ const MigrationSessionId& migrationSessionId,
+ BSONArrayBuilder* arrBuilder) {
+ boost::optional<repl::OpTime> opTime;
+ std::shared_ptr<Notification<bool>> newOplogNotification;
- BSONArrayBuilder arrBuilder;
+ writeConflictRetry(
+ opCtx,
+ "Fetching session related oplogs for migration",
+ NamespaceString::kRsOplogNamespace.ns(),
+ [&]() {
+ AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
+ opTime = autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, arrBuilder);
- boost::optional<repl::OpTime> opTime;
+ if (arrBuilder->arrSize() == 0) {
+ newOplogNotification =
+ autoCloner.getCloner()->getNotificationForNextSessionMigrationBatch();
+ }
+ });
- writeConflictRetry(opCtx,
- "Fetching session related oplogs for migration",
- NamespaceString::kRsOplogNamespace.ns(),
- [&]() {
- AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
- opTime = autoCloner.getCloner()->nextSessionMigrationBatch(
- opCtx, &arrBuilder);
- });
+ if (newOplogNotification) {
+ return newOplogNotification;
+ }
// If the batch returns something, we wait for write concern to ensure that all the entries
// in the batch have been majority committed. We then need to check that the rollback id
@@ -302,7 +314,38 @@ public:
rollbackId == rollbackIdAtMigrationInit);
}
+ return nullptr;
+ }
+
+ bool run(OperationContext* opCtx,
+ const std::string&,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+ const MigrationSessionId migrationSessionId(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+ BSONArrayBuilder arrBuilder;
+ bool hasMigrationCompleted = false;
+
+ do {
+ if (auto newOplogNotification =
+ fetchNextSessionMigrationBatch(opCtx, migrationSessionId, &arrBuilder)) {
+ hasMigrationCompleted = newOplogNotification->get(opCtx);
+ } else if (arrBuilder.arrSize() == 0) {
+ // If we didn't get a notification and the arrBuilder is empty, that means
+ // that the sessionMigration is not active for this migration (most likely
+ // because it's not a replica set).
+ hasMigrationCompleted = true;
+ }
+ } while (arrBuilder.arrSize() == 0 && !hasMigrationCompleted);
+
result.appendArray("oplog", arrBuilder.arr());
+
+ // TODO: SERVER-40187 remove after v4.2. This is to indicate the caller that this server
+ // waits for notification on new oplog entries to send over so the caller doesn't need
+ // to throttle.
+ result.append("waitsForNewOplog", true);
+
return true;
}