diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2017-07-24 15:52:16 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2017-07-31 14:18:29 -0400 |
commit | 0a55ace0a362a5944fedbaec2d95b2d7cda750d6 (patch) | |
tree | fe13ddca99d37111ec5f7f32fc66146d82106d5f /src/mongo | |
parent | f305e8262eb4403e2ca1aaf7a5fa9adf9a80fdc7 (diff) | |
download | mongo-0a55ace0a362a5944fedbaec2d95b2d7cda750d6.tar.gz |
SERVER-30262 Update session info during batch apply in secondaries
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 88 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/session.h | 14 | ||||
-rw-r--r-- | src/mongo/db/session_txn_record.cpp | 47 | ||||
-rw-r--r-- | src/mongo/db/session_txn_record.h | 61 |
10 files changed, 300 insertions, 16 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 7a906a19450..9900e938843 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1393,6 +1393,7 @@ env.Library( 'ops/write_ops_retryability.cpp', 'session.cpp', 'session_catalog.cpp', + 'session_txn_record.cpp', 'transaction_history_iterator.cpp', env.Idlc('ops/single_write_result.idl')[0], env.Idlc('session_txn_record.idl')[0], diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 70b81210470..91aa33bf82e 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -544,6 +544,7 @@ env.Library( '$BUILD_DIR/mongo/db/prefetch', '$BUILD_DIR/mongo/db/stats/timer_stats', '$BUILD_DIR/mongo/db/storage/storage_options', + '$BUILD_DIR/mongo/db/write_ops', '$BUILD_DIR/mongo/util/concurrency/thread_pool', '$BUILD_DIR/mongo/util/net/network', 'oplog_entry', @@ -583,6 +584,7 @@ env.CppUnitTest( 'sync_tail_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/dbdirectclient', 'idempotency_test_fixture', 'oplog_interface_local', 'sync_tail_test_fixture', diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index fd0495e12a4..d4848b36ba2 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -44,6 +44,7 @@ #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" @@ -209,6 +210,17 @@ OplogEntry makeCreateIndexOplogEntry(OpTime opTime, opTime, NamespaceString(nss.getSystemIndexesCollection()), indexInfoBob.obj()); } +void appendSessionTransactionInfo(OplogEntry& entry, + LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId) { + auto info = entry.getOperationSessionInfo(); + info.setSessionId(lsid); + info.setTxnNumber(txnNum); + entry.setOperationSessionInfo(std::move(info)); + entry.setStatementId(stmtId); +} + Status IdempotencyTest::runOp(const OplogEntry& op) { return runOps({op}); } diff --git a/src/mongo/db/repl/idempotency_test_fixture.h b/src/mongo/db/repl/idempotency_test_fixture.h index 2ac6cbe9805..c3c3ff1275f 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.h +++ b/src/mongo/db/repl/idempotency_test_fixture.h @@ -37,6 +37,7 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/bson/timestamp.h" #include "mongo/db/catalog/collection_options.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" @@ -128,5 +129,10 @@ OplogEntry makeCreateIndexOplogEntry(OpTime opTime, OplogEntry makeCommandOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& command); +void appendSessionTransactionInfo(OplogEntry& entry, + LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index eb346e135c9..e5eb5eca967 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -52,6 +52,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/namespace_string.h" #include "mongo/db/prefetch.h" #include "mongo/db/query/query_knobs.h" @@ -67,6 +68,8 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/session.h" +#include "mongo/db/session_txn_record.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" @@ -507,6 +510,57 @@ void scheduleWritesToOplog(OperationContext* opCtx, } } +using SessionRecordMap = + stdx::unordered_map<LogicalSessionId, SessionTxnRecord, LogicalSessionIdHash>; + +// Returns a map of the "latest" transaction table records for each logical session id present in +// the given operations. Each record represents the final state of the transaction table entry for +// that session id after the operations are applied. +SessionRecordMap computeLatestTransactionTableRecords(const MultiApplier::Operations& ops) { + SessionRecordMap latestRecords; + for (const auto& op : ops) { + auto sessionInfo = op.getOperationSessionInfo(); + if (!sessionInfo.getTxnNumber()) { + continue; + } + + invariant(sessionInfo.getSessionId()); + LogicalSessionId lsid(*sessionInfo.getSessionId()); + + auto txnNumber = *sessionInfo.getTxnNumber(); + auto opTimeTs = op.getOpTime().getTimestamp(); + + auto it = latestRecords.find(lsid); + if (it != latestRecords.end()) { + auto record = makeSessionTxnRecord(lsid, txnNumber, opTimeTs); + if (record > it->second) { + latestRecords[lsid] = std::move(record); + } + } else { + latestRecords.emplace(lsid, makeSessionTxnRecord(lsid, txnNumber, opTimeTs)); + } + } + return latestRecords; +} + +void scheduleTxnTableUpdates(OperationContext* opCtx, + OldThreadPool* threadPool, + const SessionRecordMap& latestRecords) { + for (const auto& it : latestRecords) { + auto& record = it.second; + + threadPool->schedule([&record]() { + initializeWriterThread(); + const auto opCtxHolder = cc().makeOperationContext(); + const auto opCtx = opCtxHolder.get(); + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + + Session::updateSessionRecord( + opCtx, record.getSessionId(), record.getTxnNum(), record.getLastWriteOpTimeTs()); + }); + } +} + /** * Caches per-collection properties which are relevant for oplog application, so that they don't * have to be retrieved repeatedly for each op. @@ -1278,6 +1332,7 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx, "attempting to replicate ops while primary"}; } + auto latestTxnRecords = computeLatestTransactionTableRecords(ops); std::vector<Status> statusVector(workerPool->getNumThreads(), Status::OK()); { // We must wait for the all work we've dispatched to complete before leaving this block @@ -1298,6 +1353,10 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx, consistencyMarkers->setMinValidToAtLeast(opCtx, ops.back().getOpTime()); applyOps(writerVectors, workerPool, applyOperation, &statusVector); + workerPool->join(); + + // Update the transaction table to point to the latest oplog entries for each session id. + scheduleTxnTableUpdates(opCtx, workerPool, latestTxnRecords); } // If any of the statuses is not ok, return error. diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index f993ff9668f..2498a0cd9b5 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -44,6 +44,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" @@ -60,10 +61,12 @@ #include "mongo/db/repl/sync_tail.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/session_catalog.h" #include "mongo/stdx/mutex.h" #include "mongo/unittest/unittest.h" #include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/md5.hpp" +#include "mongo/util/scopeguard.h" #include "mongo/util/string_map.h" namespace mongo { @@ -490,6 +493,91 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH ASSERT_BSONOBJ_EQ(op2.raw, operationsWrittenToOplog[1].doc); } +TEST_F(SyncTailTest, MultiApplyUpdatesTheTransactionTable) { + // Set up the transactions collection, which can only be done by the primary. + ASSERT_OK(ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_PRIMARY)); + SessionCatalog::create(_opCtx->getServiceContext()); + SessionCatalog::get(_opCtx->getServiceContext())->onStepUp(_opCtx.get()); + ON_BLOCK_EXIT([&] { SessionCatalog::reset_forTest(_opCtx->getServiceContext()); }); + ASSERT_OK( + ReplicationCoordinator::get(_opCtx.get())->setFollowerMode(MemberState::RS_SECONDARY)); + + // Entries with a session id and a txnNumber update the transaction table. + auto lsidSingle = makeLogicalSessionIdForTest(); + auto opSingle = makeInsertDocumentOplogEntry( + {Timestamp(Seconds(1), 0), 1LL}, NamespaceString("test.0"), BSON("x" << 1)); + appendSessionTransactionInfo(opSingle, lsidSingle, 5LL, 0); + + // For entries with the same session, the entry with a larger txnNumber is saved. + auto lsidDiffTxn = makeLogicalSessionIdForTest(); + auto opDiffTxnSmaller = makeInsertDocumentOplogEntry( + {Timestamp(Seconds(2), 0), 1LL}, NamespaceString("test.1"), BSON("x" << 0)); + appendSessionTransactionInfo(opDiffTxnSmaller, lsidDiffTxn, 10LL, 1); + auto opDiffTxnLarger = makeInsertDocumentOplogEntry( + {Timestamp(Seconds(3), 0), 1LL}, NamespaceString("test.1"), BSON("x" << 1)); + appendSessionTransactionInfo(opDiffTxnLarger, lsidDiffTxn, 20LL, 1); + + // For entries with the same session and txnNumber, the later optime is saved. + auto lsidSameTxn = makeLogicalSessionIdForTest(); + auto opSameTxnLater = makeInsertDocumentOplogEntry( + {Timestamp(Seconds(6), 0), 1LL}, NamespaceString("test.2"), BSON("x" << 0)); + appendSessionTransactionInfo(opSameTxnLater, lsidSameTxn, 30LL, 0); + auto opSameTxnSooner = makeInsertDocumentOplogEntry( + {Timestamp(Seconds(5), 0), 1LL}, NamespaceString("test.2"), BSON("x" << 1)); + appendSessionTransactionInfo(opSameTxnSooner, lsidSameTxn, 30LL, 1); + + // Entries with a session id but no txnNumber do not lead to updates. + auto lsidNoTxn = makeLogicalSessionIdForTest(); + auto opNoTxn = makeInsertDocumentOplogEntry( + {Timestamp(Seconds(7), 0), 1LL}, NamespaceString("test.3"), BSON("x" << 0)); + auto info = opNoTxn.getOperationSessionInfo(); + info.setSessionId(lsidNoTxn); + opNoTxn.setOperationSessionInfo(info); + + // Apply the batch and verify the transaction collection was properly updated for each scenario. + auto writerPool = SyncTail::makeWriterPool(); + ASSERT_OK(multiApply( + _opCtx.get(), + writerPool.get(), + {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnLater, opSameTxnSooner, opNoTxn}, + noopApplyOperationFn)); + + DBDirectClient client(_opCtx.get()); + + // The txnNum and optime of the only write were saved. + auto resultSingle = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + BSON(SessionTxnRecord::kSessionIdFieldName << lsidSingle.toBSON())); + ASSERT_TRUE(!resultSingle.isEmpty()); + ASSERT_EQ(resultSingle[SessionTxnRecord::kTxnNumFieldName].numberLong(), 5LL); + ASSERT_EQ(resultSingle[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(), + Timestamp(Seconds(1), 0)); + + // The txnNum and optime of the write with the larger txnNum were saved. + auto resultDiffTxn = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + BSON(SessionTxnRecord::kSessionIdFieldName << lsidDiffTxn.toBSON())); + ASSERT_TRUE(!resultDiffTxn.isEmpty()); + ASSERT_EQ(resultDiffTxn[SessionTxnRecord::kTxnNumFieldName].numberLong(), 20LL); + ASSERT_EQ(resultDiffTxn[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(), + Timestamp(Seconds(3), 0)); + + // The txnNum and optime of the write with the later optime were saved. + auto resultSameTxn = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + BSON(SessionTxnRecord::kSessionIdFieldName << lsidSameTxn.toBSON())); + ASSERT_TRUE(!resultSameTxn.isEmpty()); + ASSERT_EQ(resultSameTxn[SessionTxnRecord::kTxnNumFieldName].numberLong(), 30LL); + ASSERT_EQ(resultSameTxn[SessionTxnRecord::kLastWriteOpTimeTsFieldName].timestamp(), + Timestamp(Seconds(6), 0)); + + // There is no entry for the write with no txnNumber. + auto resultNoTxn = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + BSON(SessionTxnRecord::kSessionIdFieldName << lsidNoTxn.toBSON())); + ASSERT_TRUE(resultNoTxn.isEmpty()); +} + TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 0811996eb8f..242d549ed0a 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -63,15 +63,15 @@ boost::optional<SessionTxnRecord> loadSessionRecord(OperationContext* opCtx, return SessionTxnRecord::parse(ctx, result); } -/** - * Update the txnNum of the session record. Will create a new entry if the record with - * corresponding sessionId does not exist. - */ -void updateSessionRecordTxnNum(OperationContext* opCtx, - const LogicalSessionId& sessionId, - const TxnNumber& txnNum) { +} // namespace + +Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} + +void Session::updateSessionRecord(OperationContext* opCtx, + const LogicalSessionId& sessionId, + const TxnNumber& txnNum, + const Timestamp& ts) { repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); - Timestamp zeroTs; AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX); uassert(40526, @@ -86,7 +86,7 @@ void updateSessionRecordTxnNum(OperationContext* opCtx, updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName << txnNum << SessionTxnRecord::kLastWriteOpTimeTsFieldName - << zeroTs))); + << ts))); updateRequest.setUpsert(true); auto updateResult = update(opCtx, autoColl.getDb(), updateRequest); @@ -95,10 +95,6 @@ void updateSessionRecordTxnNum(OperationContext* opCtx, updateResult.numDocsModified >= 1 || !updateResult.upserted.isEmpty()); } -} // namespace - -Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} - void Session::begin(OperationContext* opCtx, const TxnNumber& txnNumber) { invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->lockState()->inAWriteUnitOfWork()); @@ -109,7 +105,7 @@ void Session::begin(OperationContext* opCtx, const TxnNumber& txnNumber) { // Previous read failed to retrieve the txn record, which means it does not exist yet, // so create a new entry. if (!_txnRecord) { - updateSessionRecordTxnNum(opCtx, _sessionId, txnNumber); + updateSessionRecord(opCtx, _sessionId, txnNumber, Timestamp()); _txnRecord.emplace(); _txnRecord->setSessionId(_sessionId); @@ -129,7 +125,7 @@ void Session::begin(OperationContext* opCtx, const TxnNumber& txnNumber) { _txnRecord->getTxnNum() <= txnNumber); if (txnNumber > _txnRecord->getTxnNum()) { - updateSessionRecordTxnNum(opCtx, _sessionId, txnNumber); + updateSessionRecord(opCtx, _sessionId, txnNumber, Timestamp()); _txnRecord->setTxnNum(txnNumber); _txnRecord->setLastWriteOpTimeTs(Timestamp()); } diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index f21309ffa8e..fbfb3c5f6c3 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -33,7 +33,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/timestamp.h" #include "mongo/db/logical_session_id.h" -#include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/session_txn_record.h" #include "mongo/db/transaction_history_iterator.h" namespace mongo { @@ -64,6 +64,18 @@ public: } /** + * Update the txnNum and lastWriteOpTimeTs of the session record. Will create a new entry if the + * record with corresponding sessionId does not exist. + * + * Outside callers should use saveTxnProgress instead of this when running as primary and + * serving a user request. + */ + static void updateSessionRecord(OperationContext* opCtx, + const LogicalSessionId& sessionId, + const TxnNumber& txnNum, + const Timestamp& ts); + + /** * Load transaction state from storage if it hasn't. */ void begin(OperationContext* opCtx, const TxnNumber& txnNumber); diff --git a/src/mongo/db/session_txn_record.cpp b/src/mongo/db/session_txn_record.cpp new file mode 100644 index 00000000000..b5b92a644eb --- /dev/null +++ b/src/mongo/db/session_txn_record.cpp @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/timestamp.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/session_txn_record.h" + +namespace mongo { + +SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, Timestamp ts) { + SessionTxnRecord record; + + record.setSessionId(lsid); + record.setTxnNum(txnNum); + record.setLastWriteOpTimeTs(ts); + + return record; +} + +} // namespace mongo diff --git a/src/mongo/db/session_txn_record.h b/src/mongo/db/session_txn_record.h new file mode 100644 index 00000000000..d6425a7076d --- /dev/null +++ b/src/mongo/db/session_txn_record.h @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/bson/timestamp.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/session_txn_record_gen.h" + +namespace mongo { + +inline bool operator==(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) { + return (lhs.getSessionId() == rhs.getSessionId()) && (lhs.getTxnNum() == rhs.getTxnNum()) && + (lhs.getLastWriteOpTimeTs() == rhs.getLastWriteOpTimeTs()); +} + +inline bool operator!=(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) { + return !(lhs == rhs); +} + +/** + * A record is greater (i.e. later) than another if its transaction number is greater, or if its + * transaction number is the same, and its last write optime is greater. Records can only be + * compared meaningfully for the same session id. + */ +inline bool operator>(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) { + invariant(lhs.getSessionId() == rhs.getSessionId()); + + return (lhs.getTxnNum() > rhs.getTxnNum()) || + (lhs.getTxnNum() == rhs.getTxnNum() && + lhs.getLastWriteOpTimeTs() > rhs.getLastWriteOpTimeTs()); +} + +SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, Timestamp ts); + +} // namespace mongo |