/**
* Copyright (C) 2017 MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/logical_session_cache_noop.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_gen.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_destination.h"
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/sharding_mongod_test_fixture.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace {
using executor::RemoteCommandRequest;
using repl::OplogEntry;
using repl::OpTime;
using repl::OpTypeEnum;
using unittest::assertGet;
const ConnectionString kConfigConnStr =
ConnectionString::forReplicaSet("config", {HostAndPort("config:1")});
const ConnectionString kDonorConnStr =
ConnectionString::forReplicaSet("donor", {HostAndPort("donor:1")});
const ShardId kFromShard("donor");
const BSONObj kSessionOplogTag(BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag
<< 1));
repl::OplogEntry extractInnerOplog(const repl::OplogEntry& oplog) {
ASSERT_TRUE(oplog.getObject2());
auto oplogStatus = repl::OplogEntry::parse(oplog.getObject2().value());
ASSERT_OK(oplogStatus);
return oplogStatus.getValue();
}
class SessionCatalogMigrationDestinationTest : public ShardingMongodTestFixture {
public:
void setUp() override {
serverGlobalParams.featureCompatibility.setVersion(
ServerGlobalParams::FeatureCompatibility::Version::k36);
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
ShardingMongodTestFixture::setUp();
ASSERT_OK(initializeGlobalShardingStateForMongodForTest(kConfigConnStr));
RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter())
->setConnectionStringReturnValue(kConfigConnStr);
{
auto donorShard = assertGet(
shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName()));
RemoteCommandTargeterMock::get(donorShard->getTargeter())
->setConnectionStringReturnValue(kDonorConnStr);
RemoteCommandTargeterMock::get(donorShard->getTargeter())
->setFindHostReturnValue(kDonorConnStr.getServers()[0]);
}
_migrationId = MigrationSessionId::generate("donor", "recipient");
SessionCatalog::create(getServiceContext());
SessionCatalog::get(getServiceContext())->onStepUp(operationContext());
LogicalSessionCache::set(getServiceContext(), stdx::make_unique());
}
void tearDown() override {
SessionCatalog::reset_forTest(getServiceContext());
ShardingMongodTestFixture::tearDown();
}
void returnOplog(const std::vector& oplogList) {
onCommand([&oplogList](const RemoteCommandRequest& request) -> StatusWith {
BSONObjBuilder builder;
BSONArrayBuilder arrBuilder(builder.subarrayStart("oplog"));
for (const auto& oplog : oplogList) {
arrBuilder.append(oplog.toBSON());
}
arrBuilder.doneFast();
return builder.obj();
});
}
repl::OplogEntry getOplog(OperationContext* opCtx, const repl::OpTime& opTime) {
DBDirectClient client(opCtx);
auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery());
ASSERT_FALSE(oplogBSON.isEmpty());
auto parseStatus = repl::OplogEntry::parse(oplogBSON);
ASSERT_OK(parseStatus);
return parseStatus.getValue();
}
MigrationSessionId migrationId() {
return _migrationId.value();
}
ScopedSession getSessionWithTxn(OperationContext* opCtx,
const LogicalSessionId& sessionId,
const TxnNumber& txnNum) {
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId);
scopedSession->beginTxn(opCtx, txnNum);
return scopedSession;
}
void checkOplog(const repl::OplogEntry& originalOplog, const repl::OplogEntry& oplogToCheck) {
_checkOplogExceptO2(originalOplog, oplogToCheck);
ASSERT_BSONOBJ_EQ(*originalOplog.getObject2(), *oplogToCheck.getObject2());
}
void checkOplogWithNestedOplog(const repl::OplogEntry& originalOplog,
const repl::OplogEntry& oplogToCheck) {
_checkOplogExceptO2(originalOplog, oplogToCheck);
auto innerOplog = extractInnerOplog(oplogToCheck);
ASSERT_TRUE(innerOplog.getOpType() == originalOplog.getOpType());
ASSERT_BSONOBJ_EQ(originalOplog.getObject(), innerOplog.getObject());
if (originalOplog.getObject2()) {
ASSERT_TRUE(innerOplog.getObject2());
ASSERT_BSONOBJ_EQ(*originalOplog.getObject2(), *innerOplog.getObject2());
} else {
ASSERT_FALSE(innerOplog.getObject2());
}
}
void checkStatementExecuted(OperationContext* opCtx,
Session* session,
TxnNumber txnNumber,
StmtId stmtId) {
auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId);
ASSERT_TRUE(oplog);
}
void checkStatementExecuted(OperationContext* opCtx,
Session* session,
TxnNumber txnNumber,
StmtId stmtId,
repl::OplogEntry& expectedOplog) {
auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId);
ASSERT_TRUE(oplog);
checkOplogWithNestedOplog(expectedOplog, *oplog);
}
void insertDocWithSessionInfo(const OperationSessionInfo& sessionInfo,
const NamespaceString& ns,
const BSONObj& doc,
StmtId stmtId) {
// Do write on a separate thread in order not to pollute this thread's opCtx.
stdx::thread insertThread([sessionInfo, ns, doc, stmtId] {
write_ops::WriteCommandBase cmdBase;
std::vector stmtIds;
stmtIds.push_back(stmtId);
cmdBase.setStmtIds(stmtIds);
write_ops::Insert insertRequest(ns);
std::vector documents;
documents.push_back(doc);
insertRequest.setDocuments(documents);
insertRequest.setWriteCommandBase(cmdBase);
BSONObjBuilder insertBuilder;
insertRequest.serialize({}, &insertBuilder);
sessionInfo.serialize(&insertBuilder);
Client::initThread("test insert thread");
auto innerOpCtx = Client::getCurrent()->makeOperationContext();
DBDirectClient client(innerOpCtx.get());
BSONObj result;
ASSERT_TRUE(client.runCommand(ns.db().toString(), insertBuilder.obj(), result));
});
insertThread.join();
}
void finishSessionExpectSuccess(SessionCatalogMigrationDestination* sessionMigration) {
sessionMigration->finish();
// migration always fetches at least twice to transition from committing to done.
returnOplog({});
returnOplog({});
sessionMigration->join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done ==
sessionMigration->getState());
}
private:
std::unique_ptr makeShardingCatalogClient(
std::unique_ptr distLockManager) override {
class StaticCatalogClient final : public ShardingCatalogClientMock {
public:
StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {}
StatusWith>> getAllShards(
OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
ShardType donorShard;
donorShard.setName(kDonorConnStr.getSetName());
donorShard.setHost(kDonorConnStr.toString());
return repl::OpTimeWith>({donorShard});
}
};
return stdx::make_unique();
}
void _checkOplogExceptO2(const repl::OplogEntry& originalOplog,
const repl::OplogEntry& oplogToCheck) {
ASSERT_TRUE(oplogToCheck.getOpType() == OpTypeEnum::kNoop);
ASSERT_TRUE(oplogToCheck.getStatementId());
ASSERT_EQ(*originalOplog.getStatementId(), *oplogToCheck.getStatementId());
const auto origSessionInfo = originalOplog.getOperationSessionInfo();
const auto sessionInfoToCheck = oplogToCheck.getOperationSessionInfo();
ASSERT_TRUE(sessionInfoToCheck.getSessionId());
ASSERT_EQ(*origSessionInfo.getSessionId(), *sessionInfoToCheck.getSessionId());
ASSERT_TRUE(sessionInfoToCheck.getTxnNumber());
ASSERT_EQ(*origSessionInfo.getTxnNumber(), *sessionInfoToCheck.getTxnNumber());
ASSERT_BSONOBJ_EQ(kSessionOplogTag, oplogToCheck.getObject());
}
boost::optional _migrationId;
};
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTransfer) {
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
finishSessionExpectSuccess(&sessionMigration);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
TxnNumber txnNum(2);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
sessionInfo.setTxnNumber(txnNum++);
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
sessionInfo.setTxnNumber(txnNum++);
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
sessionInfo.setTxnNumber(txnNum);
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, txnNum);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(txnNum));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), txnNum, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(5);
// Return in 2 batches
returnOplog({oplog1, oplog2});
returnOplog({oplog3});
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) {
const NamespaceString kNs("a.b");
const auto sessionId1 = makeLogicalSessionIdForTest();
const auto sessionId2 = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo1;
sessionInfo1.setSessionId(sessionId1);
sessionInfo1.setTxnNumber(2);
OperationSessionInfo sessionInfo2;
sessionInfo2.setSessionId(sessionId2);
sessionInfo2.setTxnNumber(42);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo1);
oplog1.setStatementId(23);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo2);
oplog2.setStatementId(45);
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
oplog3.setOperationSessionInfo(sessionInfo2);
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
finishSessionExpectSuccess(&sessionMigration);
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto opCtx = operationContext();
{
auto session = getSessionWithTxn(opCtx, sessionId1, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
}
{
auto session = getSessionWithTxn(opCtx, sessionId2, 42);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(42));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), 42, 45, oplog2);
checkStatementExecuted(opCtx, session.get(), 42, 5, oplog3);
}
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry origInnerOplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
origInnerOplog1.setOperationSessionInfo(sessionInfo);
origInnerOplog1.setStatementId(23);
OplogEntry origInnerOplog2(
OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
origInnerOplog2.setOperationSessionInfo(sessionInfo);
origInnerOplog2.setStatementId(45);
OplogEntry oplog1(OpTime(Timestamp(1100, 2), 1),
0,
OpTypeEnum::kNoop,
kNs,
0,
BSONObj(),
origInnerOplog1.toBSON());
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
OplogEntry oplog2(OpTime(Timestamp(1080, 2), 1),
0,
OpTypeEnum::kNoop,
kNs,
0,
BSONObj(),
origInnerOplog2.toBSON());
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
returnOplog({oplog1, oplog2});
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplog(oplog2, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), 2, 23);
checkStatementExecuted(opCtx, session.get(), 2, 45);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry preImageOplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
OpTypeEnum::kUpdate,
kNs,
0,
BSON("x" << 100),
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
ASSERT_TRUE(nextOplog.getStatementId());
ASSERT_EQ(45, nextOplog.getStatementId().value());
auto nextSessionInfo = nextOplog.getOperationSessionInfo();
ASSERT_TRUE(nextSessionInfo.getSessionId());
ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value());
ASSERT_TRUE(nextSessionInfo.getTxnNumber());
ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value());
ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject());
auto innerOplog = extractInnerOplog(nextOplog);
ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate);
ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject());
ASSERT_TRUE(innerOplog.getObject2());
ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value());
ASSERT_FALSE(historyIter.hasNext());
ASSERT_TRUE(nextOplog.getPreImageOpTime());
ASSERT_FALSE(nextOplog.getPostImageOpTime());
// Check preImage oplog
auto preImageOpTime = nextOplog.getPreImageOpTime().value();
auto newPreImageOplog = getOplog(opCtx, preImageOpTime);
ASSERT_TRUE(newPreImageOplog.getStatementId());
ASSERT_EQ(45, newPreImageOplog.getStatementId().value());
auto preImageSessionInfo = newPreImageOplog.getOperationSessionInfo();
ASSERT_TRUE(preImageSessionInfo.getSessionId());
ASSERT_EQ(sessionId, preImageSessionInfo.getSessionId().value());
ASSERT_TRUE(preImageSessionInfo.getTxnNumber());
ASSERT_EQ(2, preImageSessionInfo.getTxnNumber().value());
ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject());
ASSERT_TRUE(newPreImageOplog.getObject2());
ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry postImageOplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
postImageOplog.setOperationSessionInfo(sessionInfo);
postImageOplog.setStatementId(45);
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
OpTypeEnum::kUpdate,
kNs,
0,
BSON("x" << 100),
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({postImageOplog, updateOplog});
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
ASSERT_TRUE(nextOplog.getStatementId());
ASSERT_EQ(45, nextOplog.getStatementId().value());
auto nextSessionInfo = nextOplog.getOperationSessionInfo();
ASSERT_TRUE(nextSessionInfo.getSessionId());
ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value());
ASSERT_TRUE(nextSessionInfo.getTxnNumber());
ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value());
ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject());
auto innerOplog = extractInnerOplog(nextOplog);
ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate);
ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject());
ASSERT_TRUE(innerOplog.getObject2());
ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value());
ASSERT_FALSE(historyIter.hasNext());
ASSERT_FALSE(nextOplog.getPreImageOpTime());
ASSERT_TRUE(nextOplog.getPostImageOpTime());
// Check preImage oplog
auto postImageOpTime = nextOplog.getPostImageOpTime().value();
auto newPostImageOplog = getOplog(opCtx, postImageOpTime);
ASSERT_TRUE(newPostImageOplog.getStatementId());
ASSERT_EQ(45, newPostImageOplog.getStatementId().value());
auto preImageSessionInfo = newPostImageOplog.getOperationSessionInfo();
ASSERT_TRUE(preImageSessionInfo.getSessionId());
ASSERT_EQ(sessionId, preImageSessionInfo.getSessionId().value());
ASSERT_TRUE(preImageSessionInfo.getTxnNumber());
ASSERT_EQ(2, preImageSessionInfo.getTxnNumber().value());
ASSERT_BSONOBJ_EQ(postImageOplog.getObject(), newPostImageOplog.getObject());
ASSERT_TRUE(newPostImageOplog.getObject2());
ASSERT_TRUE(newPostImageOplog.getObject2().value().isEmpty());
checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry preImageOplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
OpTypeEnum::kUpdate,
kNs,
0,
BSON("x" << 100),
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog});
returnOplog({updateOplog});
finishSessionExpectSuccess(&sessionMigration);
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
ASSERT_TRUE(nextOplog.getStatementId());
ASSERT_EQ(45, nextOplog.getStatementId().value());
auto nextSessionInfo = nextOplog.getOperationSessionInfo();
ASSERT_TRUE(nextSessionInfo.getSessionId());
ASSERT_EQ(sessionId, nextSessionInfo.getSessionId().value());
ASSERT_TRUE(nextSessionInfo.getTxnNumber());
ASSERT_EQ(2, nextSessionInfo.getTxnNumber().value());
ASSERT_BSONOBJ_EQ(kSessionOplogTag, nextOplog.getObject());
auto innerOplog = extractInnerOplog(nextOplog);
ASSERT_TRUE(innerOplog.getOpType() == OpTypeEnum::kUpdate);
ASSERT_BSONOBJ_EQ(updateOplog.getObject(), innerOplog.getObject());
ASSERT_TRUE(innerOplog.getObject2());
ASSERT_BSONOBJ_EQ(updateOplog.getObject2().value(), innerOplog.getObject2().value());
ASSERT_FALSE(historyIter.hasNext());
ASSERT_TRUE(nextOplog.getPreImageOpTime());
ASSERT_FALSE(nextOplog.getPostImageOpTime());
// Check preImage oplog
auto preImageOpTime = nextOplog.getPreImageOpTime().value();
auto newPreImageOplog = getOplog(opCtx, preImageOpTime);
ASSERT_TRUE(newPreImageOplog.getStatementId());
ASSERT_EQ(45, newPreImageOplog.getStatementId().value());
auto preImageSessionInfo = newPreImageOplog.getOperationSessionInfo();
ASSERT_TRUE(preImageSessionInfo.getSessionId());
ASSERT_EQ(sessionId, preImageSessionInfo.getSessionId().value());
ASSERT_TRUE(preImageSessionInfo.getTxnNumber());
ASSERT_EQ(2, preImageSessionInfo.getTxnNumber().value());
ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject());
ASSERT_TRUE(newPreImageOplog.getObject2());
ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
auto opCtx = operationContext();
OperationSessionInfo newSessionInfo;
newSessionInfo.setSessionId(sessionId);
newSessionInfo.setTxnNumber(20);
insertDocWithSessionInfo(newSessionInfo,
kNs,
BSON("_id"
<< "newerSess"),
0);
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo oldSessionInfo;
oldSessionInfo.setSessionId(sessionId);
oldSessionInfo.setTxnNumber(19);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(oldSessionInfo);
oplog1.setStatementId(23);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(oldSessionInfo);
oplog2.setStatementId(45);
returnOplog({oplog1, oplog2});
finishSessionExpectSuccess(&sessionMigration);
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto session = getSessionWithTxn(opCtx, sessionId, 20);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20));
ASSERT_TRUE(historyIter.hasNext());
auto oplog = historyIter.next(opCtx);
ASSERT_BSONOBJ_EQ(BSON("_id"
<< "newerSess"),
oplog.getObject());
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), 20, 0);
}
TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
auto opCtx = operationContext();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo oldSessionInfo;
oldSessionInfo.setSessionId(sessionId);
oldSessionInfo.setTxnNumber(19);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(oldSessionInfo);
oplog1.setStatementId(23);
returnOplog({oplog1});
OperationSessionInfo newSessionInfo;
newSessionInfo.setSessionId(sessionId);
newSessionInfo.setTxnNumber(20);
// Ensure that the previous oplog has been processed before proceeding.
returnOplog({});
insertDocWithSessionInfo(newSessionInfo,
kNs,
BSON("_id"
<< "newerSess"),
0);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(oldSessionInfo);
oplog2.setStatementId(45);
returnOplog({oplog2});
finishSessionExpectSuccess(&sessionMigration);
auto session = getSessionWithTxn(opCtx, sessionId, 20);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20));
ASSERT_TRUE(historyIter.hasNext());
auto oplog = historyIter.next(opCtx);
ASSERT_BSONOBJ_EQ(BSON("_id"
<< "newerSess"),
oplog.getObject());
checkStatementExecuted(opCtx, session.get(), 20, 0);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) {
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
onCommand([](const RemoteCommandRequest& request) -> StatusWith {
return {ErrorCodes::SocketException, "Bad connection"};
});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoOplog) {
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
onCommand([](const RemoteCommandRequest& request) -> StatusWith {
return BSON("oplog" << 1);
});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithBadOplogFormat) {
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
onCommand([](const RemoteCommandRequest& request) -> StatusWith {
BSONObjBuilder builder;
BSONArrayBuilder arrBuilder(builder.subarrayStart("oplog"));
arrBuilder.append(BSON("x" << 1));
arrBuilder.doneFast();
return builder.obj();
});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoSessionId) {
const NamespaceString kNs("a.b");
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setTxnNumber(2);
OplogEntry oplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog.setOperationSessionInfo(sessionInfo);
oplog.setStatementId(23);
returnOplog({oplog});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoTxnNumber) {
const NamespaceString kNs("a.b");
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(makeLogicalSessionIdForTest());
OplogEntry oplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog.setOperationSessionInfo(sessionInfo);
oplog.setStatementId(23);
returnOplog({oplog});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoStmtId) {
const NamespaceString kNs("a.b");
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(makeLogicalSessionIdForTest());
sessionInfo.setTxnNumber(2);
OplogEntry oplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog.setOperationSessionInfo(sessionInfo);
returnOplog({oplog});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest,
NewWritesWithSameTxnDuringMigrationShouldBeCorrectlySet) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
auto opCtx = operationContext();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
returnOplog({oplog1});
// Ensure that the previous oplog has been processed before proceeding.
returnOplog({});
insertDocWithSessionInfo(sessionInfo,
kNs,
BSON("_id"
<< "newerSess"),
0);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(45);
returnOplog({oplog2});
finishSessionExpectSuccess(&sessionMigration);
auto session = getSessionWithTxn(opCtx, sessionId, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
auto oplog = historyIter.next(opCtx);
ASSERT_BSONOBJ_EQ(BSON("_id"
<< "newerSess"),
oplog.getObject());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), 2, 0);
checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry preImageOplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
OplogEntry preImageOplog2(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog2.setOperationSessionInfo(sessionInfo);
preImageOplog2.setStatementId(45);
returnOplog({preImageOplog, preImageOplog2});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest,
ShouldErrorForPreImageOplogWithNonMatchingSessionId) {
const NamespaceString kNs("a.b");
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(makeLogicalSessionIdForTest());
sessionInfo.setTxnNumber(2);
OplogEntry preImageOplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
OpTypeEnum::kUpdate,
kNs,
0,
BSON("x" << 100),
BSON("$set" << BSON("x" << 101)));
sessionInfo.setSessionId(makeLogicalSessionIdForTest());
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNonMatchingTxnNum) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry preImageOplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
OpTypeEnum::kUpdate,
kNs,
0,
BSON("x" << 100),
BSON("$set" << BSON("x" << 101)));
sessionInfo.setTxnNumber(56);
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({preImageOplog, updateOplog});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest,
ShouldErrorIfPreImageOplogFollowWithOplogWithNoPreImageLink) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry preImageOplog(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, BSON("x" << 100));
preImageOplog.setOperationSessionInfo(sessionInfo);
preImageOplog.setStatementId(45);
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
OpTypeEnum::kUpdate,
kNs,
0,
BSON("x" << 100),
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
returnOplog({preImageOplog, updateOplog});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest,
ShouldErrorIfOplogWithPreImageLinkIsPrecededByNormalOplog) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
OpTypeEnum::kUpdate,
kNs,
0,
BSON("x" << 100),
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
updateOplog.setPreImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({oplog1, updateOplog});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest,
ShouldErrorIfOplogWithPostImageLinkIsPrecededByNormalOplog) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
OplogEntry updateOplog(OpTime(Timestamp(80, 2), 1),
0,
OpTypeEnum::kUpdate,
kNs,
0,
BSON("x" << 100),
BSON("$set" << BSON("x" << 101)));
updateOplog.setOperationSessionInfo(sessionInfo);
updateOplog.setStatementId(45);
updateOplog.setPostImageOpTime(repl::OpTime(Timestamp(100, 2), 1));
returnOplog({oplog1, updateOplog});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::ErrorOccurred ==
sessionMigration.getState());
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
auto opCtx = operationContext();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(19);
insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30);
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
OplogEntry oplog1(
OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(30);
OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(45);
returnOplog({oplog1, oplog2, oplog3});
finishSessionExpectSuccess(&sessionMigration);
auto session = getSessionWithTxn(opCtx, sessionId, 19);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(19));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
auto firstInsertOplog = historyIter.next(opCtx);
ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert);
ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject());
ASSERT_TRUE(firstInsertOplog.getStatementId());
ASSERT_EQ(30, *firstInsertOplog.getStatementId());
checkStatementExecuted(opCtx, session.get(), 19, 23, oplog1);
checkStatementExecuted(opCtx, session.get(), 19, 30);
checkStatementExecuted(opCtx, session.get(), 19, 45, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory) {
const NamespaceString kNs("a.b");
const auto sessionId = makeLogicalSessionIdForTest();
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(sessionId);
sessionInfo.setTxnNumber(2);
OplogEntry oplog1(
OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
oplog1.setOperationSessionInfo(sessionInfo);
oplog1.setStatementId(23);
OplogEntry oplog2(
OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, {}, Session::kDeadEndSentinel);
oplog2.setOperationSessionInfo(sessionInfo);
oplog2.setStatementId(kIncompleteHistoryStmtId);
OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
oplog3.setOperationSessionInfo(sessionInfo);
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
// migration always fetches at least twice to transition from committing to done.
returnOplog({});
returnOplog({});
sessionMigration.join();
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplog(oplog2, historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
ASSERT_THROWS(session->checkStatementExecuted(opCtx, 2, 38), AssertionException);
}
} // namespace
} // namespace mongo