author     Randolph Tan <randolph@10gen.com>                2020-03-03 11:24:56 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-03-04 19:07:41 +0000
commit     6931d7b2d6b5f6864a3995554f9af9e30fe859e9
tree       c6aab8999eeca6c59bca4ba3201e0b743c3b0ce5 /src
parent     60b4902efe05c84d938f6ac600f406d53de0eb6f
download   mongo-6931d7b2d6b5f6864a3995554f9af9e30fe859e9.tar.gz
SERVER-46466 Make session migration destination check out session
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.cpp | 36
1 file changed, 23 insertions(+), 13 deletions(-)
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 6ae50ee26fb..bcea0d02b51 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -174,9 +174,11 @@ repl::OplogEntry parseOplog(const BSONObj& oplogBSON) {
 /**
  * Gets the next batch of oplog entries from the source shard.
  */
-BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
-                                 const ShardId& fromShard,
+BSONObj getNextSessionOplogBatch(const ShardId& fromShard,
                                  const MigrationSessionId& migrationSessionId) {
+    auto uniqueCtx = cc().makeOperationContext();
+    auto opCtx = uniqueCtx.get();
+
     auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShard);
     uassertStatusOK(shardStatus.getStatus());
 
@@ -204,8 +206,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
  * Insert a new oplog entry by converting the oplogBSON into type 'n' oplog with the session
  * information. The new oplogEntry will also link to prePostImageTs if not null.
  */
-ProcessOplogResult processSessionOplog(OperationContext* opCtx,
-                                       const BSONObj& oplogBSON,
+ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
                                        const ProcessOplogResult& lastResult) {
     auto oplogEntry = parseOplog(oplogBSON);
     const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
@@ -247,8 +248,14 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
 
     const auto stmtId = *oplogEntry.getStatementId();
 
-    auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
-    if (!scopedSession->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) {
+    auto uniqueCtx = cc().makeOperationContext();
+    auto opCtx = uniqueCtx.get();
+    opCtx->setLogicalSessionId(result.sessionId);
+    opCtx->setTxnNumber(result.txnNum);
+
+    OperationContextSession scopedSession(opCtx, true);
+    auto session = OperationContextSession::get(opCtx);
+    if (!session->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) {
         // Don't continue migrating the transaction history
         return lastResult;
     }
@@ -257,7 +264,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
                        ? oplogEntry.getObject()
                        : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
     auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
-    oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum);
+    oplogLink.prevOpTime = session->getLastWriteOpTime(result.txnNum);
 
     writeConflictRetry(
         opCtx,
@@ -297,7 +304,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
     // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the
     // next oplog will contain the real operation
     if (!result.isPrePostImage) {
-        scopedSession->onMigrateCompletedOnPrimary(
+        session->onMigrateCompletedOnPrimary(
             opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime());
     }
 
@@ -377,9 +384,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
     Client::initThread(
         "sessionCatalogMigration-" + _migrationSessionId.toString(), service, nullptr);
 
-    auto uniqueCtx = cc().makeOperationContext();
-    auto opCtx = uniqueCtx.get();
-
     bool oplogDrainedAfterCommiting = false;
     ProcessOplogResult lastResult;
     repl::OpTime lastOpTimeWaited;
@@ -392,7 +396,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
             }
         }
 
-        auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
+        auto nextBatch = getNextSessionOplogBatch(_fromShard, _migrationSessionId);
         BSONArray oplogArray(nextBatch[kOplogField].Obj());
         BSONArrayIteratorSorted oplogIter(oplogArray);
         const auto donorWaitsForNewOplog = nextBatch[kWaitsForNewOplogField].trueValue();
@@ -413,6 +417,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
             }
         }
 
+        auto uniqueCtx = cc().makeOperationContext();
+        auto opCtx = uniqueCtx.get();
+
         WriteConcernResult unusedWCResult;
         uassertStatusOK(
             waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
@@ -441,7 +448,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
 
         while (oplogIter.more()) {
             try {
-                lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult);
+                lastResult = processSessionOplog(oplogIter.next().Obj(), lastResult);
             } catch (const DBException& ex) {
                 if (ex.code() == ErrorCodes::ConflictingOperationInProgress ||
                     ex.code() == ErrorCodes::TransactionTooOld) {
@@ -456,6 +463,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
     }
 
     WriteConcernResult unusedWCResult;
+    auto uniqueCtx = cc().makeOperationContext();
+    auto opCtx = uniqueCtx.get();
+
     uassertStatusOK(waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
 
     {
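
For context when reading the hunks above, the sketch below restates the check-out pattern that processSessionOplog switches to: instead of receiving a shared OperationContext and calling SessionCatalog::getOrCreateSession(), the destination now creates its own OperationContext for the oplog entry, stamps it with the migrated session id and transaction number, and checks the session out through OperationContextSession. This is an illustrative sketch under those assumptions, not an excerpt of the patched file; the function name checkOutSessionForMigratedEntry and the void return are invented for the example, and only calls visible in the diff are used.

    // Sketch of the new per-entry check-out pattern (illustration only).
    void checkOutSessionForMigratedEntry(const ProcessOplogResult& result, StmtId stmtId) {
        // A fresh OperationContext is created on the migration thread's Client for
        // this oplog entry, replacing the single opCtx that used to be passed in.
        auto uniqueCtx = cc().makeOperationContext();
        auto opCtx = uniqueCtx.get();

        // Tag the opCtx with the session and transaction being migrated so the
        // check-out machinery operates on the right session.
        opCtx->setLogicalSessionId(result.sessionId);
        opCtx->setTxnNumber(result.txnNum);

        // RAII check-out of the session for the duration of this scope; this replaces
        // the previous direct SessionCatalog::get(opCtx)->getOrCreateSession() call.
        // The boolean argument is passed as in the diff; its meaning is not shown here.
        OperationContextSession scopedSession(opCtx, true);
        auto session = OperationContextSession::get(opCtx);

        if (!session->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) {
            // Newer history already exists for this session; the patched code stops
            // migrating it by returning lastResult.
            return;
        }

        // ... write the no-op oplog entry and, unless it was a pre/post image,
        // call session->onMigrateCompletedOnPrimary(...) as the diff shows.
    }

The same OperationContext-per-call idea explains the other hunks: getNextSessionOplogBatch and the two waitForWriteConcern sites each make their own short-lived OperationContext instead of reusing one created at the top of _retrieveSessionStateFromSource.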