diff options
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp')
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp | 75 |
1 files changed, 59 insertions, 16 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index ff13c2dc9a1..0fca779006f 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -256,25 +256,37 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* opCtx, - const std::string&, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - const MigrationSessionId migrationSessionId( - uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + /** + * Fetches the next batch of oplog that needs to be transferred and appends it to the given + * array builder. If it was not able to fetch anything, it will return a non-null notification + * that will get signalled when new batches comes in or when migration is over. If the boolean + * value from the notification returns true, then the migration has entered the critical + * section or aborted and there's no more new batches to fetch. + */ + std::shared_ptr<Notification<bool>> fetchNextSessionMigrationBatch( + OperationContext* opCtx, + const MigrationSessionId& migrationSessionId, + BSONArrayBuilder* arrBuilder) { + boost::optional<repl::OpTime> opTime; + std::shared_ptr<Notification<bool>> newOplogNotification; - BSONArrayBuilder arrBuilder; + writeConflictRetry( + opCtx, + "Fetching session related oplogs for migration", + NamespaceString::kRsOplogNamespace.ns(), + [&]() { + AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); + opTime = autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, arrBuilder); - boost::optional<repl::OpTime> opTime; + if (arrBuilder->arrSize() == 0) { + newOplogNotification = + autoCloner.getCloner()->getNotificationForNextSessionMigrationBatch(); + } + }); - writeConflictRetry(opCtx, - "Fetching session related oplogs for migration", - NamespaceString::kRsOplogNamespace.ns(), - [&]() { - AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); - opTime = autoCloner.getCloner()->nextSessionMigrationBatch( - opCtx, &arrBuilder); - }); + if (newOplogNotification) { + return newOplogNotification; + } // If the batch returns something, we wait for write concern to ensure that all the entries // in the batch have been majority committed. We then need to check that the rollback id @@ -302,7 +314,38 @@ public: rollbackId == rollbackIdAtMigrationInit); } + return nullptr; + } + + bool run(OperationContext* opCtx, + const std::string&, + const BSONObj& cmdObj, + BSONObjBuilder& result) { + const MigrationSessionId migrationSessionId( + uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + + BSONArrayBuilder arrBuilder; + bool hasMigrationCompleted = false; + + do { + if (auto newOplogNotification = + fetchNextSessionMigrationBatch(opCtx, migrationSessionId, &arrBuilder)) { + hasMigrationCompleted = newOplogNotification->get(opCtx); + } else if (arrBuilder.arrSize() == 0) { + // If we didn't get a notification and the arrBuilder is empty, that means + // that the sessionMigration is not active for this migration (most likely + // because it's not a replica set). + hasMigrationCompleted = true; + } + } while (arrBuilder.arrSize() == 0 && !hasMigrationCompleted); + result.appendArray("oplog", arrBuilder.arr()); + + // TODO: SERVER-40187 remove after v4.2. This is to indicate the caller that this server + // waits for notification on new oplog entries to send over so the caller doesn't need + // to throttle. + result.append("waitsForNewOplog", true); + return true; } |