diff options
-rw-r--r-- | src/mongo/db/logical_session_id.idl | 2 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 437 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.h | 116 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination_test.cpp | 1259 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 16 |
8 files changed, 1849 insertions, 5 deletions
diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl index 96310498441..e7bac0f5165 100644 --- a/src/mongo/db/logical_session_id.idl +++ b/src/mongo/db/logical_session_id.idl @@ -40,7 +40,7 @@ types: description: "A number representing an operation within a transaction." bson_serialization_type: int cpp_type: "std::int32_t" - deserializer: "mongo::BSONElement::_numberLong" + deserializer: "mongo::BSONElement::_numberInt" structs: diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index c4726a59b17..9f8c9844ef3 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -75,8 +75,9 @@ env.Library( 'move_timing_helper.cpp', 'namespace_metadata_change_notifications.cpp', 'operation_sharding_state.cpp', - 'session_catalog_migration_source.cpp', 'read_only_catalog_cache_loader.cpp', + 'session_catalog_migration_destination.cpp', + 'session_catalog_migration_source.cpp', 'shard_identity_rollback_notifier.cpp', 'shard_metadata_util.cpp', 'shard_server_catalog_cache_loader.cpp', @@ -343,3 +344,14 @@ env.CppUnitTest( ] ) +env.CppUnitTest( + target='session_catalog_migration_destination_test', + source=[ + 'session_catalog_migration_destination_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', + '$BUILD_DIR/mongo/s/shard_server_test_fixture', + 'sharding', + ] +) diff --git a/src/mongo/db/s/migration_session_id.cpp b/src/mongo/db/s/migration_session_id.cpp index 4f617ac5df4..a4e947ae729 100644 --- a/src/mongo/db/s/migration_session_id.cpp +++ b/src/mongo/db/s/migration_session_id.cpp @@ -66,6 +66,10 @@ StatusWith<MigrationSessionId> MigrationSessionId::extractFromBSON(const BSONObj return status; } +MigrationSessionId MigrationSessionId::parseFromBSON(const BSONObj& obj) { + return uassertStatusOK(extractFromBSON(obj)); +} + MigrationSessionId::MigrationSessionId(std::string sessionId) { invariant(!sessionId.empty()); _sessionId = std::move(sessionId); diff --git 
a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h index 0437fb5246a..e5fbe3eea08 100644 --- a/src/mongo/db/s/migration_session_id.h +++ b/src/mongo/db/s/migration_session_id.h @@ -61,6 +61,12 @@ public: static StatusWith<MigrationSessionId> extractFromBSON(const BSONObj& obj); /** + * Same as extractFromBSON, but throws on error. + * Function signature is compatible for idl. + */ + static MigrationSessionId parseFromBSON(const BSONObj& obj); + + /** * Compares two session identifiers. Two idendifiers match only if they are the same. */ bool matches(const MigrationSessionId& other) const; diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp new file mode 100644 index 00000000000..13ad9143457 --- /dev/null +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -0,0 +1,437 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/session_catalog_migration_destination.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/connection_string.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/s/migration_session_id.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/write_concern.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_id.h" +#include "mongo/stdx/functional.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { + +const auto kOplogField = "oplog"; +const WriteConcernOptions kMajorityWC(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + Milliseconds(0)); + +struct ProcessOplogResult { + bool isPrePostImage = false; + repl::OpTime oplogTime; + LogicalSessionId sessionId; + TxnNumber txnNum; +}; + +/** + * Returns the command request to extract session information from the source shard. 
+ */ +BSONObj buildMigrateSessionCmd(const MigrationSessionId& migrationSessionId) { + BSONObjBuilder builder; + builder.append("_getNextSessionMods", 1); + migrationSessionId.append(&builder); + return builder.obj(); +} + +/** + * Determines whether the oplog entry has a link to either preImage/postImage and return + * a new oplogLink that contains the same link, but pointing to lastResult.oplogTime. For example, + * if entry has link to preImageTs, this returns an oplogLink with preImageTs pointing to + * lastResult.oplogTime. + * + * It is an error to have both preImage and postImage as well as not having them at all. + */ +repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult, + const repl::OplogEntry& entry) { + repl::OplogLink oplogLink; + + if (!lastResult.isPrePostImage) { + uassert(40628, + str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() + << " to not have " + << repl::OplogEntryBase::kPreImageTsFieldName + << " or " + << repl::OplogEntryBase::kPostImageTsFieldName, + !entry.getPreImageTs() && !entry.getPostImageTs()); + + return oplogLink; + } + + const auto ts = lastResult.oplogTime.getTimestamp(); + invariant(!ts.isNull()); + + const auto& sessionInfo = entry.getOperationSessionInfo(); + const auto sessionId = *sessionInfo.getSessionId(); + const auto txnNum = *sessionInfo.getTxnNumber(); + + uassert(40629, + str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << ": " + << redact(entry.toBSON()) + << " to have session: " + << lastResult.sessionId, + lastResult.sessionId == sessionId); + uassert(40630, + str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << ": " + << redact(entry.toBSON()) + << " to have txnNumber: " + << lastResult.txnNum, + lastResult.txnNum == txnNum); + + if (entry.getPreImageTs()) { + oplogLink.preImageTs = ts; + } else if (entry.getPostImageTs()) { + oplogLink.postImageTs = ts; + } else { + uasserted(40631, + str::stream() << 
"expected oplog with ts: " << entry.getTimestamp().toString() + << ": " + << redact(entry.toBSON()) + << " to have either " + << repl::OplogEntryBase::kPreImageTsFieldName + << " or " + << repl::OplogEntryBase::kPostImageTsFieldName); + } + + return oplogLink; +} + +/** + * Parses the oplog into an oplog entry and makes sure that it contains the expected fields. + */ +repl::OplogEntry parseOplog(const BSONObj& oplogBSON) { + auto oplogStatus = repl::OplogEntry::parse(oplogBSON); + uassertStatusOK(oplogStatus.getStatus()); + + auto oplogEntry = oplogStatus.getValue(); + + auto sessionInfo = oplogEntry.getOperationSessionInfo(); + + uassert(ErrorCodes::UnsupportedFormat, + str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString() + << " does not have sessionId: " + << redact(oplogBSON), + sessionInfo.getSessionId()); + + uassert(ErrorCodes::UnsupportedFormat, + str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString() + << " does not have txnNumber: " + << redact(oplogBSON), + sessionInfo.getTxnNumber()); + + uassert(ErrorCodes::UnsupportedFormat, + str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString() + << " does not have stmtId: " + << redact(oplogBSON), + oplogEntry.getStatementId()); + + uassert(ErrorCodes::UnsupportedFormat, + str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString() + << " does not have o2: " + << redact(oplogBSON), + oplogEntry.getObject2()); + + return oplogEntry; +} + +/** + * Gets the next batch of oplog entries from the source shard. 
+ */ +BSONObj getNextSessionOplogBatch(OperationContext* opCtx, + const ShardId& fromShard, + const MigrationSessionId& migrationSessionId) { + auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShard); + uassertStatusOK(shardStatus.getStatus()); + + auto shard = shardStatus.getValue(); + auto responseStatus = shard->runCommand(opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + "admin", + buildMigrateSessionCmd(migrationSessionId), + Shard::RetryPolicy::kNoRetry); + + uassertStatusOK(responseStatus.getStatus()); + auto result = responseStatus.getValue().response; + + auto oplogElement = result[kOplogField]; + uassert(ErrorCodes::FailedToParse, + "_migrateSession response does not have the 'oplog' field as array", + oplogElement.type() == Array); + + return result; +} + +/** + * Insert a new oplog entry by converting the oplogBSON into type 'n' oplog with the session + * information. If lastResult is a pre/post image, the new entry links to lastResult.oplogTime. + */ +ProcessOplogResult processSessionOplog(OperationContext* opCtx, + const BSONObj& oplogBSON, + // const Timestamp& prePostImageTs, + const ProcessOplogResult& lastResult) { + ProcessOplogResult result; + auto oplogEntry = parseOplog(oplogBSON); + + BSONObj object2; + if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) { + // Note: Oplog is already no-op type, no need to nest. + // There are two types of type 'n' oplog format expected here: + // (1) Oplog entries that have been transformed by a previous migration into a + // nested oplog. In this case, o field contains {$sessionMigrateInfo: 1} + // and o2 field contains the details of the original oplog. + // (2) Oplog entries that contain the pre/post-image information of a + // findAndModify operation. In this case, o field contains the relevant info + // and o2 will be empty.
+ + object2 = oplogEntry.getObject2().value(); + + if (object2.isEmpty()) { + result.isPrePostImage = true; + + uassert(40632, + str::stream() << "Can't handle 2 pre/post image oplog in a row. Prevoius oplog " + << lastResult.oplogTime.getTimestamp().toString() + << ", oplog ts: " + << oplogEntry.getTimestamp().toString() + << ": " + << redact(oplogBSON), + !lastResult.isPrePostImage); + } + } else { + object2 = oplogBSON; // TODO: strip redundant info? + } + + const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); + result.sessionId = sessionInfo.getSessionId().value(); + result.txnNum = sessionInfo.getTxnNumber().value(); + + auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); + scopedSession->beginTxn(opCtx, result.txnNum); + + BSONObj object(result.isPrePostImage + ? oplogEntry.getObject() + : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); + auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); + oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum); + + writeConflictRetry(opCtx, + "SessionOplogMigration", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); + WriteUnitOfWork wunit(opCtx); + + result.oplogTime = repl::logOp(opCtx, + "n", + oplogEntry.getNamespace(), + oplogEntry.getUuid(), + object, + &object2, + true, + sessionInfo, + *oplogEntry.getStatementId(), + oplogLink); + + auto oplogTs = result.oplogTime.getTimestamp(); + uassert(40633, + str::stream() + << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() + << ": " + << redact(oplogBSON), + !oplogTs.isNull()); + + if (!result.isPrePostImage) { + scopedSession->onWriteOpCompletedOnPrimary( + opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs); + } + + wunit.commit(); + }); + + return result; +} +} // unnamed namespace + +const char 
SessionCatalogMigrationDestination::kSessionMigrateOplogTag[] = "$sessionMigrateInfo"; + +SessionCatalogMigrationDestination::SessionCatalogMigrationDestination( + ShardId fromShard, MigrationSessionId migrationSessionId) + : _fromShard(std::move(fromShard)), _migrationSessionId(std::move(migrationSessionId)) {} + +SessionCatalogMigrationDestination::~SessionCatalogMigrationDestination() { + if (_thread.joinable()) { + _errorOccurred("Destructor cleaning up thread"); + _thread.join(); + } +} + +void SessionCatalogMigrationDestination::start(ServiceContext* service) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_state == State::NotStarted); + _state = State::Migrating; + } + + _thread = stdx::thread(stdx::bind( + &SessionCatalogMigrationDestination::_retrieveSessionStateFromSource, this, service)); +} + +void SessionCatalogMigrationDestination::finish() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _state = State::Committing; +} + +void SessionCatalogMigrationDestination::join() { + invariant(_thread.joinable()); + _thread.join(); +} + +/** + * Outline: + * + * 1. Get oplog with session info from the source shard. + * 2. For each oplog entry, convert to type 'n' if not yet type 'n' while preserving all info + * needed for retryable writes. + * 3. Also update the sessionCatalog for every oplog entry. + * 4. Once the source shard returned an empty oplog buffer, it means that this should enter + * ReadyToCommit state and wait for the commit signal (by calling finish()). + * 5. Once finish() is called, keep on trying to get more oplog from the source shard until it + * returns an empty result again. + * 6. Wait for writes to be committed to majority of the replica set. 
+ */ +void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(ServiceContext* service) { + Client::initThread( + "sessionCatalogMigration-" + _migrationSessionId.toString(), service, nullptr); + + auto uniqueCtx = cc().makeOperationContext(); + auto opCtx = uniqueCtx.get(); + + // Timestamp prePostImageTs; + ProcessOplogResult lastResult; + + while (true) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_state == State::ErrorOccurred) { + return; + } + } + + try { + auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId); + BSONArray oplogArray(nextBatch[kOplogField].Obj()); + BSONArrayIteratorSorted oplogIter(oplogArray); + + if (!oplogIter.more()) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_state == State::Committing) { + break; + } + } + + WriteConcernResult wcResult; + auto wcStatus = + waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &wcResult); + if (!wcStatus.isOK()) { + _errorOccurred(wcStatus.toString()); + return; + } + + // We depleted the buffer at least once, transition to ready for commit. + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + // Note: only transition to "ready to commit" if state is not error/force stop. + if (_state == State::Migrating) { + _state = State::ReadyToCommit; + } + } + } + + while (oplogIter.more()) { + lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult); + } + } catch (const DBException& excep) { + if (excep.code() == ErrorCodes::ConflictingOperationInProgress || + excep.code() == ErrorCodes::TransactionTooOld) { + // This means that the server has a newer txnNumber than the oplog being + // migrated, so just skip it. 
+ continue; + } + + _errorOccurred(excep.toString()); + return; + } + } + + WriteConcernResult wcResult; + auto wcStatus = waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &wcResult); + if (!wcStatus.isOK()) { + _errorOccurred(wcStatus.toString()); + return; + } + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _state = State::Done; + } +} + +std::string SessionCatalogMigrationDestination::getErrMsg() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _errMsg; +} + +void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _state = State::ErrorOccurred; + _errMsg = errMsg.toString(); +} + +SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _state; +} + +} // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h new file mode 100644 index 00000000000..2e740f2f6bb --- /dev/null +++ b/src/mongo/db/s/session_catalog_migration_destination.h @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <string> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/s/migration_session_id.h" +#include "mongo/s/shard_id.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/concurrency/with_lock.h" + +namespace mongo { + +class ConnectionString; +class ServiceContext; +class OperationContext; + +/** + * Provides infrastructure for retrieving session information that needs to be migrated from + * the source migration shard. + */ +class SessionCatalogMigrationDestination { + MONGO_DISALLOW_COPYING(SessionCatalogMigrationDestination); + +public: + enum class State { + NotStarted, + Migrating, + ReadyToCommit, + Committing, + ErrorOccurred, + Done, + }; + + static const char kSessionMigrateOplogTag[]; + + SessionCatalogMigrationDestination(ShardId fromShard, MigrationSessionId migrationSessionId); + ~SessionCatalogMigrationDestination(); + + /** + * Spawns a separate thread to initiate the session info transfer to this shard. 
+ */ + void start(ServiceContext* service); + + /** + * Signals to this object that the source shard will no longer generate new + * session info to transfer. In other words, once the source shard returns an empty + * result for session info to transfer after this call, it is safe for this to stop. + */ + void finish(); + + /** + * Joins the spawned thread called by start(). Should only be called after finish() + * was called. + */ + void join(); + + /** + * Returns the current state. + */ + State getState(); + + /** + * Returns the error message stored. This is only valid when getState() == ErrorOccurred. + */ + std::string getErrMsg(); + +private: + void _retrieveSessionStateFromSource(ServiceContext* service); + + void _errorOccurred(StringData errMsg); + + const ShardId _fromShard; + const MigrationSessionId _migrationSessionId; + + stdx::thread _thread; + + // Protects _state and _errMsg. + stdx::mutex _mutex; + State _state = State::NotStarted; + std::string _errMsg; // valid only if _state == ErrorOccurred. +}; + +} // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp new file mode 100644 index 00000000000..3a75198e456 --- /dev/null +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -0,0 +1,1259 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program.
If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/connection_string.h" +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_gen.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/s/migration_session_id.h" +#include "mongo/db/s/session_catalog_migration_destination.h" +#include "mongo/db/server_options.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/transaction_history_iterator.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/sharding_mongod_test_fixture.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +namespace { + +using executor::RemoteCommandRequest; +using repl::OplogEntry; +using repl::OpTime; +using repl::OpTypeEnum; +using unittest::assertGet; + +const 
ConnectionString kConfigConnStr = + ConnectionString::forReplicaSet("config", {HostAndPort("config:1")}); + +const ConnectionString kDonorConnStr = + ConnectionString::forReplicaSet("donor", {HostAndPort("donor:1")}); +const ShardId kFromShard("donor"); + +const BSONObj kSessionOplogTag(BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag + << 1)); + +repl::OplogEntry extractInnerOplog(const repl::OplogEntry& oplog) { + ASSERT_TRUE(oplog.getObject2()); + + auto oplogStatus = repl::OplogEntry::parse(oplog.getObject2().value()); + ASSERT_OK(oplogStatus); + return oplogStatus.getValue(); +} + +class SessionCatalogMigrationDestinationTest : public ShardingMongodTestFixture { +public: + void setUp() override { + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + ShardingMongodTestFixture::setUp(); + + ASSERT_OK(initializeGlobalShardingStateForMongodForTest(kConfigConnStr)); + + RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) + ->setConnectionStringReturnValue(kConfigConnStr); + + { + auto donorShard = assertGet( + shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName())); + RemoteCommandTargeterMock::get(donorShard->getTargeter()) + ->setConnectionStringReturnValue(kDonorConnStr); + RemoteCommandTargeterMock::get(donorShard->getTargeter()) + ->setFindHostReturnValue(kDonorConnStr.getServers()[0]); + } + + _migrationId = MigrationSessionId::generate("donor", "recipient"); + + SessionCatalog::create(getServiceContext()); + SessionCatalog::get(getServiceContext())->onStepUp(operationContext()); + } + + void tearDown() override { + SessionCatalog::reset_forTest(getServiceContext()); + ShardingMongodTestFixture::tearDown(); + } + + void returnOplog(const std::vector<OplogEntry>& oplogList) { + onCommand([&oplogList](const RemoteCommandRequest& request) -> StatusWith<BSONObj> { + BSONObjBuilder builder; + BSONArrayBuilder arrBuilder(builder.subarrayStart("oplog")); + + for (const auto& oplog : oplogList) 
{ + arrBuilder.append(oplog.toBSON()); + } + + arrBuilder.doneFast(); + return builder.obj(); + }); + } + + repl::OplogEntry getOplog(OperationContext* opCtx, const Timestamp& ts) { + DBDirectClient client(opCtx); + auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), + BSON(repl::OplogEntryBase::kTimestampFieldName << ts)); + + ASSERT_FALSE(oplogBSON.isEmpty()); + auto parseStatus = repl::OplogEntry::parse(oplogBSON); + ASSERT_OK(parseStatus); + + return parseStatus.getValue(); + } + + MigrationSessionId migrationId() { + return _migrationId.value(); + } + + ScopedSession getSessionWithTxn(OperationContext* opCtx, + const LogicalSessionId& sessionId, + const TxnNumber& txnNum) { + auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId); + scopedSession->beginTxn(opCtx, txnNum); + return scopedSession; + } + + void checkOplog(const repl::OplogEntry& originalOplog, const repl::OplogEntry& oplogToCheck) { + _checkOplogExceptO2(originalOplog, oplogToCheck); + ASSERT_BSONOBJ_EQ(*originalOplog.getObject2(), *oplogToCheck.getObject2()); + } + + void checkOplogWithNestedOplog(const repl::OplogEntry& originalOplog, + const repl::OplogEntry& oplogToCheck) { + _checkOplogExceptO2(originalOplog, oplogToCheck); + + auto innerOplog = extractInnerOplog(oplogToCheck); + ASSERT_TRUE(innerOplog.getOpType() == originalOplog.getOpType()); + ASSERT_BSONOBJ_EQ(originalOplog.getObject(), innerOplog.getObject()); + ASSERT_BSONOBJ_EQ(*originalOplog.getObject2(), *innerOplog.getObject2()); + } + +private: + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) override { + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {} + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + + ShardType 
donorShard; + donorShard.setName(kDonorConnStr.getSetName()); + donorShard.setHost(kDonorConnStr.toString()); + + return repl::OpTimeWith<std::vector<ShardType>>({donorShard}); + } + }; + + return stdx::make_unique<StaticCatalogClient>(); + } + + void _checkOplogExceptO2(const repl::OplogEntry& originalOplog, + const repl::OplogEntry& oplogToCheck) { + ASSERT_TRUE(oplogToCheck.getOpType() == OpTypeEnum::kNoop); + + ASSERT_TRUE(oplogToCheck.getStatementId()); + ASSERT_EQ(*originalOplog.getStatementId(), *oplogToCheck.getStatementId()); + + const auto origSessionInfo = originalOplog.getOperationSessionInfo(); + const auto sessionInfoToCheck = oplogToCheck.getOperationSessionInfo(); + + ASSERT_TRUE(sessionInfoToCheck.getSessionId()); + ASSERT_EQ(*origSessionInfo.getSessionId(), *sessionInfoToCheck.getSessionId()); + + ASSERT_TRUE(sessionInfoToCheck.getTxnNumber()); + ASSERT_EQ(*origSessionInfo.getTxnNumber(), *sessionInfoToCheck.getTxnNumber()); + + ASSERT_BSONOBJ_EQ(kSessionOplogTag, oplogToCheck.getObject()); + } + + boost::optional<MigrationSessionId> _migrationId; +}; + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTransfer) { + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + returnOplog({}); + + sessionMigration.join(); + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, 
OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog2.setOperationSessionInfo(sessionInfo); + oplog2.setStatementId(45); + + OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); + oplog3.setOperationSessionInfo(sessionInfo); + oplog3.setStatementId(5); + + returnOplog({oplog1, oplog2, oplog3}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, sessionId, 2); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); + + ASSERT_FALSE(historyIter.hasNext()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + TxnNumber txnNum(2); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + sessionInfo.setTxnNumber(txnNum++); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + sessionInfo.setTxnNumber(txnNum++); + 
oplog2.setOperationSessionInfo(sessionInfo); + oplog2.setStatementId(45); + + OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); + sessionInfo.setTxnNumber(txnNum); + oplog3.setOperationSessionInfo(sessionInfo); + oplog3.setStatementId(5); + + returnOplog({oplog1, oplog2, oplog3}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, sessionId, txnNum); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(txnNum)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); + ASSERT_FALSE(historyIter.hasNext()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog2.setOperationSessionInfo(sessionInfo); + oplog2.setStatementId(45); + + OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); + oplog3.setOperationSessionInfo(sessionInfo); + oplog3.setStatementId(5); + + // Return in 2 batches + + returnOplog({oplog1, oplog2}); + returnOplog({oplog3}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == 
sessionMigration.getState()); + + auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, sessionId, 2); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); + + ASSERT_FALSE(historyIter.hasNext()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) { + const NamespaceString kNs("a.b"); + const auto sessionId1 = makeLogicalSessionIdForTest(); + const auto sessionId2 = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo1; + sessionInfo1.setSessionId(sessionId1); + sessionInfo1.setTxnNumber(2); + + OperationSessionInfo sessionInfo2; + sessionInfo2.setSessionId(sessionId2); + sessionInfo2.setTxnNumber(42); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo1); + oplog1.setStatementId(23); + + OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog2.setOperationSessionInfo(sessionInfo2); + oplog2.setStatementId(45); + + OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); + oplog3.setOperationSessionInfo(sessionInfo2); + oplog3.setStatementId(5); + + returnOplog({oplog1, oplog2, oplog3}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto opCtx = operationContext(); + + { + auto session = getSessionWithTxn(opCtx, sessionId1, 2); + 
TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); + ASSERT_FALSE(historyIter.hasNext()); + } + + { + auto session = getSessionWithTxn(opCtx, sessionId2, 42); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(42)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); + + ASSERT_FALSE(historyIter.hasNext()); + } +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry origInnerOplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + origInnerOplog1.setOperationSessionInfo(sessionInfo); + origInnerOplog1.setStatementId(23); + + OplogEntry origInnerOplog2( + OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + origInnerOplog2.setOperationSessionInfo(sessionInfo); + origInnerOplog2.setStatementId(45); + + OplogEntry oplog1(OpTime(Timestamp(1100, 2), 1), + 0, + OpTypeEnum::kNoop, + kNs, + 0, + BSONObj(), + origInnerOplog1.toBSON()); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry oplog2(OpTime(Timestamp(1080, 2), 1), + 0, + OpTypeEnum::kNoop, + kNs, + 0, + BSONObj(), + origInnerOplog2.toBSON()); + oplog2.setOperationSessionInfo(sessionInfo); + oplog2.setStatementId(45); + + returnOplog({oplog1, oplog2}); + returnOplog({}); + + sessionMigration.join(); + + 
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, sessionId, 2); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplog(oplog2, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplog(oplog1, historyIter.next(opCtx)); + + ASSERT_FALSE(historyIter.hasNext()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry preImageOplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); + preImageOplog.setOperationSessionInfo(sessionInfo); + preImageOplog.setStatementId(45); + + OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), + 0, + OpTypeEnum::kUpdate, + kNs, + 0, + BSON("x" << 100), + BSON("$set" << BSON("x" << 101))); + updateOplog.setOperationSessionInfo(sessionInfo); + updateOplog.setStatementId(45); + updateOplog.setPreImageTs(Timestamp(100, 2)); + + returnOplog({preImageOplog, updateOplog}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, sessionId, 2); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + + ASSERT_TRUE(historyIter.hasNext()); + + auto nextOplog = historyIter.next(opCtx); + ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop); + + ASSERT_TRUE(nextOplog.getStatementId()); + ASSERT_EQ(45, 
nextOplog.getStatementId().value()); + + auto nextSessionInfo = nextOplog.getOperationSessionInfo(); + + ASSERT_TRUE(nextSessionInfo.getSessionId()); + ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value()); + + ASSERT_TRUE(nextSessionInfo.getTxnNumber()); + ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value()); + + ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject()); + + auto innerOplog = extractInnerOplog(nextOplog); + ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate); + ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject()); + ASSERT_TRUE(innerOplog.getObject2()); + ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value()); + + ASSERT_FALSE(historyIter.hasNext()); + + ASSERT_TRUE(nextOplog.getPreImageTs()); + ASSERT_FALSE(nextOplog.getPostImageTs()); + + // Check preImage oplog + + auto preImageTs = nextOplog.getPreImageTs().value(); + auto newPreImageOplog = getOplog(opCtx, preImageTs); + + ASSERT_TRUE(newPreImageOplog.getStatementId()); + ASSERT_EQ(45, newPreImageOplog.getStatementId().value()); + + auto preImageSessionInfo = newPreImageOplog.getOperationSessionInfo(); + + ASSERT_TRUE(preImageSessionInfo.getSessionId()); + ASSERT_EQ(sessionId, preImageSessionInfo.getSessionId().value()); + + ASSERT_TRUE(preImageSessionInfo.getTxnNumber()); + ASSERT_EQ(2, preImageSessionInfo.getTxnNumber().value()); + + ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject()); + ASSERT_TRUE(newPreImageOplog.getObject2()); + ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + 
sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry postImageOplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); + postImageOplog.setOperationSessionInfo(sessionInfo); + postImageOplog.setStatementId(45); + + OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), + 0, + OpTypeEnum::kUpdate, + kNs, + 0, + BSON("x" << 100), + BSON("$set" << BSON("x" << 101))); + updateOplog.setOperationSessionInfo(sessionInfo); + updateOplog.setStatementId(45); + updateOplog.setPostImageTs(Timestamp(100, 2)); + + returnOplog({postImageOplog, updateOplog}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, sessionId, 2); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + + ASSERT_TRUE(historyIter.hasNext()); + + auto nextOplog = historyIter.next(opCtx); + ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop); + + ASSERT_TRUE(nextOplog.getStatementId()); + ASSERT_EQ(45, nextOplog.getStatementId().value()); + + auto nextSessionInfo = nextOplog.getOperationSessionInfo(); + + ASSERT_TRUE(nextSessionInfo.getSessionId()); + ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value()); + + ASSERT_TRUE(nextSessionInfo.getTxnNumber()); + ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value()); + + ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject()); + + auto innerOplog = extractInnerOplog(nextOplog); + ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate); + ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject()); + ASSERT_TRUE(innerOplog.getObject2()); + ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value()); + + ASSERT_FALSE(historyIter.hasNext()); + + ASSERT_FALSE(nextOplog.getPreImageTs()); + ASSERT_TRUE(nextOplog.getPostImageTs()); + + // Check postImage oplog + + auto
postImageTs = nextOplog.getPostImageTs().value(); + auto newPostImageOplog = getOplog(opCtx, postImageTs); + + ASSERT_TRUE(newPostImageOplog.getStatementId()); + ASSERT_EQ(45, newPostImageOplog.getStatementId().value()); + + auto preImageSessionInfo = newPostImageOplog.getOperationSessionInfo(); + + ASSERT_TRUE(preImageSessionInfo.getSessionId()); + ASSERT_EQ(sessionId, preImageSessionInfo.getSessionId().value()); + + ASSERT_TRUE(preImageSessionInfo.getTxnNumber()); + ASSERT_EQ(2, preImageSessionInfo.getTxnNumber().value()); + + ASSERT_BSONOBJ_EQ(postImageOplog.getObject(), newPostImageOplog.getObject()); + ASSERT_TRUE(newPostImageOplog.getObject2()); + ASSERT_TRUE(newPostImageOplog.getObject2().value().isEmpty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry preImageOplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); + preImageOplog.setOperationSessionInfo(sessionInfo); + preImageOplog.setStatementId(45); + + OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), + 0, + OpTypeEnum::kUpdate, + kNs, + 0, + BSON("x" << 100), + BSON("$set" << BSON("x" << 101))); + updateOplog.setOperationSessionInfo(sessionInfo); + updateOplog.setStatementId(45); + updateOplog.setPreImageTs(Timestamp(100, 2)); + + returnOplog({preImageOplog}); + returnOplog({updateOplog}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, sessionId, 2); + 
TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + + ASSERT_TRUE(historyIter.hasNext()); + + auto nextOplog = historyIter.next(opCtx); + ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop); + + ASSERT_TRUE(nextOplog.getStatementId()); + ASSERT_EQ(45, nextOplog.getStatementId().value()); + + auto nextSessionInfo = nextOplog.getOperationSessionInfo(); + + ASSERT_TRUE(nextSessionInfo.getSessionId()); + ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value()); + + ASSERT_TRUE(nextSessionInfo.getTxnNumber()); + ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value()); + + ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject()); + + auto innerOplog = extractInnerOplog(nextOplog); + ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate); + ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject()); + ASSERT_TRUE(innerOplog.getObject2()); + ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value()); + + ASSERT_FALSE(historyIter.hasNext()); + + ASSERT_TRUE(nextOplog.getPreImageTs()); + ASSERT_FALSE(nextOplog.getPostImageTs()); + + // Check preImage oplog + + auto preImageTs = nextOplog.getPreImageTs().value(); + auto newPreImageOplog = getOplog(opCtx, preImageTs); + + ASSERT_TRUE(newPreImageOplog.getStatementId()); + ASSERT_EQ(45, newPreImageOplog.getStatementId().value()); + + auto preImageSessionInfo = newPreImageOplog.getOperationSessionInfo(); + + ASSERT_TRUE(preImageSessionInfo.getSessionId()); + ASSERT_EQ(sessionId, preImageSessionInfo.getSessionId().value()); + + ASSERT_TRUE(preImageSessionInfo.getTxnNumber()); + ASSERT_EQ(2, preImageSessionInfo.getTxnNumber().value()); + + ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject()); + ASSERT_TRUE(newPreImageOplog.getObject2()); + ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { + const NamespaceString kNs("a.b"); + const auto sessionId = 
makeLogicalSessionIdForTest(); + + auto opCtx = operationContext(); + + { + // Create a new session entry. + auto session = getSessionWithTxn(opCtx, sessionId, 20); + session->beginTxn(opCtx, 20); + + Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); + WriteUnitOfWork wunit(opCtx); + session->onWriteOpCompletedOnPrimary(opCtx, 20, {0}, Timestamp(100, 3)); + wunit.commit(); + } + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo oldSessionInfo; + oldSessionInfo.setSessionId(sessionId); + oldSessionInfo.setTxnNumber(19); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(oldSessionInfo); + oplog1.setStatementId(23); + + OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog2.setOperationSessionInfo(oldSessionInfo); + oplog2.setStatementId(45); + + returnOplog({oplog1, oplog2}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto session = getSessionWithTxn(opCtx, sessionId, 20); + ASSERT_EQ(Timestamp(100, 3), session->getLastWriteOpTimeTs(20)); + + DBDirectClient client(opCtx); + auto oplogBSON = + client.findOne(NamespaceString::kRsOplogNamespace.ns(), + BSON(repl::OplogEntryBase::kNamespaceFieldName << kNs.toString())); + ASSERT_TRUE(oplogBSON.isEmpty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + auto opCtx = operationContext(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo oldSessionInfo; + 
oldSessionInfo.setSessionId(sessionId); + oldSessionInfo.setTxnNumber(19); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(oldSessionInfo); + oplog1.setStatementId(23); + + returnOplog({oplog1}); + + { + // Create a new session entry. + auto session = getSessionWithTxn(opCtx, sessionId, 20); + session->beginTxn(opCtx, 20); + + Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); + WriteUnitOfWork wunit(opCtx); + session->onWriteOpCompletedOnPrimary(opCtx, 20, {0}, Timestamp(100, 3)); + wunit.commit(); + } + + OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog2.setOperationSessionInfo(oldSessionInfo); + oplog2.setStatementId(45); + + returnOplog({oplog2}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto session = getSessionWithTxn(opCtx, sessionId, 20); + ASSERT_EQ(Timestamp(100, 3), session->getLastWriteOpTimeTs(20)); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) { + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + onCommand([](const RemoteCommandRequest& request) -> StatusWith<BSONObj> { + return {ErrorCodes::SocketException, "Bad connection"}; + }); + + sessionMigration.join(); + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoOplog) { + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + onCommand([](const RemoteCommandRequest& request) -> StatusWith<BSONObj> { 
+ return BSON("oplog" << 1); + }); + + sessionMigration.join(); + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithBadOplogFormat) { + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + onCommand([](const RemoteCommandRequest& request) -> StatusWith<BSONObj> { + BSONObjBuilder builder; + BSONArrayBuilder arrBuilder(builder.subarrayStart("oplog")); + arrBuilder.append(BSON("x" << 1)); + arrBuilder.doneFast(); + return builder.obj(); + }); + + sessionMigration.join(); + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoSessionId) { + const NamespaceString kNs("a.b"); + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setTxnNumber(2); + + OplogEntry oplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog.setOperationSessionInfo(sessionInfo); + oplog.setStatementId(23); + + returnOplog({oplog}); + + sessionMigration.join(); + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoTxnNumber) { + const NamespaceString kNs("a.b"); + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + 
OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + + OplogEntry oplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog.setOperationSessionInfo(sessionInfo); + oplog.setStatementId(23); + + returnOplog({oplog}); + + sessionMigration.join(); + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoStmtId) { + const NamespaceString kNs("a.b"); + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + sessionInfo.setTxnNumber(2); + + OplogEntry oplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog.setOperationSessionInfo(sessionInfo); + + returnOplog({oplog}); + + sessionMigration.join(); + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, + NewWritesWithSameTxnDuringMigrationShouldBeCorrectlySet) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + auto opCtx = operationContext(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + returnOplog({oplog1}); + + { 
+ // Create a new session entry. + auto session = getSessionWithTxn(opCtx, sessionId, 2); + session->beginTxn(opCtx, 2); + + Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); + WriteUnitOfWork wunit(opCtx); + session->onWriteOpCompletedOnPrimary(opCtx, 2, {0}, Timestamp(100, 3)); + wunit.commit(); + } + + OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog2.setOperationSessionInfo(sessionInfo); + oplog2.setStatementId(45); + + returnOplog({oplog2}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto session = getSessionWithTxn(opCtx, sessionId, 2); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); + + ASSERT_FALSE(historyIter.hasNext()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry preImageOplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); + preImageOplog.setOperationSessionInfo(sessionInfo); + preImageOplog.setStatementId(45); + + OplogEntry preImageOplog2( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); + preImageOplog2.setOperationSessionInfo(sessionInfo); + preImageOplog2.setStatementId(45); + + returnOplog({preImageOplog, preImageOplog2}); + + sessionMigration.join(); + + 
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, + ShouldErrorForPreImageOplogWithNonMatchingSessionId) { + const NamespaceString kNs("a.b"); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + sessionInfo.setTxnNumber(2); + + OplogEntry preImageOplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); + preImageOplog.setOperationSessionInfo(sessionInfo); + preImageOplog.setStatementId(45); + + OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), + 0, + OpTypeEnum::kUpdate, + kNs, + 0, + BSON("x" << 100), + BSON("$set" << BSON("x" << 101))); + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + updateOplog.setOperationSessionInfo(sessionInfo); + updateOplog.setStatementId(45); + updateOplog.setPreImageTs(Timestamp(100, 2)); + + returnOplog({preImageOplog, updateOplog}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNonMatchingTxnNum) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry preImageOplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); + 
preImageOplog.setOperationSessionInfo(sessionInfo); + preImageOplog.setStatementId(45); + + OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), + 0, + OpTypeEnum::kUpdate, + kNs, + 0, + BSON("x" << 100), + BSON("$set" << BSON("x" << 101))); + sessionInfo.setTxnNumber(56); + updateOplog.setOperationSessionInfo(sessionInfo); + updateOplog.setStatementId(45); + updateOplog.setPreImageTs(Timestamp(100, 2)); + + returnOplog({preImageOplog, updateOplog}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, + ShouldErrorIfPreImageOplogFollowWithOplogWithNoPreImageLink) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry preImageOplog( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100)); + preImageOplog.setOperationSessionInfo(sessionInfo); + preImageOplog.setStatementId(45); + + OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), + 0, + OpTypeEnum::kUpdate, + kNs, + 0, + BSON("x" << 100), + BSON("$set" << BSON("x" << 101))); + updateOplog.setOperationSessionInfo(sessionInfo); + updateOplog.setStatementId(45); + + returnOplog({preImageOplog, updateOplog}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, + ShouldErrorIfOplogWithPreImageLinkIsPrecededByNormalOplog) { + const NamespaceString kNs("a.b"); + const auto sessionId = 
makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), + 0, + OpTypeEnum::kUpdate, + kNs, + 0, + BSON("x" << 100), + BSON("$set" << BSON("x" << 101))); + updateOplog.setOperationSessionInfo(sessionInfo); + updateOplog.setStatementId(45); + updateOplog.setPreImageTs(Timestamp(100, 2)); + + returnOplog({oplog1, updateOplog}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +TEST_F(SessionCatalogMigrationDestinationTest, + ShouldErrorIfOplogWithPostImageLinkIsPrecededByNormalOplog) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + + sessionMigration.start(getServiceContext()); + + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1), + 0, + OpTypeEnum::kUpdate, + kNs, + 0, + BSON("x" << 100), + BSON("$set" << BSON("x" << 101))); + updateOplog.setOperationSessionInfo(sessionInfo); + updateOplog.setStatementId(45); + updateOplog.setPostImageTs(Timestamp(100, 2)); + + returnOplog({oplog1, updateOplog}); + + 
sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred == + sessionMigration.getState()); + ASSERT_FALSE(sessionMigration.getErrMsg().empty()); +} + +} // namespace + +} // namespace mongo diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 2c217fb7d70..e58b2a5406d 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -60,10 +60,20 @@ void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequ << " collection has been manually deleted.", autoColl.getCollection()); - const auto updateResult = update(opCtx, autoColl.getDb(), updateRequest); + try { + const auto updateResult = update(opCtx, autoColl.getDb(), updateRequest); - if (!updateResult.numDocsModified && updateResult.upserted.isEmpty()) { - throw WriteConflictException(); + if (!updateResult.numDocsModified && updateResult.upserted.isEmpty()) { + throw WriteConflictException(); + } + } catch (const DBException& excep) { + if (excep.code() == ErrorCodes::DuplicateKey) { + // Duplicate key means that another thread already created the session this is trying + // to upsert. Throw WriteConflict to make it retry and check the current state again. + throw WriteConflictException(); + } + + throw; } } |