summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2020-03-03 11:24:56 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-04 19:07:41 +0000
commit6931d7b2d6b5f6864a3995554f9af9e30fe859e9 (patch)
treec6aab8999eeca6c59bca4ba3201e0b743c3b0ce5
parent60b4902efe05c84d938f6ac600f406d53de0eb6f (diff)
downloadmongo-6931d7b2d6b5f6864a3995554f9af9e30fe859e9.tar.gz
SERVER-46466 Make session migration destination check out session
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp36
1 files changed, 23 insertions, 13 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 6ae50ee26fb..bcea0d02b51 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -174,9 +174,11 @@ repl::OplogEntry parseOplog(const BSONObj& oplogBSON) {
/**
* Gets the next batch of oplog entries from the source shard.
*/
-BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
- const ShardId& fromShard,
+BSONObj getNextSessionOplogBatch(const ShardId& fromShard,
const MigrationSessionId& migrationSessionId) {
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShard);
uassertStatusOK(shardStatus.getStatus());
@@ -204,8 +206,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
* Insert a new oplog entry by converting the oplogBSON into type 'n' oplog with the session
* information. The new oplogEntry will also link to prePostImageTs if not null.
*/
-ProcessOplogResult processSessionOplog(OperationContext* opCtx,
- const BSONObj& oplogBSON,
+ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
const ProcessOplogResult& lastResult) {
auto oplogEntry = parseOplog(oplogBSON);
const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
@@ -247,8 +248,14 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto stmtId = *oplogEntry.getStatementId();
- auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
- if (!scopedSession->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) {
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+ opCtx->setLogicalSessionId(result.sessionId);
+ opCtx->setTxnNumber(result.txnNum);
+
+ OperationContextSession scopedSession(opCtx, true);
+ auto session = OperationContextSession::get(opCtx);
+ if (!session->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) {
// Don't continue migrating the transaction history
return lastResult;
}
@@ -257,7 +264,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
- oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum);
+ oplogLink.prevOpTime = session->getLastWriteOpTime(result.txnNum);
writeConflictRetry(
opCtx,
@@ -297,7 +304,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
// Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the
// next oplog will contain the real operation
if (!result.isPrePostImage) {
- scopedSession->onMigrateCompletedOnPrimary(
+ session->onMigrateCompletedOnPrimary(
opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime());
}
@@ -377,9 +384,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
Client::initThread(
"sessionCatalogMigration-" + _migrationSessionId.toString(), service, nullptr);
- auto uniqueCtx = cc().makeOperationContext();
- auto opCtx = uniqueCtx.get();
-
bool oplogDrainedAfterCommiting = false;
ProcessOplogResult lastResult;
repl::OpTime lastOpTimeWaited;
@@ -392,7 +396,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
- auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
+ auto nextBatch = getNextSessionOplogBatch(_fromShard, _migrationSessionId);
BSONArray oplogArray(nextBatch[kOplogField].Obj());
BSONArrayIteratorSorted oplogIter(oplogArray);
const auto donorWaitsForNewOplog = nextBatch[kWaitsForNewOplogField].trueValue();
@@ -413,6 +417,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
WriteConcernResult unusedWCResult;
uassertStatusOK(
waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
@@ -441,7 +448,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
while (oplogIter.more()) {
try {
- lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult);
+ lastResult = processSessionOplog(oplogIter.next().Obj(), lastResult);
} catch (const DBException& ex) {
if (ex.code() == ErrorCodes::ConflictingOperationInProgress ||
ex.code() == ErrorCodes::TransactionTooOld) {
@@ -456,6 +463,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
WriteConcernResult unusedWCResult;
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
uassertStatusOK(waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
{