diff options
author | Randolph Tan <randolph@10gen.com> | 2020-03-03 11:24:56 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-05 17:04:42 +0000 |
commit | b3e25a1f353cdd7e47e3efe0119ef3a0770c093e (patch) | |
tree | 4828360571e543f49f5eab69c9ad130037f8de94 | |
parent | bed0e7366eaeaaca150cd66058f37b643ed1c23f (diff) | |
download | mongo-b3e25a1f353cdd7e47e3efe0119ef3a0770c093e.tar.gz |
SERVER-46466 Make session migration destination check out session
(cherry picked from commit 6931d7b2d6b5f6864a3995554f9af9e30fe859e9)
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 38 |
1 files changed, 24 insertions, 14 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 2fcbb4af1fd..44c631457ea 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -174,9 +174,11 @@ repl::OplogEntry parseOplog(const BSONObj& oplogBSON) { /** * Gets the next batch of oplog entries from the source shard. */ -BSONObj getNextSessionOplogBatch(OperationContext* opCtx, - const ShardId& fromShard, +BSONObj getNextSessionOplogBatch(const ShardId& fromShard, const MigrationSessionId& migrationSessionId) { + auto uniqueCtx = cc().makeOperationContext(); + auto opCtx = uniqueCtx.get(); + auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShard); uassertStatusOK(shardStatus.getStatus()); @@ -204,8 +206,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, * Insert a new oplog entry by converting the oplogBSON into type 'n' oplog with the session * information. The new oplogEntry will also link to prePostImageTs if not null. */ -ProcessOplogResult processSessionOplog(OperationContext* opCtx, - const BSONObj& oplogBSON, +ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, const ProcessOplogResult& lastResult) { auto oplogEntry = parseOplog(oplogBSON); const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); @@ -247,11 +248,17 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const auto stmtId = *oplogEntry.getStatementId(); - auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); - scopedSession->beginTxn(opCtx, result.txnNum); + auto uniqueCtx = cc().makeOperationContext(); + auto opCtx = uniqueCtx.get(); + opCtx->setLogicalSessionId(result.sessionId); + opCtx->setTxnNumber(result.txnNum); + + OperationContextSession scopedSession(opCtx, true); + auto session = OperationContextSession::get(opCtx); + session->beginTxn(opCtx, result.txnNum); try { - if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { + if (session->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { return lastResult; } } catch (const DBException& ex) { @@ -272,7 +279,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); - oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum); + oplogLink.prevOpTime = session->getLastWriteOpTime(result.txnNum); writeConflictRetry( opCtx, @@ -312,7 +319,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the // next oplog will contain the real operation if (!result.isPrePostImage) { - scopedSession->onMigrateCompletedOnPrimary( + session->onMigrateCompletedOnPrimary( opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime()); } @@ -392,9 +399,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service Client::initThread( "sessionCatalogMigration-" + _migrationSessionId.toString(), service, nullptr); - auto uniqueCtx = cc().makeOperationContext(); - auto opCtx = uniqueCtx.get(); - bool oplogDrainedAfterCommiting = false; ProcessOplogResult lastResult; repl::OpTime lastOpTimeWaited; @@ -407,7 +411,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service } } - auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId); + auto nextBatch = getNextSessionOplogBatch(_fromShard, _migrationSessionId); BSONArray oplogArray(nextBatch[kOplogField].Obj()); BSONArrayIteratorSorted oplogIter(oplogArray); const auto donorWaitsForNewOplog = nextBatch[kWaitsForNewOplogField].trueValue(); @@ -428,6 +432,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service } } + auto uniqueCtx = cc().makeOperationContext(); + auto opCtx = uniqueCtx.get(); + WriteConcernResult unusedWCResult; uassertStatusOK( waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult)); @@ -456,7 +463,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service while (oplogIter.more()) { try { - lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult); + lastResult = processSessionOplog(oplogIter.next().Obj(), lastResult); } catch (const DBException& ex) { if (ex.code() == ErrorCodes::ConflictingOperationInProgress || ex.code() == ErrorCodes::TransactionTooOld) { @@ -471,6 +478,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service } WriteConcernResult unusedWCResult; + auto uniqueCtx = cc().makeOperationContext(); + auto opCtx = uniqueCtx.get(); + uassertStatusOK(waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult)); { |