diff options
author | Randolph Tan <randolph@10gen.com> | 2017-06-28 17:58:19 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-07-06 13:13:43 -0400 |
commit | fa8c900800a61cd399554f27d43ca58870a95187 (patch) | |
tree | a44f68e934b440ed76841d76f0aa8577f306e9f5 /src | |
parent | 0503521c2375a2e7bfbbda314ff997c77a372d66 (diff) | |
download | mongo-fa8c900800a61cd399554f27d43ca58870a95187.tar.gz |
SERVER-28903 Implement session history iterator
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.idl | 8 | ||||
-rw-r--r-- | src/mongo/db/session_txn_state.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/session_txn_state.h | 4 | ||||
-rw-r--r-- | src/mongo/db/transaction_history_iterator.cpp (renamed from src/mongo/db/session_txn_write_history_iterator.cpp) | 44 | ||||
-rw-r--r-- | src/mongo/db/transaction_history_iterator.h (renamed from src/mongo/db/session_txn_write_history_iterator.h) | 8 | ||||
-rw-r--r-- | src/mongo/db/transaction_history_iterator_test.cpp | 213 |
8 files changed, 282 insertions, 16 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index d5bf1194132..4ffd1129172 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -214,6 +214,7 @@ error_code("DuplicateSession", 213) error_code("AuthenticationRestrictionUnmet", 214) error_code("DatabaseDropPending", 215) error_code("ElectionInProgress", 216) +error_code("IncompleteTransactionHistory", 217); # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index b5cfd10b189..cb1d3a13adc 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1329,13 +1329,14 @@ env.Library( 'session_transaction_table.cpp', 'session_txn_state.cpp', 'session_txn_state_holder.cpp', - 'session_txn_write_history_iterator.cpp', + 'transaction_history_iterator.cpp', env.Idlc('session_txn_record.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/ops/write_ops', + '$BUILD_DIR/mongo/db/repl/oplog_entry', '$BUILD_DIR/mongo/idl/idl_parser', ], ) @@ -1351,3 +1352,16 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/replmocks', ], ) + +env.CppUnitTest( + target='transaction_history_iterator_test', + source=[ + 'transaction_history_iterator_test.cpp', + ], + LIBDEPS=[ + 'service_context_d_test_fixture', + 'session_transactions', + '$BUILD_DIR/mongo/db/repl/replmocks', + ], +) + diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 62d5b4a23d4..0f5a5731021 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -43,7 +43,7 @@ enums: kUpdate: "u" kDelete: "d" kNoop: "n" - + structs: OplogEntryBase: description: A document in which the server stores an oplog entry. @@ -110,3 +110,9 @@ structs: # must exist. description: "Identifier of the transaction statement which generated this oplog entry" + prevTs: + cpp_name: prevWriteTsInTransaction + type: timestamp + optional: true # Only for writes that are part of a transaction + description: "The oplog timestamp of the previous write with the same transaction." + diff --git a/src/mongo/db/session_txn_state.cpp b/src/mongo/db/session_txn_state.cpp index e2813f79922..ac4b9726e28 100644 --- a/src/mongo/db/session_txn_state.cpp +++ b/src/mongo/db/session_txn_state.cpp @@ -190,8 +190,8 @@ const Timestamp& SessionTxnState::getLastWriteOpTimeTs() const { return _txnRecord->getLastWriteOpTimeTs(); } -SessionTxnWriteHistoryIterator SessionTxnState::getWriteHistory(OperationContext* opCtx) const { - return SessionTxnWriteHistoryIterator(getLastWriteOpTimeTs()); +TransactionHistoryIterator SessionTxnState::getWriteHistory(OperationContext* opCtx) const { + return TransactionHistoryIterator(getLastWriteOpTimeTs()); } } // namespace mongo diff --git a/src/mongo/db/session_txn_state.h b/src/mongo/db/session_txn_state.h index 0b3bbe50cba..32dda1d94d6 100644 --- a/src/mongo/db/session_txn_state.h +++ b/src/mongo/db/session_txn_state.h @@ -34,7 +34,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/session_txn_record_gen.h" -#include "mongo/db/session_txn_write_history_iterator.h" +#include "mongo/db/transaction_history_iterator.h" namespace mongo { @@ -65,7 +65,7 @@ public: /** * Returns the history of writes that has happened on this transaction. */ - SessionTxnWriteHistoryIterator getWriteHistory(OperationContext* opCtx) const; + TransactionHistoryIterator getWriteHistory(OperationContext* opCtx) const; /** * Stores the result of a single write operation within this transaction. diff --git a/src/mongo/db/session_txn_write_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp index 91d8047373f..00acb8ef199 100644 --- a/src/mongo/db/session_txn_write_history_iterator.cpp +++ b/src/mongo/db/transaction_history_iterator.cpp @@ -28,22 +28,52 @@ #include "mongo/platform/basic.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context.h" -#include "mongo/db/session_txn_write_history_iterator.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/transaction_history_iterator.h" +#include "mongo/logger/redaction.h" +#include "mongo/util/mongoutils/str.h" namespace mongo { -SessionTxnWriteHistoryIterator::SessionTxnWriteHistoryIterator(Timestamp startingOpTimeTs) +TransactionHistoryIterator::TransactionHistoryIterator(Timestamp startingOpTimeTs) : _nextOpTimeTs(std::move(startingOpTimeTs)) {} -bool SessionTxnWriteHistoryIterator::hasNext() const { +bool TransactionHistoryIterator::hasNext() const { return !_nextOpTimeTs.isNull(); } -repl::OplogEntry next(OperationContext* opCtx) { - // TODO: use DBDirectClient to fetch the oplog entry. - // assert if !hasNext(). - return repl::OplogEntry(BSONObj()); +repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) { + invariant(hasNext()); + + DBDirectClient client(opCtx); + // TODO: SERVER-29843 oplogReplay option might be needed to activate fast ts search. + auto oplogBSON = + client.findOne(repl::rsOplogName, + BSON(repl::OplogEntryBase::kTimestampFieldName << _nextOpTimeTs), + /* fieldsToReturn */ nullptr, + 0 /* QueryOption_OplogReplay */); + + uassert(ErrorCodes::IncompleteTransactionHistory, + str::stream() << "oplog no longer contains the complete write history of this " + "transaction, log with ts " + << _nextOpTimeTs.toBSON() + << " cannot be found", + !oplogBSON.isEmpty()); + + auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); + const auto& oplogPrevTsOption = oplogEntry.getPrevWriteTsInTransaction(); + uassert( + ErrorCodes::FailedToParse, + str::stream() << "Missing prevTs field on oplog entry of previous write in transcation: " + << redact(oplogBSON), + oplogPrevTsOption); + + _nextOpTimeTs = oplogPrevTsOption.value(); + + return oplogEntry; } } // namespace mongo diff --git a/src/mongo/db/session_txn_write_history_iterator.h b/src/mongo/db/transaction_history_iterator.h index 7671dee0093..3b6dee175d1 100644 --- a/src/mongo/db/session_txn_write_history_iterator.h +++ b/src/mongo/db/transaction_history_iterator.h @@ -39,12 +39,12 @@ class OperationContext; * An iterator class that can traverse through the oplog entries that are linked via the prevTs * field. */ -class SessionTxnWriteHistoryIterator { +class TransactionHistoryIterator { public: /** * Creates a new iterator starting with an oplog entry with the given start opTime. */ - SessionTxnWriteHistoryIterator(Timestamp startingOpTimeTs); + TransactionHistoryIterator(Timestamp startingOpTimeTs); /** * Returns false if there are no more entries to iterate. @@ -53,7 +53,9 @@ public: /** * Returns the next oplog entry. - * Throws if hasNext is false. + * Should not be called if hasNext is false. + * Throws if next oplog entry is in a unrecognized format or if it can't find the next oplog + * entry. */ repl::OplogEntry next(OperationContext* opCtx); diff --git a/src/mongo/db/transaction_history_iterator_test.cpp b/src/mongo/db/transaction_history_iterator_test.cpp new file mode 100644 index 00000000000..563a731bab9 --- /dev/null +++ b/src/mongo/db/transaction_history_iterator_test.cpp @@ -0,0 +1,213 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/base/init.h" +#include "mongo/db/client.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/service_context.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/transaction_history_iterator.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +class SessionHistoryIteratorTest : public ServiceContextMongoDTest { +public: + void setUp() override { + ServiceContextMongoDTest::setUp(); + + _opCtx = cc().makeOperationContext(); + + // Insert code path assumes existence of repl coordinator! + repl::ReplSettings replSettings; + replSettings.setReplSetString( + ConnectionString::forReplicaSet("sessionTxnStateTest", {HostAndPort("a:1")}) + .toString()); + replSettings.setMaster(true); + + auto service = getServiceContext(); + repl::ReplicationCoordinator::set( + service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service, replSettings)); + + // Note: internal code does not allow implicit creation of non-capped oplog collection. + DBDirectClient client(opCtx()); + ASSERT_TRUE(client.createCollection(repl::rsOplogName, 1024 * 1024, true)); + } + + void tearDown() override { + // ServiceContextMongoDTest::tearDown() will try to create it's own opCtx, and it's not + // allowed to have 2 present per client, so destroy this one. + _opCtx.reset(); + + ServiceContextMongoDTest::tearDown(); + } + + /** + * Helper method for inserting new entries to the oplog. This completely bypasses + * fixDocumentForInsert. + */ + void insertOplogEntry(const repl::OplogEntry newOplog) { + AutoGetCollection autoColl(opCtx(), NamespaceString(repl::rsOplogName), MODE_IX); + auto coll = autoColl.getCollection(); + ASSERT_TRUE(coll != nullptr); + + auto status = coll->insertDocument(opCtx(), + newOplog.toBSON(), + &CurOp::get(opCtx())->debug(), + /* enforceQuota */ false, + /* fromMigrate */ false); + ASSERT_OK(status); + } + + OperationContext* opCtx() { + return _opCtx.get(); + } + +private: + ServiceContext::UniqueOperationContext _opCtx; +}; + +TEST_F(SessionHistoryIteratorTest, NormalHistory) { + repl::OplogEntry entry1(repl::OpTime(Timestamp(52, 345), 2), + 0, + repl::OpTypeEnum::kInsert, + NamespaceString("a.b"), + BSON("x" << 30)); + entry1.setPrevWriteTsInTransaction(Timestamp(0, 0)); + insertOplogEntry(entry1); + + repl::OplogEntry entry2(repl::OpTime(Timestamp(67, 54801), 2), + 0, + repl::OpTypeEnum::kInsert, + NamespaceString("a.b"), + BSON("y" << 50)); + entry2.setPrevWriteTsInTransaction(Timestamp(52, 345)); + insertOplogEntry(entry2); + + // Insert an unrelated entry in between + repl::OplogEntry entry3(repl::OpTime(Timestamp(83, 2), 2), + 0, + repl::OpTypeEnum::kInsert, + NamespaceString("a.b"), + BSON("z" << 40)); + entry3.setPrevWriteTsInTransaction(Timestamp(22, 67)); + insertOplogEntry(entry3); + + repl::OplogEntry entry4(repl::OpTime(Timestamp(97, 2472), 2), + 0, + repl::OpTypeEnum::kInsert, + NamespaceString("a.b"), + BSON("a" << 3)); + entry4.setPrevWriteTsInTransaction(Timestamp(67, 54801)); + insertOplogEntry(entry4); + + TransactionHistoryIterator iter(Timestamp(97, 2472)); + + { + ASSERT_TRUE(iter.hasNext()); + auto nextEntry = iter.next(opCtx()); + ASSERT_EQ(Timestamp(97, 2472), nextEntry.getTimestamp()); + ASSERT_BSONOBJ_EQ(BSON("a" << 3), nextEntry.getObject()); + } + + { + ASSERT_TRUE(iter.hasNext()); + auto nextEntry = iter.next(opCtx()); + ASSERT_EQ(Timestamp(67, 54801), nextEntry.getTimestamp()); + ASSERT_BSONOBJ_EQ(BSON("y" << 50), nextEntry.getObject()); + } + + { + ASSERT_TRUE(iter.hasNext()); + auto nextEntry = iter.next(opCtx()); + ASSERT_EQ(Timestamp(52, 345), nextEntry.getTimestamp()); + ASSERT_BSONOBJ_EQ(BSON("x" << 30), nextEntry.getObject()); + } + + ASSERT_FALSE(iter.hasNext()); +} + +TEST_F(SessionHistoryIteratorTest, StartAtZeroTSShouldNotBeAbleToIterate) { + repl::OplogEntry entry(repl::OpTime(Timestamp(67, 54801), 2), + 0, + repl::OpTypeEnum::kInsert, + NamespaceString("a.b"), + BSON("y" << 50)); + entry.setPrevWriteTsInTransaction(Timestamp(52, 345)); + insertOplogEntry(entry); + + TransactionHistoryIterator iter(Timestamp(0, 0)); + ASSERT_FALSE(iter.hasNext()); +} + +TEST_F(SessionHistoryIteratorTest, NextShouldAssertIfHistoryIsTruncated) { + repl::OplogEntry entry(repl::OpTime(Timestamp(67, 54801), 2), + 0, + repl::OpTypeEnum::kInsert, + NamespaceString("a.b"), + BSON("y" << 50)); + entry.setPrevWriteTsInTransaction(Timestamp(52, 345)); + insertOplogEntry(entry); + + TransactionHistoryIterator iter(Timestamp(67, 54801)); + ASSERT_TRUE(iter.hasNext()); + + auto nextEntry = iter.next(opCtx()); + ASSERT_EQ(Timestamp(67, 54801), nextEntry.getTimestamp()); + ASSERT_BSONOBJ_EQ(BSON("y" << 50), nextEntry.getObject()); + + ASSERT_TRUE(iter.hasNext()); + ASSERT_THROWS_CODE(iter.next(opCtx()), UserException, ErrorCodes::IncompleteTransactionHistory); +} + +TEST_F(SessionHistoryIteratorTest, OplogInWriteHistoryChainWithMissingPrevTSShouldAssert) { + repl::OplogEntry entry(repl::OpTime(Timestamp(67, 54801), 2), + 0, + repl::OpTypeEnum::kInsert, + NamespaceString("a.b"), + BSON("y" << 50)); + insertOplogEntry(entry); + + TransactionHistoryIterator iter(Timestamp(67, 54801)); + ASSERT_TRUE(iter.hasNext()); + ASSERT_THROWS_CODE(iter.next(opCtx()), UserException, ErrorCodes::FailedToParse); +} + +} // namespace mongo |