summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2020-03-03 11:24:56 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-05 17:04:42 +0000
commitb3e25a1f353cdd7e47e3efe0119ef3a0770c093e (patch)
tree4828360571e543f49f5eab69c9ad130037f8de94
parentbed0e7366eaeaaca150cd66058f37b643ed1c23f (diff)
downloadmongo-b3e25a1f353cdd7e47e3efe0119ef3a0770c093e.tar.gz
SERVER-46466 Make session migration destination check out session
(cherry picked from commit 6931d7b2d6b5f6864a3995554f9af9e30fe859e9)
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp38
1 files changed, 24 insertions, 14 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 2fcbb4af1fd..44c631457ea 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -174,9 +174,11 @@ repl::OplogEntry parseOplog(const BSONObj& oplogBSON) {
/**
* Gets the next batch of oplog entries from the source shard.
*/
-BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
- const ShardId& fromShard,
+BSONObj getNextSessionOplogBatch(const ShardId& fromShard,
const MigrationSessionId& migrationSessionId) {
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShard);
uassertStatusOK(shardStatus.getStatus());
@@ -204,8 +206,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
* Insert a new oplog entry by converting the oplogBSON into type 'n' oplog with the session
* information. The new oplogEntry will also link to prePostImageTs if not null.
*/
-ProcessOplogResult processSessionOplog(OperationContext* opCtx,
- const BSONObj& oplogBSON,
+ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
const ProcessOplogResult& lastResult) {
auto oplogEntry = parseOplog(oplogBSON);
const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
@@ -247,11 +248,17 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto stmtId = *oplogEntry.getStatementId();
- auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
- scopedSession->beginTxn(opCtx, result.txnNum);
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+ opCtx->setLogicalSessionId(result.sessionId);
+ opCtx->setTxnNumber(result.txnNum);
+
+ OperationContextSession scopedSession(opCtx, true);
+ auto session = OperationContextSession::get(opCtx);
+ session->beginTxn(opCtx, result.txnNum);
try {
- if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
+ if (session->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
return lastResult;
}
} catch (const DBException& ex) {
@@ -272,7 +279,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
- oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum);
+ oplogLink.prevOpTime = session->getLastWriteOpTime(result.txnNum);
writeConflictRetry(
opCtx,
@@ -312,7 +319,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
// Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the
// next oplog will contain the real operation
if (!result.isPrePostImage) {
- scopedSession->onMigrateCompletedOnPrimary(
+ session->onMigrateCompletedOnPrimary(
opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime());
}
@@ -392,9 +399,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
Client::initThread(
"sessionCatalogMigration-" + _migrationSessionId.toString(), service, nullptr);
- auto uniqueCtx = cc().makeOperationContext();
- auto opCtx = uniqueCtx.get();
-
bool oplogDrainedAfterCommiting = false;
ProcessOplogResult lastResult;
repl::OpTime lastOpTimeWaited;
@@ -407,7 +411,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
- auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
+ auto nextBatch = getNextSessionOplogBatch(_fromShard, _migrationSessionId);
BSONArray oplogArray(nextBatch[kOplogField].Obj());
BSONArrayIteratorSorted oplogIter(oplogArray);
const auto donorWaitsForNewOplog = nextBatch[kWaitsForNewOplogField].trueValue();
@@ -428,6 +432,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
WriteConcernResult unusedWCResult;
uassertStatusOK(
waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
@@ -456,7 +463,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
while (oplogIter.more()) {
try {
- lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult);
+ lastResult = processSessionOplog(oplogIter.next().Obj(), lastResult);
} catch (const DBException& ex) {
if (ex.code() == ErrorCodes::ConflictingOperationInProgress ||
ex.code() == ErrorCodes::TransactionTooOld) {
@@ -471,6 +478,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
WriteConcernResult unusedWCResult;
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
uassertStatusOK(waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
{