-rw-r--r--  src/mongo/db/logical_session_id.idl                             |    2
-rw-r--r--  src/mongo/db/s/SConscript                                       |   14
-rw-r--r--  src/mongo/db/s/migration_session_id.cpp                         |    4
-rw-r--r--  src/mongo/db/s/migration_session_id.h                           |    6
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.cpp        |  437
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.h          |  116
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination_test.cpp   | 1259
-rw-r--r--  src/mongo/db/session.cpp                                        |   16
8 files changed, 1849 insertions(+), 5 deletions(-)
diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl
index 96310498441..e7bac0f5165 100644
--- a/src/mongo/db/logical_session_id.idl
+++ b/src/mongo/db/logical_session_id.idl
@@ -40,7 +40,7 @@ types:
description: "A number representing an operation within a transaction."
bson_serialization_type: int
cpp_type: "std::int32_t"
- deserializer: "mongo::BSONElement::_numberLong"
+ deserializer: "mongo::BSONElement::_numberInt"
structs:
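
The hunk above switches the deserializer for this statement-number type to the int32 accessor so that it matches the declared bson_serialization_type (int) and cpp_type (std::int32_t). As a rough standalone illustration of the idea, not the generated IDL code, with the helper name readStmtId and the field name "stmtId" invented for the example:

#include <cstdint>

#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobj.h"

namespace mongo {

// Hypothetical helper, not part of the patch: read a value that was serialized
// as a BSON int32, using the int accessor rather than the long one.
std::int32_t readStmtId(const BSONObj& obj) {
    BSONElement elem = obj["stmtId"];
    // For a field declared as bson_serialization_type: int, the matching
    // accessor is the int32 one; this mirrors the _numberLong -> _numberInt
    // change above.
    return elem.numberInt();
}

}  // namespace mongo
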
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index c4726a59b17..9f8c9844ef3 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -75,8 +75,9 @@ env.Library(
'move_timing_helper.cpp',
'namespace_metadata_change_notifications.cpp',
'operation_sharding_state.cpp',
- 'session_catalog_migration_source.cpp',
'read_only_catalog_cache_loader.cpp',
+ 'session_catalog_migration_destination.cpp',
+ 'session_catalog_migration_source.cpp',
'shard_identity_rollback_notifier.cpp',
'shard_metadata_util.cpp',
'shard_server_catalog_cache_loader.cpp',
@@ -343,3 +344,14 @@ env.CppUnitTest(
]
)
+env.CppUnitTest(
+ target='session_catalog_migration_destination_test',
+ source=[
+ 'session_catalog_migration_destination_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
+ '$BUILD_DIR/mongo/s/shard_server_test_fixture',
+ 'sharding',
+ ]
+)
diff --git a/src/mongo/db/s/migration_session_id.cpp b/src/mongo/db/s/migration_session_id.cpp
index 4f617ac5df4..a4e947ae729 100644
--- a/src/mongo/db/s/migration_session_id.cpp
+++ b/src/mongo/db/s/migration_session_id.cpp
@@ -66,6 +66,10 @@ StatusWith<MigrationSessionId> MigrationSessionId::extractFromBSON(const BSONObj
return status;
}
+MigrationSessionId MigrationSessionId::parseFromBSON(const BSONObj& obj) {
+ return uassertStatusOK(extractFromBSON(obj));
+}
+
MigrationSessionId::MigrationSessionId(std::string sessionId) {
invariant(!sessionId.empty());
_sessionId = std::move(sessionId);
diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h
index 0437fb5246a..e5fbe3eea08 100644
--- a/src/mongo/db/s/migration_session_id.h
+++ b/src/mongo/db/s/migration_session_id.h
@@ -61,6 +61,12 @@ public:
static StatusWith<MigrationSessionId> extractFromBSON(const BSONObj& obj);
/**
+     * Same as extractFromBSON, but throws on error.
+     * The function signature is compatible with IDL.
+ */
+ static MigrationSessionId parseFromBSON(const BSONObj& obj);
+
+ /**
     * Compares two session identifiers. Two identifiers match only if they are the same.
*/
bool matches(const MigrationSessionId& other) const;
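
To show how the two entry points above differ for a caller, here is a minimal usage sketch. It is not part of the patch; demoParseMigrationSessionId and the incoming cmdObj are invented for the example, while extractFromBSON, parseFromBSON, and matches come from the class above.

#include "mongo/bson/bsonobj.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/util/assert_util.h"

namespace mongo {

// Hypothetical usage sketch, not part of the patch.
void demoParseMigrationSessionId(const BSONObj& cmdObj) {
    // Status-returning form: the caller decides how to handle a bad document.
    StatusWith<MigrationSessionId> swId = MigrationSessionId::extractFromBSON(cmdObj);
    if (!swId.isOK()) {
        // Handle or propagate swId.getStatus().
        return;
    }

    // Throwing form: equivalent to uassertStatusOK(extractFromBSON(cmdObj)),
    // which is the shape expected by IDL-generated code.
    MigrationSessionId id = MigrationSessionId::parseFromBSON(cmdObj);
    invariant(id.matches(swId.getValue()));
}

}  // namespace mongo
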
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
new file mode 100644
index 00000000000..13ad9143457
--- /dev/null
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -0,0 +1,437 @@
+/**
+ * Copyright (C) 2017 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/session_catalog_migration_destination.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/session_catalog.h"
+#include "mongo/db/write_concern.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+namespace {
+
+const auto kOplogField = "oplog";
+const WriteConcernOptions kMajorityWC(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ Milliseconds(0));
+
+struct ProcessOplogResult {
+ bool isPrePostImage = false;
+ repl::OpTime oplogTime;
+ LogicalSessionId sessionId;
+ TxnNumber txnNum;
+};
+
+/**
+ * Returns the command request to extract session information from the source shard.
+ */
+BSONObj buildMigrateSessionCmd(const MigrationSessionId& migrationSessionId) {
+ BSONObjBuilder builder;
+ builder.append("_getNextSessionMods", 1);
+ migrationSessionId.append(&builder);
+ return builder.obj();
+}
+
+/**
+ * Determines whether the oplog entry has a link to either a preImage or a postImage and returns
+ * a new oplogLink containing the same kind of link, but pointing to lastResult.oplogTime. For
+ * example, if the entry links to a preImageTs, this returns an oplogLink with preImageTs
+ * pointing to lastResult.oplogTime.
+ *
+ * When the previous entry was a pre/post image, it is an error for this entry to have both a
+ * preImage and a postImage link, or to have neither.
+ */
+repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult,
+ const repl::OplogEntry& entry) {
+ repl::OplogLink oplogLink;
+
+ if (!lastResult.isPrePostImage) {
+ uassert(40628,
+ str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString()
+ << " to not have "
+ << repl::OplogEntryBase::kPreImageTsFieldName
+ << " or "
+ << repl::OplogEntryBase::kPostImageTsFieldName,
+ !entry.getPreImageTs() && !entry.getPostImageTs());
+
+ return oplogLink;
+ }
+
+ const auto ts = lastResult.oplogTime.getTimestamp();
+ invariant(!ts.isNull());
+
+ const auto& sessionInfo = entry.getOperationSessionInfo();
+ const auto sessionId = *sessionInfo.getSessionId();
+ const auto txnNum = *sessionInfo.getTxnNumber();
+
+ uassert(40629,
+ str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << ": "
+ << redact(entry.toBSON())
+ << " to have session: "
+ << lastResult.sessionId,
+ lastResult.sessionId == sessionId);
+ uassert(40630,
+ str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString() << ": "
+ << redact(entry.toBSON())
+ << " to have txnNumber: "
+ << lastResult.txnNum,
+ lastResult.txnNum == txnNum);
+
+ if (entry.getPreImageTs()) {
+ oplogLink.preImageTs = ts;
+ } else if (entry.getPostImageTs()) {
+ oplogLink.postImageTs = ts;
+ } else {
+ uasserted(40631,
+ str::stream() << "expected oplog with ts: " << entry.getTimestamp().toString()
+ << ": "
+ << redact(entry.toBSON())
+ << " to have either "
+ << repl::OplogEntryBase::kPreImageTsFieldName
+ << " or "
+ << repl::OplogEntryBase::kPostImageTsFieldName);
+ }
+
+ return oplogLink;
+}
+
+/**
+ * Parses the oplog BSON into an oplog entry and verifies that it contains the expected fields.
+ */
+repl::OplogEntry parseOplog(const BSONObj& oplogBSON) {
+ auto oplogStatus = repl::OplogEntry::parse(oplogBSON);
+ uassertStatusOK(oplogStatus.getStatus());
+
+ auto oplogEntry = oplogStatus.getValue();
+
+ auto sessionInfo = oplogEntry.getOperationSessionInfo();
+
+ uassert(ErrorCodes::UnsupportedFormat,
+ str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString()
+ << " does not have sessionId: "
+ << redact(oplogBSON),
+ sessionInfo.getSessionId());
+
+ uassert(ErrorCodes::UnsupportedFormat,
+ str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString()
+ << " does not have txnNumber: "
+ << redact(oplogBSON),
+ sessionInfo.getTxnNumber());
+
+ uassert(ErrorCodes::UnsupportedFormat,
+ str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString()
+ << " does not have stmtId: "
+ << redact(oplogBSON),
+ oplogEntry.getStatementId());
+
+ uassert(ErrorCodes::UnsupportedFormat,
+ str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString()
+ << " does not have o2: "
+ << redact(oplogBSON),
+ oplogEntry.getObject2());
+
+ return oplogEntry;
+}
+
+/**
+ * Gets the next batch of oplog entries from the source shard.
+ */
+BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
+ const ShardId& fromShard,
+ const MigrationSessionId& migrationSessionId) {
+ auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShard);
+ uassertStatusOK(shardStatus.getStatus());
+
+ auto shard = shardStatus.getValue();
+ auto responseStatus = shard->runCommand(opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ buildMigrateSessionCmd(migrationSessionId),
+ Shard::RetryPolicy::kNoRetry);
+
+ uassertStatusOK(responseStatus.getStatus());
+ auto result = responseStatus.getValue().response;
+
+ auto oplogElement = result[kOplogField];
+ uassert(ErrorCodes::FailedToParse,
+            "_getNextSessionMods response does not have the 'oplog' field as an array",
+ oplogElement.type() == Array);
+
+ return result;
+}
+
+/**
+ * Inserts a new oplog entry by converting the oplogBSON into a type 'n' oplog entry that carries
+ * the session information. The new oplog entry will also link to the pre/post image timestamp
+ * from lastResult, if set.
+ */
+ProcessOplogResult processSessionOplog(OperationContext* opCtx,
+ const BSONObj& oplogBSON,
+ const ProcessOplogResult& lastResult) {
+ ProcessOplogResult result;
+ auto oplogEntry = parseOplog(oplogBSON);
+
+ BSONObj object2;
+ if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) {
+        // Note: the oplog entry is already a no-op, so there is no need to nest it.
+        // Two type 'n' oplog formats are expected here:
+        // (1) Oplog entries that have been transformed by a previous migration into a
+        //     nested oplog. In this case, the o field contains {$sessionMigrateInfo: 1}
+        //     and the o2 field contains the details of the original oplog entry.
+        // (2) Oplog entries that contain the pre/post image information of a
+        //     findAndModify operation. In this case, the o field contains the relevant
+        //     image and o2 is empty.
+
+ object2 = oplogEntry.getObject2().value();
+
+ if (object2.isEmpty()) {
+ result.isPrePostImage = true;
+
+ uassert(40632,
+                    str::stream() << "Can't handle two pre/post image oplog entries in a row. "
+                                  << "Previous oplog ts: "
+ << lastResult.oplogTime.getTimestamp().toString()
+ << ", oplog ts: "
+ << oplogEntry.getTimestamp().toString()
+ << ": "
+ << redact(oplogBSON),
+ !lastResult.isPrePostImage);
+ }
+ } else {
+ object2 = oplogBSON; // TODO: strip redundant info?
+ }
+
+ const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
+ result.sessionId = sessionInfo.getSessionId().value();
+ result.txnNum = sessionInfo.getTxnNumber().value();
+
+ auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
+ scopedSession->beginTxn(opCtx, result.txnNum);
+
+ BSONObj object(result.isPrePostImage
+ ? oplogEntry.getObject()
+ : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
+ auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
+ oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum);
+
+ writeConflictRetry(opCtx,
+ "SessionOplogMigration",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
+ WriteUnitOfWork wunit(opCtx);
+
+ result.oplogTime = repl::logOp(opCtx,
+ "n",
+ oplogEntry.getNamespace(),
+ oplogEntry.getUuid(),
+ object,
+ &object2,
+ true,
+ sessionInfo,
+ *oplogEntry.getStatementId(),
+ oplogLink);
+
+ auto oplogTs = result.oplogTime.getTimestamp();
+ uassert(40633,
+ str::stream()
+ << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString()
+ << ": "
+ << redact(oplogBSON),
+ !oplogTs.isNull());
+
+ if (!result.isPrePostImage) {
+ scopedSession->onWriteOpCompletedOnPrimary(
+ opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs);
+ }
+
+ wunit.commit();
+ });
+
+ return result;
+}
+} // unnamed namespace
+
+const char SessionCatalogMigrationDestination::kSessionMigrateOplogTag[] = "$sessionMigrateInfo";
+
+SessionCatalogMigrationDestination::SessionCatalogMigrationDestination(
+ ShardId fromShard, MigrationSessionId migrationSessionId)
+ : _fromShard(std::move(fromShard)), _migrationSessionId(std::move(migrationSessionId)) {}
+
+SessionCatalogMigrationDestination::~SessionCatalogMigrationDestination() {
+ if (_thread.joinable()) {
+ _errorOccurred("Destructor cleaning up thread");
+ _thread.join();
+ }
+}
+
+void SessionCatalogMigrationDestination::start(ServiceContext* service) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_state == State::NotStarted);
+ _state = State::Migrating;
+ }
+
+ _thread = stdx::thread(stdx::bind(
+ &SessionCatalogMigrationDestination::_retrieveSessionStateFromSource, this, service));
+}
+
+void SessionCatalogMigrationDestination::finish() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _state = State::Committing;
+}
+
+void SessionCatalogMigrationDestination::join() {
+ invariant(_thread.joinable());
+ _thread.join();
+}
+
+/**
+ * Outline:
+ *
+ * 1. Get the oplog entries with session info from the source shard.
+ * 2. For each oplog entry, convert it to type 'n' if it is not already type 'n', preserving all
+ *    info needed for retryable writes.
+ * 3. Also update the sessionCatalog for every oplog entry.
+ * 4. Once the source shard returns an empty oplog buffer, enter the ReadyToCommit state and
+ *    wait for the commit signal (given by a call to finish()).
+ * 5. Once finish() is called, keep fetching more oplog entries from the source shard until it
+ *    returns an empty result again.
+ * 6. Wait for the writes to be committed to a majority of the replica set.
+ */
+void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(ServiceContext* service) {
+ Client::initThread(
+ "sessionCatalogMigration-" + _migrationSessionId.toString(), service, nullptr);
+
+ auto uniqueCtx = cc().makeOperationContext();
+ auto opCtx = uniqueCtx.get();
+
+ ProcessOplogResult lastResult;
+
+ while (true) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_state == State::ErrorOccurred) {
+ return;
+ }
+ }
+
+ try {
+ auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
+ BSONArray oplogArray(nextBatch[kOplogField].Obj());
+ BSONArrayIteratorSorted oplogIter(oplogArray);
+
+ if (!oplogIter.more()) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_state == State::Committing) {
+ break;
+ }
+ }
+
+ WriteConcernResult wcResult;
+ auto wcStatus =
+ waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &wcResult);
+ if (!wcStatus.isOK()) {
+ _errorOccurred(wcStatus.toString());
+ return;
+ }
+
+ // We depleted the buffer at least once, transition to ready for commit.
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Note: only transition to "ready to commit" if state is not error/force stop.
+ if (_state == State::Migrating) {
+ _state = State::ReadyToCommit;
+ }
+ }
+ }
+
+ while (oplogIter.more()) {
+ lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult);
+ }
+ } catch (const DBException& excep) {
+ if (excep.code() == ErrorCodes::ConflictingOperationInProgress ||
+ excep.code() == ErrorCodes::TransactionTooOld) {
+ // This means that the server has a newer txnNumber than the oplog being
+ // migrated, so just skip it.
+ continue;
+ }
+
+ _errorOccurred(excep.toString());
+ return;
+ }
+ }
+
+ WriteConcernResult wcResult;
+ auto wcStatus = waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &wcResult);
+ if (!wcStatus.isOK()) {
+ _errorOccurred(wcStatus.toString());
+ return;
+ }
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _state = State::Done;
+ }
+}
+
+std::string SessionCatalogMigrationDestination::getErrMsg() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _errMsg;
+}
+
+void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _state = State::ErrorOccurred;
+ _errMsg = errMsg.toString();
+}
+
+SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _state;
+}
+
+} // namespace mongo
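
To make the nesting convention used by processSessionOplog above concrete: for a regular (non-noop) source entry, the migrated type 'n' entry stores {$sessionMigrateInfo: 1} in its o field and the original oplog document in o2. A minimal sketch of that payload shape, with demoNestedOplogPayload invented for the example; kSessionMigrateOplogTag is the constant defined above.

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/s/session_catalog_migration_destination.h"

namespace mongo {

// Hypothetical illustration, not part of the patch: the o/o2 payload that the
// migrated no-op entry carries for a regular (non-noop) source oplog entry.
BSONObj demoNestedOplogPayload(const BSONObj& originalOplogDoc) {
    BSONObjBuilder builder;
    // o: the marker object, {$sessionMigrateInfo: 1}.
    builder.append("o", BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
    // o2: the original oplog document, preserved verbatim for retryable writes.
    builder.append("o2", originalOplogDoc);
    return builder.obj();
}

}  // namespace mongo
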
diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h
new file mode 100644
index 00000000000..2e740f2f6bb
--- /dev/null
+++ b/src/mongo/db/s/session_catalog_migration_destination.h
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2017 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/s/migration_session_id.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/concurrency/with_lock.h"
+
+namespace mongo {
+
+class ConnectionString;
+class ServiceContext;
+class OperationContext;
+
+/**
+ * Provides infrastructure for retrieving session information that needs to be migrated from
+ * the migration source shard.
+ */
+class SessionCatalogMigrationDestination {
+ MONGO_DISALLOW_COPYING(SessionCatalogMigrationDestination);
+
+public:
+ enum class State {
+ NotStarted,
+ Migrating,
+ ReadyToCommit,
+ Committing,
+ ErrorOccurred,
+ Done,
+ };
+
+ static const char kSessionMigrateOplogTag[];
+
+ SessionCatalogMigrationDestination(ShardId fromShard, MigrationSessionId migrationSessionId);
+ ~SessionCatalogMigrationDestination();
+
+ /**
+ * Spawns a separate thread to initiate the session info transfer to this shard.
+ */
+ void start(ServiceContext* service);
+
+ /**
+     * Signals to this object that the source shard will no longer generate new session info
+     * to transfer. In other words, once the source shard returns an empty result for session
+     * info to transfer after this call, it is safe for this object to stop.
+ */
+ void finish();
+
+ /**
+     * Joins the thread spawned by start(). Should only be called after finish() has been
+     * called.
+ */
+ void join();
+
+ /**
+ * Returns the current state.
+ */
+ State getState();
+
+ /**
+ * Returns the error message stored. This is only valid when getState() == ErrorOccurred.
+ */
+ std::string getErrMsg();
+
+private:
+ void _retrieveSessionStateFromSource(ServiceContext* service);
+
+ void _errorOccurred(StringData errMsg);
+
+ const ShardId _fromShard;
+ const MigrationSessionId _migrationSessionId;
+
+ stdx::thread _thread;
+
+ // Protects _state and _errMsg.
+ stdx::mutex _mutex;
+ State _state = State::NotStarted;
+ std::string _errMsg; // valid only if _state == ErrorOccurred.
+};
+
+} // namespace mongo
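
The header above defines a small lifecycle: start() spawns the fetch thread, finish() tells it the donor will stop producing entries, and join() waits for it to drain and majority-commit its writes. A usage sketch, not part of the patch, with demoRunSessionMigration invented for the example; it mirrors how the unit tests below drive the class.

#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_destination.h"

namespace mongo {

// Hypothetical driver sketch, not part of the patch.
void demoRunSessionMigration(ServiceContext* service,
                             const ShardId& fromShard,
                             const MigrationSessionId& migrationSessionId) {
    SessionCatalogMigrationDestination sessionMigration(fromShard, migrationSessionId);

    sessionMigration.start(service);  // spawns the fetch thread
    // ... the donor keeps producing session oplog entries while the chunk migrates ...
    sessionMigration.finish();  // the donor will eventually return an empty batch
    sessionMigration.join();    // wait for the thread to drain and majority-commit

    if (sessionMigration.getState() ==
        SessionCatalogMigrationDestination::State::ErrorOccurred) {
        // getErrMsg() is only meaningful in this state.
        auto errMsg = sessionMigration.getErrMsg();
        (void)errMsg;  // handle or log as appropriate
    }
}

}  // namespace mongo
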
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
new file mode 100644
index 00000000000..3a75198e456
--- /dev/null
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -0,0 +1,1259 @@
+/**
+ * Copyright (C) 2017 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/connection_string.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/logical_session_id_gen.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/session_catalog_migration_destination.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/executor/remote_command_request.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+namespace {
+
+using executor::RemoteCommandRequest;
+using repl::OplogEntry;
+using repl::OpTime;
+using repl::OpTypeEnum;
+using unittest::assertGet;
+
+const ConnectionString kConfigConnStr =
+ ConnectionString::forReplicaSet("config", {HostAndPort("config:1")});
+
+const ConnectionString kDonorConnStr =
+ ConnectionString::forReplicaSet("donor", {HostAndPort("donor:1")});
+const ShardId kFromShard("donor");
+
+const BSONObj kSessionOplogTag(BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag
+ << 1));
+
+repl::OplogEntry extractInnerOplog(const repl::OplogEntry& oplog) {
+ ASSERT_TRUE(oplog.getObject2());
+
+ auto oplogStatus = repl::OplogEntry::parse(oplog.getObject2().value());
+ ASSERT_OK(oplogStatus);
+ return oplogStatus.getValue();
+}
+
+class SessionCatalogMigrationDestinationTest : public ShardingMongodTestFixture {
+public:
+ void setUp() override {
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ ShardingMongodTestFixture::setUp();
+
+ ASSERT_OK(initializeGlobalShardingStateForMongodForTest(kConfigConnStr));
+
+ RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter())
+ ->setConnectionStringReturnValue(kConfigConnStr);
+
+ {
+ auto donorShard = assertGet(
+ shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName()));
+ RemoteCommandTargeterMock::get(donorShard->getTargeter())
+ ->setConnectionStringReturnValue(kDonorConnStr);
+ RemoteCommandTargeterMock::get(donorShard->getTargeter())
+ ->setFindHostReturnValue(kDonorConnStr.getServers()[0]);
+ }
+
+ _migrationId = MigrationSessionId::generate("donor", "recipient");
+
+ SessionCatalog::create(getServiceContext());
+ SessionCatalog::get(getServiceContext())->onStepUp(operationContext());
+ }
+
+ void tearDown() override {
+ SessionCatalog::reset_forTest(getServiceContext());
+ ShardingMongodTestFixture::tearDown();
+ }
+
+ void returnOplog(const std::vector<OplogEntry>& oplogList) {
+ onCommand([&oplogList](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ BSONObjBuilder builder;
+ BSONArrayBuilder arrBuilder(builder.subarrayStart("oplog"));
+
+ for (const auto& oplog : oplogList) {
+ arrBuilder.append(oplog.toBSON());
+ }
+
+ arrBuilder.doneFast();
+ return builder.obj();
+ });
+ }
+
+ repl::OplogEntry getOplog(OperationContext* opCtx, const Timestamp& ts) {
+ DBDirectClient client(opCtx);
+ auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(),
+ BSON(repl::OplogEntryBase::kTimestampFieldName << ts));
+
+ ASSERT_FALSE(oplogBSON.isEmpty());
+ auto parseStatus = repl::OplogEntry::parse(oplogBSON);
+ ASSERT_OK(parseStatus);
+
+ return parseStatus.getValue();
+ }
+
+ MigrationSessionId migrationId() {
+ return _migrationId.value();
+ }
+
+ ScopedSession getSessionWithTxn(OperationContext* opCtx,
+ const LogicalSessionId& sessionId,
+ const TxnNumber& txnNum) {
+ auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId);
+ scopedSession->beginTxn(opCtx, txnNum);
+ return scopedSession;
+ }
+
+ void checkOplog(const repl::OplogEntry& originalOplog, const repl::OplogEntry& oplogToCheck) {
+ _checkOplogExceptO2(originalOplog, oplogToCheck);
+ ASSERT_BSONOBJ_EQ(*originalOplog.getObject2(), *oplogToCheck.getObject2());
+ }
+
+ void checkOplogWithNestedOplog(const repl::OplogEntry& originalOplog,
+ const repl::OplogEntry& oplogToCheck) {
+ _checkOplogExceptO2(originalOplog, oplogToCheck);
+
+ auto innerOplog = extractInnerOplog(oplogToCheck);
+ ASSERT_TRUE(innerOplog.getOpType() == originalOplog.getOpType());
+ ASSERT_BSONOBJ_EQ(originalOplog.getObject(), innerOplog.getObject());
+ ASSERT_BSONOBJ_EQ(*originalOplog.getObject2(), *innerOplog.getObject2());
+ }
+
+private:
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {}
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+
+ ShardType donorShard;
+ donorShard.setName(kDonorConnStr.getSetName());
+ donorShard.setHost(kDonorConnStr.toString());
+
+ return repl::OpTimeWith<std::vector<ShardType>>({donorShard});
+ }
+ };
+
+ return stdx::make_unique<StaticCatalogClient>();
+ }
+
+ void _checkOplogExceptO2(const repl::OplogEntry& originalOplog,
+ const repl::OplogEntry& oplogToCheck) {
+ ASSERT_TRUE(oplogToCheck.getOpType() == OpTypeEnum::kNoop);
+
+ ASSERT_TRUE(oplogToCheck.getStatementId());
+ ASSERT_EQ(*originalOplog.getStatementId(), *oplogToCheck.getStatementId());
+
+ const auto origSessionInfo = originalOplog.getOperationSessionInfo();
+ const auto sessionInfoToCheck = oplogToCheck.getOperationSessionInfo();
+
+ ASSERT_TRUE(sessionInfoToCheck.getSessionId());
+ ASSERT_EQ(*origSessionInfo.getSessionId(), *sessionInfoToCheck.getSessionId());
+
+ ASSERT_TRUE(sessionInfoToCheck.getTxnNumber());
+ ASSERT_EQ(*origSessionInfo.getTxnNumber(), *sessionInfoToCheck.getTxnNumber());
+
+ ASSERT_BSONOBJ_EQ(kSessionOplogTag, oplogToCheck.getObject());
+ }
+
+ boost::optional<MigrationSessionId> _migrationId;
+};
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTransfer) {
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ returnOplog({});
+
+ sessionMigration.join();
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(45);
+
+ OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
+ oplog3.setOperationSessionInfo(sessionInfo);
+ oplog3.setStatementId(5);
+
+ returnOplog({oplog1, oplog2, oplog3});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+
+ ASSERT_FALSE(historyIter.hasNext());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ TxnNumber txnNum(2);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ sessionInfo.setTxnNumber(txnNum++);
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ sessionInfo.setTxnNumber(txnNum++);
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(45);
+
+ OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
+ sessionInfo.setTxnNumber(txnNum);
+ oplog3.setOperationSessionInfo(sessionInfo);
+ oplog3.setStatementId(5);
+
+ returnOplog({oplog1, oplog2, oplog3});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, sessionId, txnNum);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(txnNum));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+ ASSERT_FALSE(historyIter.hasNext());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(45);
+
+ OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
+ oplog3.setOperationSessionInfo(sessionInfo);
+ oplog3.setStatementId(5);
+
+    // Return the oplog entries in two batches, followed by the terminating empty batch.
+
+ returnOplog({oplog1, oplog2});
+ returnOplog({oplog3});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+
+ ASSERT_FALSE(historyIter.hasNext());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId1 = makeLogicalSessionIdForTest();
+ const auto sessionId2 = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo1;
+ sessionInfo1.setSessionId(sessionId1);
+ sessionInfo1.setTxnNumber(2);
+
+ OperationSessionInfo sessionInfo2;
+ sessionInfo2.setSessionId(sessionId2);
+ sessionInfo2.setTxnNumber(42);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo1);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(sessionInfo2);
+ oplog2.setStatementId(45);
+
+ OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
+ oplog3.setOperationSessionInfo(sessionInfo2);
+ oplog3.setStatementId(5);
+
+ returnOplog({oplog1, oplog2, oplog3});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+
+ {
+ auto session = getSessionWithTxn(opCtx, sessionId1, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+ ASSERT_FALSE(historyIter.hasNext());
+ }
+
+ {
+ auto session = getSessionWithTxn(opCtx, sessionId2, 42);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(42));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
+
+ ASSERT_FALSE(historyIter.hasNext());
+ }
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry origInnerOplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ origInnerOplog1.setOperationSessionInfo(sessionInfo);
+ origInnerOplog1.setStatementId(23);
+
+ OplogEntry origInnerOplog2(
+ OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ origInnerOplog2.setOperationSessionInfo(sessionInfo);
+ origInnerOplog2.setStatementId(45);
+
+ OplogEntry oplog1(OpTime(Timestamp(1100, 2), 1),
+ 0,
+ OpTypeEnum::kNoop,
+ kNs,
+ 0,
+ BSONObj(),
+ origInnerOplog1.toBSON());
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(1080, 2), 1),
+ 0,
+ OpTypeEnum::kNoop,
+ kNs,
+ 0,
+ BSONObj(),
+ origInnerOplog2.toBSON());
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(45);
+
+ returnOplog({oplog1, oplog2});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplog(oplog2, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplog(oplog1, historyIter.next(opCtx));
+
+ ASSERT_FALSE(historyIter.hasNext());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry preImageOplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
+ preImageOplog.setOperationSessionInfo(sessionInfo);
+ preImageOplog.setStatementId(45);
+
+ OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
+ 0,
+ OpTypeEnum::kUpdate,
+ kNs,
+ 0,
+ BSON("x" << 100),
+ BSON("$set" << BSON("x" << 101)));
+ updateOplog.setOperationSessionInfo(sessionInfo);
+ updateOplog.setStatementId(45);
+ updateOplog.setPreImageTs(Timestamp(100, 2));
+
+ returnOplog({preImageOplog, updateOplog});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+
+ ASSERT_TRUE(historyIter.hasNext());
+
+ auto nextOplog = historyIter.next(opCtx);
+ ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
+
+ ASSERT_TRUE(nextOplog.getStatementId());
+ ASSERT_EQ(45, nextOplog.getStatementId().value());
+
+ auto nextSessionInfo = nextOplog.getOperationSessionInfo();
+
+ ASSERT_TRUE(nextSessionInfo.getSessionId());
+ ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value());
+
+ ASSERT_TRUE(nextSessionInfo.getTxnNumber());
+ ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value());
+
+ ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject());
+
+ auto innerOplog = extractInnerOplog(nextOplog);
+ ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate);
+ ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject());
+ ASSERT_TRUE(innerOplog.getObject2());
+ ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value());
+
+ ASSERT_FALSE(historyIter.hasNext());
+
+ ASSERT_TRUE(nextOplog.getPreImageTs());
+ ASSERT_FALSE(nextOplog.getPostImageTs());
+
+ // Check preImage oplog
+
+ auto preImageTs = nextOplog.getPreImageTs().value();
+ auto newPreImageOplog = getOplog(opCtx, preImageTs);
+
+ ASSERT_TRUE(newPreImageOplog.getStatementId());
+ ASSERT_EQ(45, newPreImageOplog.getStatementId().value());
+
+ auto preImageSessionInfo = newPreImageOplog.getOperationSessionInfo();
+
+ ASSERT_TRUE(preImageSessionInfo.getSessionId());
+ ASSERT_EQ(sessionId, preImageSessionInfo.getSessionId().value());
+
+ ASSERT_TRUE(preImageSessionInfo.getTxnNumber());
+ ASSERT_EQ(2, preImageSessionInfo.getTxnNumber().value());
+
+ ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject());
+ ASSERT_TRUE(newPreImageOplog.getObject2());
+ ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry postImageOplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
+ postImageOplog.setOperationSessionInfo(sessionInfo);
+ postImageOplog.setStatementId(45);
+
+ OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
+ 0,
+ OpTypeEnum::kUpdate,
+ kNs,
+ 0,
+ BSON("x" << 100),
+ BSON("$set" << BSON("x" << 101)));
+ updateOplog.setOperationSessionInfo(sessionInfo);
+ updateOplog.setStatementId(45);
+ updateOplog.setPostImageTs(Timestamp(100, 2));
+
+ returnOplog({postImageOplog, updateOplog});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+
+ ASSERT_TRUE(historyIter.hasNext());
+
+ auto nextOplog = historyIter.next(opCtx);
+ ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
+
+ ASSERT_TRUE(nextOplog.getStatementId());
+ ASSERT_EQ(45, nextOplog.getStatementId().value());
+
+ auto nextSessionInfo = nextOplog.getOperationSessionInfo();
+
+ ASSERT_TRUE(nextSessionInfo.getSessionId());
+ ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value());
+
+ ASSERT_TRUE(nextSessionInfo.getTxnNumber());
+ ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value());
+
+ ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject());
+
+ auto innerOplog = extractInnerOplog(nextOplog);
+ ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate);
+ ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject());
+ ASSERT_TRUE(innerOplog.getObject2());
+ ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value());
+
+ ASSERT_FALSE(historyIter.hasNext());
+
+ ASSERT_FALSE(nextOplog.getPreImageTs());
+ ASSERT_TRUE(nextOplog.getPostImageTs());
+
+    // Check postImage oplog
+
+ auto postImageTs = nextOplog.getPostImageTs().value();
+ auto newPostImageOplog = getOplog(opCtx, postImageTs);
+
+ ASSERT_TRUE(newPostImageOplog.getStatementId());
+ ASSERT_EQ(45, newPostImageOplog.getStatementId().value());
+
+    auto postImageSessionInfo = newPostImageOplog.getOperationSessionInfo();
+
+    ASSERT_TRUE(postImageSessionInfo.getSessionId());
+    ASSERT_EQ(sessionId, postImageSessionInfo.getSessionId().value());
+
+    ASSERT_TRUE(postImageSessionInfo.getTxnNumber());
+    ASSERT_EQ(2, postImageSessionInfo.getTxnNumber().value());
+
+ ASSERT_BSONOBJ_EQ(postImageOplog.getObject(), newPostImageOplog.getObject());
+ ASSERT_TRUE(newPostImageOplog.getObject2());
+ ASSERT_TRUE(newPostImageOplog.getObject2().value().isEmpty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry preImageOplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
+ preImageOplog.setOperationSessionInfo(sessionInfo);
+ preImageOplog.setStatementId(45);
+
+ OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
+ 0,
+ OpTypeEnum::kUpdate,
+ kNs,
+ 0,
+ BSON("x" << 100),
+ BSON("$set" << BSON("x" << 101)));
+ updateOplog.setOperationSessionInfo(sessionInfo);
+ updateOplog.setStatementId(45);
+ updateOplog.setPreImageTs(Timestamp(100, 2));
+
+ returnOplog({preImageOplog});
+ returnOplog({updateOplog});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+
+ ASSERT_TRUE(historyIter.hasNext());
+
+ auto nextOplog = historyIter.next(opCtx);
+ ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
+
+ ASSERT_TRUE(nextOplog.getStatementId());
+ ASSERT_EQ(45, nextOplog.getStatementId().value());
+
+ auto nextSessionInfo = nextOplog.getOperationSessionInfo();
+
+ ASSERT_TRUE(nextSessionInfo.getSessionId());
+ ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value());
+
+ ASSERT_TRUE(nextSessionInfo.getTxnNumber());
+ ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value());
+
+ ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject());
+
+ auto innerOplog = extractInnerOplog(nextOplog);
+ ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate);
+ ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject());
+ ASSERT_TRUE(innerOplog.getObject2());
+ ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value());
+
+ ASSERT_FALSE(historyIter.hasNext());
+
+ ASSERT_TRUE(nextOplog.getPreImageTs());
+ ASSERT_FALSE(nextOplog.getPostImageTs());
+
+ // Check preImage oplog
+
+ auto preImageTs = nextOplog.getPreImageTs().value();
+ auto newPreImageOplog = getOplog(opCtx, preImageTs);
+
+ ASSERT_TRUE(newPreImageOplog.getStatementId());
+ ASSERT_EQ(45, newPreImageOplog.getStatementId().value());
+
+ auto preImageSessionInfo = newPreImageOplog.getOperationSessionInfo();
+
+ ASSERT_TRUE(preImageSessionInfo.getSessionId());
+ ASSERT_EQ(sessionId, preImageSessionInfo.getSessionId().value());
+
+ ASSERT_TRUE(preImageSessionInfo.getTxnNumber());
+ ASSERT_EQ(2, preImageSessionInfo.getTxnNumber().value());
+
+ ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject());
+ ASSERT_TRUE(newPreImageOplog.getObject2());
+ ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ auto opCtx = operationContext();
+
+ {
+ // Create a new session entry.
+ auto session = getSessionWithTxn(opCtx, sessionId, 20);
+ session->beginTxn(opCtx, 20);
+
+ Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
+ WriteUnitOfWork wunit(opCtx);
+ session->onWriteOpCompletedOnPrimary(opCtx, 20, {0}, Timestamp(100, 3));
+ wunit.commit();
+ }
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo oldSessionInfo;
+ oldSessionInfo.setSessionId(sessionId);
+ oldSessionInfo.setTxnNumber(19);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(oldSessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(oldSessionInfo);
+ oplog2.setStatementId(45);
+
+ returnOplog({oplog1, oplog2});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto session = getSessionWithTxn(opCtx, sessionId, 20);
+ ASSERT_EQ(Timestamp(100, 3), session->getLastWriteOpTimeTs(20));
+
+ DBDirectClient client(opCtx);
+ auto oplogBSON =
+ client.findOne(NamespaceString::kRsOplogNamespace.ns(),
+ BSON(repl::OplogEntryBase::kNamespaceFieldName << kNs.toString()));
+ ASSERT_TRUE(oplogBSON.isEmpty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ auto opCtx = operationContext();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo oldSessionInfo;
+ oldSessionInfo.setSessionId(sessionId);
+ oldSessionInfo.setTxnNumber(19);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(oldSessionInfo);
+ oplog1.setStatementId(23);
+
+ returnOplog({oplog1});
+
+ {
+ // Create a new session entry.
+ auto session = getSessionWithTxn(opCtx, sessionId, 20);
+ session->beginTxn(opCtx, 20);
+
+ Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
+ WriteUnitOfWork wunit(opCtx);
+ session->onWriteOpCompletedOnPrimary(opCtx, 20, {0}, Timestamp(100, 3));
+ wunit.commit();
+ }
+
+ OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(oldSessionInfo);
+ oplog2.setStatementId(45);
+
+ returnOplog({oplog2});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto session = getSessionWithTxn(opCtx, sessionId, 20);
+ ASSERT_EQ(Timestamp(100, 3), session->getLastWriteOpTimeTs(20));
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) {
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ onCommand([](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ return {ErrorCodes::SocketException, "Bad connection"};
+ });
+
+ sessionMigration.join();
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoOplog) {
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ onCommand([](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ return BSON("oplog" << 1);
+ });
+
+ sessionMigration.join();
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithBadOplogFormat) {
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ onCommand([](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ BSONObjBuilder builder;
+ BSONArrayBuilder arrBuilder(builder.subarrayStart("oplog"));
+ arrBuilder.append(BSON("x" << 1));
+ arrBuilder.doneFast();
+ return builder.obj();
+ });
+
+ sessionMigration.join();
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoSessionId) {
+ const NamespaceString kNs("a.b");
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry oplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog.setOperationSessionInfo(sessionInfo);
+ oplog.setStatementId(23);
+
+ returnOplog({oplog});
+
+ sessionMigration.join();
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoTxnNumber) {
+ const NamespaceString kNs("a.b");
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+
+ OplogEntry oplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog.setOperationSessionInfo(sessionInfo);
+ oplog.setStatementId(23);
+
+ returnOplog({oplog});
+
+ sessionMigration.join();
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoStmtId) {
+ const NamespaceString kNs("a.b");
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry oplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog.setOperationSessionInfo(sessionInfo);
+
+ returnOplog({oplog});
+
+ sessionMigration.join();
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest,
+ NewWritesWithSameTxnDuringMigrationShouldBeCorrectlySet) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+ auto opCtx = operationContext();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ returnOplog({oplog1});
+
+ {
+ // Create a new session entry.
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ session->beginTxn(opCtx, 2);
+
+ Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
+ WriteUnitOfWork wunit(opCtx);
+ session->onWriteOpCompletedOnPrimary(opCtx, 2, {0}, Timestamp(100, 3));
+ wunit.commit();
+ }
+
+ OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(45);
+
+ returnOplog({oplog2});
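+    // An empty batch indicates that the donor has no more session oplog entries to transfer.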
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(2));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+
+ ASSERT_FALSE(historyIter.hasNext());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry preImageOplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
+ preImageOplog.setOperationSessionInfo(sessionInfo);
+ preImageOplog.setStatementId(45);
+
+ OplogEntry preImageOplog2(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
+ preImageOplog2.setOperationSessionInfo(sessionInfo);
+ preImageOplog2.setStatementId(45);
+
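+    // Two pre-image (no-op) entries in a row are invalid: a pre-image must be immediately
+    // followed by the update entry that links to it.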
+ returnOplog({preImageOplog, preImageOplog2});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest,
+ ShouldErrorForPreImageOplogWithNonMatchingSessionId) {
+ const NamespaceString kNs("a.b");
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry preImageOplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
+ preImageOplog.setOperationSessionInfo(sessionInfo);
+ preImageOplog.setStatementId(45);
+
+ OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
+ 0,
+ OpTypeEnum::kUpdate,
+ kNs,
+ 0,
+ BSON("x" << 100),
+ BSON("$set" << BSON("x" << 101)));
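+    // Overwrite the sessionId so the update entry does not match the pre-image entry's session.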
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+ updateOplog.setOperationSessionInfo(sessionInfo);
+ updateOplog.setStatementId(45);
+ updateOplog.setPreImageTs(Timestamp(100, 2));
+
+ returnOplog({preImageOplog, updateOplog});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNonMatchingTxnNum) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry preImageOplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
+ preImageOplog.setOperationSessionInfo(sessionInfo);
+ preImageOplog.setStatementId(45);
+
+ OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
+ 0,
+ OpTypeEnum::kUpdate,
+ kNs,
+ 0,
+ BSON("x" << 100),
+ BSON("$set" << BSON("x" << 101)));
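+    // Change the txnNumber so the update entry does not match the pre-image entry's transaction.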
+ sessionInfo.setTxnNumber(56);
+ updateOplog.setOperationSessionInfo(sessionInfo);
+ updateOplog.setStatementId(45);
+ updateOplog.setPreImageTs(Timestamp(100, 2));
+
+ returnOplog({preImageOplog, updateOplog});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest,
+ ShouldErrorIfPreImageOplogFollowWithOplogWithNoPreImageLink) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry preImageOplog(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
+ preImageOplog.setOperationSessionInfo(sessionInfo);
+ preImageOplog.setStatementId(45);
+
+ OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
+ 0,
+ OpTypeEnum::kUpdate,
+ kNs,
+ 0,
+ BSON("x" << 100),
+ BSON("$set" << BSON("x" << 101)));
+ updateOplog.setOperationSessionInfo(sessionInfo);
+ updateOplog.setStatementId(45);
+
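+    // The update entry deliberately omits preImageTs, leaving the preceding pre-image unlinked.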
+ returnOplog({preImageOplog, updateOplog});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest,
+ ShouldErrorIfOplogWithPreImageLinkIsPrecededByNormalOplog) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
+ 0,
+ OpTypeEnum::kUpdate,
+ kNs,
+ 0,
+ BSON("x" << 100),
+ BSON("$set" << BSON("x" << 101)));
+ updateOplog.setOperationSessionInfo(sessionInfo);
+ updateOplog.setStatementId(45);
+ updateOplog.setPreImageTs(Timestamp(100, 2));
+
+ returnOplog({oplog1, updateOplog});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest,
+ ShouldErrorIfOplogWithPostImageLinkIsPrecededByNormalOplog) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+
+ sessionMigration.start(getServiceContext());
+
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
+ 0,
+ OpTypeEnum::kUpdate,
+ kNs,
+ 0,
+ BSON("x" << 100),
+ BSON("$set" << BSON("x" << 101)));
+ updateOplog.setOperationSessionInfo(sessionInfo);
+ updateOplog.setStatementId(45);
+ updateOplog.setPostImageTs(Timestamp(100, 2));
+
+ returnOplog({oplog1, updateOplog});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
+ sessionMigration.getState());
+ ASSERT_FALSE(sessionMigration.getErrMsg().empty());
+}
+
+} // namespace
+
+} // namespace mongo
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 2c217fb7d70..e58b2a5406d 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -60,10 +60,20 @@ void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequ
<< " collection has been manually deleted.",
autoColl.getCollection());
- const auto updateResult = update(opCtx, autoColl.getDb(), updateRequest);
+ try {
+ const auto updateResult = update(opCtx, autoColl.getDb(), updateRequest);
- if (!updateResult.numDocsModified && updateResult.upserted.isEmpty()) {
- throw WriteConflictException();
+ if (!updateResult.numDocsModified && updateResult.upserted.isEmpty()) {
+ throw WriteConflictException();
+ }
+ } catch (const DBException& excep) {
+ if (excep.code() == ErrorCodes::DuplicateKey) {
+            // Duplicate key means another thread already created the session entry this is
+            // trying to upsert. Throw WriteConflict to retry and check the current state again.
+ throw WriteConflictException();
+ }
+
+ throw;
}
}