summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-06-28 17:58:19 -0400
committerRandolph Tan <randolph@10gen.com>2017-07-06 13:13:43 -0400
commitfa8c900800a61cd399554f27d43ca58870a95187 (patch)
treea44f68e934b440ed76841d76f0aa8577f306e9f5 /src
parent0503521c2375a2e7bfbbda314ff997c77a372d66 (diff)
downloadmongo-fa8c900800a61cd399554f27d43ca58870a95187.tar.gz
SERVER-28903 Implement session history iterator
Diffstat (limited to 'src')
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/SConscript16
-rw-r--r--src/mongo/db/repl/oplog_entry.idl8
-rw-r--r--src/mongo/db/session_txn_state.cpp4
-rw-r--r--src/mongo/db/session_txn_state.h4
-rw-r--r--src/mongo/db/transaction_history_iterator.cpp (renamed from src/mongo/db/session_txn_write_history_iterator.cpp)44
-rw-r--r--src/mongo/db/transaction_history_iterator.h (renamed from src/mongo/db/session_txn_write_history_iterator.h)8
-rw-r--r--src/mongo/db/transaction_history_iterator_test.cpp213
8 files changed, 282 insertions, 16 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index d5bf1194132..4ffd1129172 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -214,6 +214,7 @@ error_code("DuplicateSession", 213)
error_code("AuthenticationRestrictionUnmet", 214)
error_code("DatabaseDropPending", 215)
error_code("ElectionInProgress", 216)
+error_code("IncompleteTransactionHistory", 217);
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index b5cfd10b189..cb1d3a13adc 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1329,13 +1329,14 @@ env.Library(
'session_transaction_table.cpp',
'session_txn_state.cpp',
'session_txn_state_holder.cpp',
- 'session_txn_write_history_iterator.cpp',
+ 'transaction_history_iterator.cpp',
env.Idlc('session_txn_record.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/logical_session_id',
'$BUILD_DIR/mongo/db/ops/write_ops',
+ '$BUILD_DIR/mongo/db/repl/oplog_entry',
'$BUILD_DIR/mongo/idl/idl_parser',
],
)
@@ -1351,3 +1352,16 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/repl/replmocks',
],
)
+
+env.CppUnitTest(
+ target='transaction_history_iterator_test',
+ source=[
+ 'transaction_history_iterator_test.cpp',
+ ],
+ LIBDEPS=[
+ 'service_context_d_test_fixture',
+ 'session_transactions',
+ '$BUILD_DIR/mongo/db/repl/replmocks',
+ ],
+)
+
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 62d5b4a23d4..0f5a5731021 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -43,7 +43,7 @@ enums:
kUpdate: "u"
kDelete: "d"
kNoop: "n"
-
+
structs:
OplogEntryBase:
description: A document in which the server stores an oplog entry.
@@ -110,3 +110,9 @@ structs:
# must exist.
description: "Identifier of the transaction statement which generated this oplog
entry"
+ prevTs:
+ cpp_name: prevWriteTsInTransaction
+ type: timestamp
+ optional: true # Only for writes that are part of a transaction
+ description: "The oplog timestamp of the previous write with the same transaction."
+
diff --git a/src/mongo/db/session_txn_state.cpp b/src/mongo/db/session_txn_state.cpp
index e2813f79922..ac4b9726e28 100644
--- a/src/mongo/db/session_txn_state.cpp
+++ b/src/mongo/db/session_txn_state.cpp
@@ -190,8 +190,8 @@ const Timestamp& SessionTxnState::getLastWriteOpTimeTs() const {
return _txnRecord->getLastWriteOpTimeTs();
}
-SessionTxnWriteHistoryIterator SessionTxnState::getWriteHistory(OperationContext* opCtx) const {
- return SessionTxnWriteHistoryIterator(getLastWriteOpTimeTs());
+TransactionHistoryIterator SessionTxnState::getWriteHistory(OperationContext* opCtx) const {
+ return TransactionHistoryIterator(getLastWriteOpTimeTs());
}
} // namespace mongo
diff --git a/src/mongo/db/session_txn_state.h b/src/mongo/db/session_txn_state.h
index 0b3bbe50cba..32dda1d94d6 100644
--- a/src/mongo/db/session_txn_state.h
+++ b/src/mongo/db/session_txn_state.h
@@ -34,7 +34,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/session_txn_record_gen.h"
-#include "mongo/db/session_txn_write_history_iterator.h"
+#include "mongo/db/transaction_history_iterator.h"
namespace mongo {
@@ -65,7 +65,7 @@ public:
/**
* Returns the history of writes that has happened on this transaction.
*/
- SessionTxnWriteHistoryIterator getWriteHistory(OperationContext* opCtx) const;
+ TransactionHistoryIterator getWriteHistory(OperationContext* opCtx) const;
/**
* Stores the result of a single write operation within this transaction.
diff --git a/src/mongo/db/session_txn_write_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp
index 91d8047373f..00acb8ef199 100644
--- a/src/mongo/db/session_txn_write_history_iterator.cpp
+++ b/src/mongo/db/transaction_history_iterator.cpp
@@ -28,22 +28,52 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/session_txn_write_history_iterator.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/logger/redaction.h"
+#include "mongo/util/mongoutils/str.h"
namespace mongo {
-SessionTxnWriteHistoryIterator::SessionTxnWriteHistoryIterator(Timestamp startingOpTimeTs)
+TransactionHistoryIterator::TransactionHistoryIterator(Timestamp startingOpTimeTs)
: _nextOpTimeTs(std::move(startingOpTimeTs)) {}
-bool SessionTxnWriteHistoryIterator::hasNext() const {
+bool TransactionHistoryIterator::hasNext() const {
return !_nextOpTimeTs.isNull();
}
-repl::OplogEntry next(OperationContext* opCtx) {
- // TODO: use DBDirectClient to fetch the oplog entry.
- // assert if !hasNext().
- return repl::OplogEntry(BSONObj());
+repl::OplogEntry TransactionHistoryIterator::next(OperationContext* opCtx) {
+ invariant(hasNext());
+
+ DBDirectClient client(opCtx);
+ // TODO: SERVER-29843 oplogReplay option might be needed to activate fast ts search.
+ auto oplogBSON =
+ client.findOne(repl::rsOplogName,
+ BSON(repl::OplogEntryBase::kTimestampFieldName << _nextOpTimeTs),
+ /* fieldsToReturn */ nullptr,
+ 0 /* QueryOption_OplogReplay */);
+
+ uassert(ErrorCodes::IncompleteTransactionHistory,
+ str::stream() << "oplog no longer contains the complete write history of this "
+ "transaction, log with ts "
+ << _nextOpTimeTs.toBSON()
+ << " cannot be found",
+ !oplogBSON.isEmpty());
+
+ auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON));
+ const auto& oplogPrevTsOption = oplogEntry.getPrevWriteTsInTransaction();
+ uassert(
+ ErrorCodes::FailedToParse,
+ str::stream() << "Missing prevTs field on oplog entry of previous write in transcation: "
+ << redact(oplogBSON),
+ oplogPrevTsOption);
+
+ _nextOpTimeTs = oplogPrevTsOption.value();
+
+ return oplogEntry;
}
} // namespace mongo
diff --git a/src/mongo/db/session_txn_write_history_iterator.h b/src/mongo/db/transaction_history_iterator.h
index 7671dee0093..3b6dee175d1 100644
--- a/src/mongo/db/session_txn_write_history_iterator.h
+++ b/src/mongo/db/transaction_history_iterator.h
@@ -39,12 +39,12 @@ class OperationContext;
* An iterator class that can traverse through the oplog entries that are linked via the prevTs
* field.
*/
-class SessionTxnWriteHistoryIterator {
+class TransactionHistoryIterator {
public:
/**
* Creates a new iterator starting with an oplog entry with the given start opTime.
*/
- SessionTxnWriteHistoryIterator(Timestamp startingOpTimeTs);
+ TransactionHistoryIterator(Timestamp startingOpTimeTs);
/**
* Returns false if there are no more entries to iterate.
@@ -53,7 +53,9 @@ public:
/**
* Returns the next oplog entry.
- * Throws if hasNext is false.
+ * Should not be called if hasNext is false.
+ * Throws if next oplog entry is in a unrecognized format or if it can't find the next oplog
+ * entry.
*/
repl::OplogEntry next(OperationContext* opCtx);
diff --git a/src/mongo/db/transaction_history_iterator_test.cpp b/src/mongo/db/transaction_history_iterator_test.cpp
new file mode 100644
index 00000000000..563a731bab9
--- /dev/null
+++ b/src/mongo/db/transaction_history_iterator_test.cpp
@@ -0,0 +1,213 @@
+/**
+ * Copyright (C) 2017 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/base/init.h"
+#include "mongo/db/client.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+class SessionHistoryIteratorTest : public ServiceContextMongoDTest {
+public:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+
+ _opCtx = cc().makeOperationContext();
+
+ // Insert code path assumes existence of repl coordinator!
+ repl::ReplSettings replSettings;
+ replSettings.setReplSetString(
+ ConnectionString::forReplicaSet("sessionTxnStateTest", {HostAndPort("a:1")})
+ .toString());
+ replSettings.setMaster(true);
+
+ auto service = getServiceContext();
+ repl::ReplicationCoordinator::set(
+ service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service, replSettings));
+
+ // Note: internal code does not allow implicit creation of non-capped oplog collection.
+ DBDirectClient client(opCtx());
+ ASSERT_TRUE(client.createCollection(repl::rsOplogName, 1024 * 1024, true));
+ }
+
+ void tearDown() override {
+ // ServiceContextMongoDTest::tearDown() will try to create it's own opCtx, and it's not
+ // allowed to have 2 present per client, so destroy this one.
+ _opCtx.reset();
+
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ /**
+ * Helper method for inserting new entries to the oplog. This completely bypasses
+ * fixDocumentForInsert.
+ */
+ void insertOplogEntry(const repl::OplogEntry newOplog) {
+ AutoGetCollection autoColl(opCtx(), NamespaceString(repl::rsOplogName), MODE_IX);
+ auto coll = autoColl.getCollection();
+ ASSERT_TRUE(coll != nullptr);
+
+ auto status = coll->insertDocument(opCtx(),
+ newOplog.toBSON(),
+ &CurOp::get(opCtx())->debug(),
+ /* enforceQuota */ false,
+ /* fromMigrate */ false);
+ ASSERT_OK(status);
+ }
+
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
+private:
+ ServiceContext::UniqueOperationContext _opCtx;
+};
+
+TEST_F(SessionHistoryIteratorTest, NormalHistory) {
+ repl::OplogEntry entry1(repl::OpTime(Timestamp(52, 345), 2),
+ 0,
+ repl::OpTypeEnum::kInsert,
+ NamespaceString("a.b"),
+ BSON("x" << 30));
+ entry1.setPrevWriteTsInTransaction(Timestamp(0, 0));
+ insertOplogEntry(entry1);
+
+ repl::OplogEntry entry2(repl::OpTime(Timestamp(67, 54801), 2),
+ 0,
+ repl::OpTypeEnum::kInsert,
+ NamespaceString("a.b"),
+ BSON("y" << 50));
+ entry2.setPrevWriteTsInTransaction(Timestamp(52, 345));
+ insertOplogEntry(entry2);
+
+ // Insert an unrelated entry in between
+ repl::OplogEntry entry3(repl::OpTime(Timestamp(83, 2), 2),
+ 0,
+ repl::OpTypeEnum::kInsert,
+ NamespaceString("a.b"),
+ BSON("z" << 40));
+ entry3.setPrevWriteTsInTransaction(Timestamp(22, 67));
+ insertOplogEntry(entry3);
+
+ repl::OplogEntry entry4(repl::OpTime(Timestamp(97, 2472), 2),
+ 0,
+ repl::OpTypeEnum::kInsert,
+ NamespaceString("a.b"),
+ BSON("a" << 3));
+ entry4.setPrevWriteTsInTransaction(Timestamp(67, 54801));
+ insertOplogEntry(entry4);
+
+ TransactionHistoryIterator iter(Timestamp(97, 2472));
+
+ {
+ ASSERT_TRUE(iter.hasNext());
+ auto nextEntry = iter.next(opCtx());
+ ASSERT_EQ(Timestamp(97, 2472), nextEntry.getTimestamp());
+ ASSERT_BSONOBJ_EQ(BSON("a" << 3), nextEntry.getObject());
+ }
+
+ {
+ ASSERT_TRUE(iter.hasNext());
+ auto nextEntry = iter.next(opCtx());
+ ASSERT_EQ(Timestamp(67, 54801), nextEntry.getTimestamp());
+ ASSERT_BSONOBJ_EQ(BSON("y" << 50), nextEntry.getObject());
+ }
+
+ {
+ ASSERT_TRUE(iter.hasNext());
+ auto nextEntry = iter.next(opCtx());
+ ASSERT_EQ(Timestamp(52, 345), nextEntry.getTimestamp());
+ ASSERT_BSONOBJ_EQ(BSON("x" << 30), nextEntry.getObject());
+ }
+
+ ASSERT_FALSE(iter.hasNext());
+}
+
+TEST_F(SessionHistoryIteratorTest, StartAtZeroTSShouldNotBeAbleToIterate) {
+ repl::OplogEntry entry(repl::OpTime(Timestamp(67, 54801), 2),
+ 0,
+ repl::OpTypeEnum::kInsert,
+ NamespaceString("a.b"),
+ BSON("y" << 50));
+ entry.setPrevWriteTsInTransaction(Timestamp(52, 345));
+ insertOplogEntry(entry);
+
+ TransactionHistoryIterator iter(Timestamp(0, 0));
+ ASSERT_FALSE(iter.hasNext());
+}
+
+TEST_F(SessionHistoryIteratorTest, NextShouldAssertIfHistoryIsTruncated) {
+ repl::OplogEntry entry(repl::OpTime(Timestamp(67, 54801), 2),
+ 0,
+ repl::OpTypeEnum::kInsert,
+ NamespaceString("a.b"),
+ BSON("y" << 50));
+ entry.setPrevWriteTsInTransaction(Timestamp(52, 345));
+ insertOplogEntry(entry);
+
+ TransactionHistoryIterator iter(Timestamp(67, 54801));
+ ASSERT_TRUE(iter.hasNext());
+
+ auto nextEntry = iter.next(opCtx());
+ ASSERT_EQ(Timestamp(67, 54801), nextEntry.getTimestamp());
+ ASSERT_BSONOBJ_EQ(BSON("y" << 50), nextEntry.getObject());
+
+ ASSERT_TRUE(iter.hasNext());
+ ASSERT_THROWS_CODE(iter.next(opCtx()), UserException, ErrorCodes::IncompleteTransactionHistory);
+}
+
+TEST_F(SessionHistoryIteratorTest, OplogInWriteHistoryChainWithMissingPrevTSShouldAssert) {
+ repl::OplogEntry entry(repl::OpTime(Timestamp(67, 54801), 2),
+ 0,
+ repl::OpTypeEnum::kInsert,
+ NamespaceString("a.b"),
+ BSON("y" << 50));
+ insertOplogEntry(entry);
+
+ TransactionHistoryIterator iter(Timestamp(67, 54801));
+ ASSERT_TRUE(iter.hasNext());
+ ASSERT_THROWS_CODE(iter.next(opCtx()), UserException, ErrorCodes::FailedToParse);
+}
+
+} // namespace mongo