diff options
Diffstat (limited to 'src/mongo')
4 files changed, 61 insertions, 20 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index f11496f4d1b..7703cd7b347 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -41,6 +41,7 @@ #include "mongo/db/index/index_descriptor.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/db/s/start_chunk_clone_request.h" #include "mongo/db/service_context.h" #include "mongo/executor/remote_command_request.h" @@ -666,13 +667,12 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, arr.done(); } -repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( +boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( OperationContext* opCtx, BSONArrayBuilder* arrBuilder) { repl::OpTime opTimeToWait; - auto seenOpTimeTerm = repl::OpTime::kUninitializedTerm; if (!_sessionCatalogSource) { - return {}; + return boost::none; } while (_sessionCatalogSource->hasMoreOplog()) { @@ -685,15 +685,6 @@ repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( } auto newOpTime = result.oplog->getOpTime(); - if (seenOpTimeTerm == repl::OpTime::kUninitializedTerm) { - seenOpTimeTerm = newOpTime.getTerm(); - } else { - uassert(40650, - str::stream() << "detected change of term from " << seenOpTimeTerm << " to " - << newOpTime.getTerm(), - seenOpTimeTerm == newOpTime.getTerm()); - } - auto oplogDoc = result.oplog->toBSON(); // Use the builder size instead of accumulating the document sizes directly so that we @@ -713,7 +704,7 @@ repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( } } - return opTimeToWait; + return boost::make_optional(opTimeToWait); } } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 46791532121..722f81c5e96 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -98,6 +98,18 @@ public: } /** + * Returns the rollback ID recorded at the beginning of session migration. If the underlying + * SessionCatalogMigrationSource does not exist, that means this node is running as a standalone + * and doesn't support retryable writes, so we return boost::none. + */ + boost::optional<int> getRollbackIdAtInit() const { + if (_sessionCatalogSource) { + return _sessionCatalogSource->getRollbackIdAtInit(); + } + return boost::none; + } + + /** * Called by the recipient shard. Used to estimate how many more bytes of clone data are * remaining in the chunk cloner. */ @@ -134,9 +146,18 @@ public: * Appends to the buffer oplogs that contain session information for this migration. * If this function returns a valid OpTime, this means that the oplog appended are * not guaranteed to be majority committed and the caller has to use wait for the - * returned opTime to be majority committed. + * returned opTime to be majority committed. If the underlying SessionCatalogMigrationSource + * does not exist, that means this node is running as a standalone and doesn't support retryable + * writes, so we return boost::none. + * + * This waiting is necessary because session migration is only allowed to send out committed + * entries, as opposed to chunk migration, which can send out uncommitted documents. With chunk + * migration, the uncommitted documents will not be visibile until the end of the migration + * commits, which means that if it fails, they won't be visible, whereas session oplog entries + * take effect immediately since they are appended to the chain. */ - repl::OpTime nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder); + boost::optional<repl::OpTime> nextSessionMigrationBatch(OperationContext* opCtx, + BSONArrayBuilder* arrBuilder); private: friend class DeleteNotificationStage; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index e56347589a2..fdd3675dfe3 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -38,6 +38,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" @@ -260,7 +261,7 @@ public: BSONArrayBuilder arrBuilder; - repl::OpTime opTime; + boost::optional<repl::OpTime> opTime; writeConflictRetry(opCtx, "Fetching session related oplogs for migration", @@ -271,10 +272,31 @@ public: opCtx, &arrBuilder); }); - WriteConcernResult wcResult; - WriteConcernOptions majorityWC( - WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); - uassertStatusOK(waitForWriteConcern(opCtx, opTime, majorityWC, &wcResult)); + // If the batch returns something, we wait for write concern to ensure that all the entries + // in the batch have been majority committed. We then need to check that the rollback id + // hasn't changed since we started migration, because a change would indicate that some data + // in this batch may have been rolled back. In this case, we abort the migration. + if (opTime) { + WriteConcernResult wcResult; + WriteConcernOptions majorityWC( + WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); + uassertStatusOK(waitForWriteConcern(opCtx, opTime.get(), majorityWC, &wcResult)); + + auto rollbackIdAtMigrationInit = [&]() { + AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); + return autoCloner.getCloner()->getRollbackIdAtInit(); + }(); + + // The check for rollback id must be done after having waited for majority in order to + // ensure that whatever was waited on didn't get rolled back. + auto rollbackId = repl::ReplicationProcess::get(opCtx)->getRollbackID(); + uassert(50881, + str::stream() << "rollback detected, rollbackId was " + << rollbackIdAtMigrationInit + << " but is now " + << rollbackId, + rollbackId == rollbackIdAtMigrationInit); + } result.appendArray("oplog", arrBuilder.arr()); return true; diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 0ef285e5818..2c53ce2db51 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -109,6 +109,13 @@ public: */ void notifyNewWriteOpTime(repl::OpTime opTimestamp); + /** + * Returns the rollback ID recorded at the beginning of session migration. + */ + int getRollbackIdAtInit() const { + return _rollbackIdAtInit; + } + private: /** * An iterator for extracting session write oplogs that need to be cloned during migration. |