From 7a6151bd1f56c1fab96457f2e60206f106608508 Mon Sep 17 00:00:00 2001 From: Jack Mulrow Date: Mon, 17 Jul 2017 16:54:00 -0400 Subject: SERVER-28918 Make CRUD paths retryable --- src/mongo/db/SConscript | 1 + src/mongo/db/ops/SConscript | 10 ++ src/mongo/db/ops/write_ops_exec.cpp | 46 +++++++-- src/mongo/db/ops/write_ops_retryability.cpp | 75 +++++++++++++++ src/mongo/db/ops/write_ops_retryability.h | 47 ++++++++++ src/mongo/db/ops/write_ops_retryability_test.cpp | 114 +++++++++++++++++++++++ src/mongo/db/session.cpp | 17 ++++ src/mongo/db/session.h | 6 ++ src/mongo/db/session_catalog.cpp | 4 + src/mongo/db/session_test.cpp | 72 ++++++++++++++ 10 files changed, 383 insertions(+), 9 deletions(-) create mode 100644 src/mongo/db/ops/write_ops_retryability.cpp create mode 100644 src/mongo/db/ops/write_ops_retryability.h create mode 100644 src/mongo/db/ops/write_ops_retryability_test.cpp diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index ce27aa72350..a8c9d2d5f8d 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1425,6 +1425,7 @@ env.Library( 'ops/update_lifecycle_impl.cpp', 'ops/update_result.cpp', 'ops/write_ops_exec.cpp', + 'ops/write_ops_retryability.cpp', 'session.cpp', 'session_catalog.cpp', 'transaction_history_iterator.cpp', diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index d83ace7958c..fb9fb665355 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -205,3 +205,13 @@ env.CppIntegrationTest( '$BUILD_DIR/mongo/util/version_impl', ], ) + +env.CppUnitTest( + target='write_ops_retryability_test', + source='write_ops_retryability_test.cpp', + LIBDEPS=[ + '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + '$BUILD_DIR/mongo/db/write_ops', + ], +) diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index f8548e2a18d..08feb52415f 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ 
b/src/mongo/db/ops/write_ops_exec.cpp @@ -54,6 +54,7 @@ #include "mongo/db/ops/update_request.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_knobs.h" @@ -61,6 +62,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/db/write_concern.h" @@ -450,6 +452,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who const size_t maxBatchSize = internalInsertMaxBatchSize.load(); batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize)); + auto session = OperationContextSession::get(opCtx); + for (auto&& doc : wholeOp.getDocuments()) { const bool isLastDoc = (&doc == &wholeOp.getDocuments().back()); auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc); @@ -458,8 +462,16 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who // correct order. In an ordered insert, if one of the docs ahead of us fails, we should // behave as-if we never got to this document. } else { + auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + if (session) { + if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) { + out.results.emplace_back(parseOplogEntryForInsert(*entry)); + continue; + } + } + BSONObj toInsert = fixedDoc.getValue().isEmpty() ? 
doc : std::move(fixedDoc.getValue()); - batch.emplace_back(getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), toInsert); + batch.emplace_back(stmtId, toInsert); bytesInBatch += batch.back().doc.objsize(); if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes) continue; // Add more to batch before inserting. @@ -590,7 +602,18 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who size_t stmtIdIndex = 0; WriteResult out; out.results.reserve(wholeOp.getUpdates().size()); + + auto session = OperationContextSession::get(opCtx); + for (auto&& singleOp : wholeOp.getUpdates()) { + auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + if (session) { + if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) { + out.results.emplace_back(parseOplogEntryForUpdate(*entry)); + continue; + } + } + // TODO: don't create nested CurOp for legacy writes. // Add Command pointer to the nested CurOp. auto& parentCurOp = *CurOp::get(opCtx); @@ -604,10 +627,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who try { lastOpFixer.startingOp(); out.results.emplace_back( - performSingleUpdateOp(opCtx, - wholeOp.getNamespace(), - getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), - singleOp)); + performSingleUpdateOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = @@ -706,7 +726,18 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who size_t stmtIdIndex = 0; WriteResult out; out.results.reserve(wholeOp.getDeletes().size()); + + auto session = OperationContextSession::get(opCtx); + for (auto&& singleOp : wholeOp.getDeletes()) { + auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + if (session) { + if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) { + out.results.emplace_back(parseOplogEntryForDelete(*entry)); + continue; + 
} + } + // TODO: don't create nested CurOp for legacy writes. // Add Command pointer to the nested CurOp. auto& parentCurOp = *CurOp::get(opCtx); @@ -720,10 +751,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who try { lastOpFixer.startingOp(); out.results.emplace_back( - performSingleDeleteOp(opCtx, - wholeOp.getNamespace(), - getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), - singleOp)); + performSingleDeleteOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp new file mode 100644 index 00000000000..04c92c81213 --- /dev/null +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/ops/single_write_result_gen.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_retryability.h"
+
+namespace mongo {
+
+// Rebuilds the result of an insert statement; 'entry' must be an insert oplog entry.
+SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry) {
+    invariant(entry.getOpType() == repl::OpTypeEnum::kInsert);
+    SingleWriteResult insertResult;
+    insertResult.setNModified(0);
+    insertResult.setN(1);
+    return insertResult;
+}
+
+// Rebuilds the result of an update statement. An insert entry is reported as an upsert
+// (n = 1, nModified = 0, plus the upserted id); an update entry as n = 1, nModified = 1.
+SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry) {
+    const bool isUpsert = entry.getOpType() == repl::OpTypeEnum::kInsert;
+    if (!isUpsert && entry.getOpType() != repl::OpTypeEnum::kUpdate) {
+        MONGO_UNREACHABLE;
+    }
+    SingleWriteResult updateResult;
+    updateResult.setN(1);
+    updateResult.setNModified(isUpsert ? 0 : 1);
+    if (isUpsert) {
+        // Upserts are stored as inserts, so the upserted id is the inserted document's _id.
+        BSONObjBuilder upsertedId;
+        upsertedId.append(entry.getObject()["_id"]);
+        updateResult.setUpsertedId(upsertedId.obj());
+    }
+    return updateResult;
+}
+
+// Rebuilds the result of a delete statement; 'entry' must be a delete oplog entry.
+SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry) {
+    invariant(entry.getOpType() == repl::OpTypeEnum::kDelete);
+    SingleWriteResult deleteResult;
+    deleteResult.setNModified(0);
+    deleteResult.setN(1);
+    return deleteResult;
+}
+
+}  // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_retryability.h b/src/mongo/db/ops/write_ops_retryability.h
new file mode 100644
index 00000000000..f3de5c353fa
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_retryability.h
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/ops/single_write_result_gen.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/repl/oplog_entry.h"
+
+namespace mongo {
+
+/**
+ * Each function rebuilds the SingleWriteResult that the insert, update or delete statement
+ * responsible for the given oplog entry would have returned when it first ran. Callers must pass
+ * a well-formed entry whose op type matches the function they call; for updates this also
+ * includes insert entries, which parseOplogEntryForUpdate reports as upserts.
+ */
+SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry);
+SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry);
+SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry);
+
+}  // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
new file mode 100644
index 00000000000..6e6d30fd491
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using WriteOpsRetryability = ServiceContextMongoDTest;
+
+TEST_F(WriteOpsRetryability, ParseOplogEntryForInsert) {
+    auto entry =
+        repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op"
+                                          << "i"
+                                          << "ns"
+                                          << "a.b"
+                                          << "o"
+                                          << BSON("_id" << 1 << "x" << 5)));
+    ASSERT(entry.isOK());
+
+    auto res = mongo::parseOplogEntryForInsert(entry.getValue());
+
+    ASSERT_EQ(res.getN(), 1);
+    ASSERT_EQ(res.getNModified(), 0);
+    ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj());
+}
+
+TEST_F(WriteOpsRetryability, ParseOplogEntryForUpdate) {
+    auto entry =
+        repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op"
+                                          << "u"
+                                          << "ns"
+                                          << "a.b"
+                                          << "o"
+                                          << BSON("_id" << 1 << "x" << 5)
+                                          << "o2"
+                                          << BSON("_id" << 1)));
+    ASSERT(entry.isOK());
+
+    auto res = mongo::parseOplogEntryForUpdate(entry.getValue());
+
+    ASSERT_EQ(res.getN(), 1);
+    ASSERT_EQ(res.getNModified(), 1);
+    ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj());
+}
+
+TEST_F(WriteOpsRetryability, ParseOplogEntryForUpsert) {
+    auto entry =
+        repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op"
+                                          << "i"
+                                          << "ns"
+                                          << "a.b"
+                                          << "o"
+                                          << BSON("_id" << 1 << "x" << 5)));
+    ASSERT(entry.isOK());
+    // An insert entry handed to the update parser must be reported as an upsert of its _id.
+    auto res = mongo::parseOplogEntryForUpdate(entry.getValue());
+
+    ASSERT_EQ(res.getN(), 1);
+    ASSERT_EQ(res.getNModified(), 0);
+    ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSON("_id" << 1));
+}
+
+TEST_F(WriteOpsRetryability, ParseOplogEntryForDelete) {
+    auto entry =
+        repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op"
+                                          << "d"
+                                          << "ns"
+                                          << "a.b"
+                                          << "o"
+                                          << BSON("_id" << 1 << "x" << 5)));
+    ASSERT(entry.isOK());
+
+    auto res = mongo::parseOplogEntryForDelete(entry.getValue());
+
+    ASSERT_EQ(res.getN(), 1);
+    ASSERT_EQ(res.getNModified(), 0);
+    ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj());
+}
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 844f80f81d1..0811996eb8f 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -179,4 +179,21 @@ TransactionHistoryIterator Session::getWriteHistory(OperationContext* opCtx) con
     return TransactionHistoryIterator(getLastWriteOpTimeTs());
 }
 
+boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx,
+                                                                  StmtId stmtId) {
+    if (!opCtx->getTxnNumber()) {
+        return boost::none;
+    }
+    // Scan the history returned by getWriteHistory() for an entry tagged with 'stmtId'.
+    auto it = getWriteHistory(opCtx);
+    while (it.hasNext()) {
+        auto entry = it.next(opCtx);
+        if (entry.getStatementId() == stmtId) {
+            return entry;
+        }
+    }
+    // No entry in the history carries 'stmtId'.
+    return boost::none;
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 6103b7fc769..f21309ffa8e 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -88,6 +88,12 @@ public:
      */
     const Timestamp& getLastWriteOpTimeTs() const;
 
+    /**
+     * Returns the oplog entry with the given statementId, if it exists.
+     */
+    boost::optional<repl::OplogEntry> checkStatementExecuted(OperationContext* opCtx,
+                                                             StmtId stmtId);
+
 private:
     const LogicalSessionId _sessionId;
 
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 37dd16cccee..e10c643ce11 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -162,6 +162,10 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx) : _opC
 
     auto& operationSession = operationSessionDecoration(opCtx);
     operationSession.emplace(sessionTransactionTable->checkOutSession(opCtx));
+
+    if (opCtx->getTxnNumber()) {
+        operationSession->get()->begin(opCtx, opCtx->getTxnNumber().get());
+    }
 }
 
 OperationContextSession::~OperationContextSession() {
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index 924c2906bd4..70bbb1b40f9 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -32,9 +32,12 @@
 
 #include "mongo/base/init.h"
 #include "mongo/db/client.h"
+#include "mongo/db/curop.h"
 #include "mongo/db/db_raii.h"
 #include "mongo/db/dbdirectclient.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/db/repl/replication_coordinator_mock.h"
 #include "mongo/db/service_context.h"
@@ -66,6 +69,11 @@ public:
 
         _txnTable = stdx::make_unique(nullptr);
         _txnTable->onStepUp(_opCtx.get());
+
+        // Note: internal code does not allow implicit creation of non-capped oplog collection.
+        DBDirectClient client(opCtx());
+        ASSERT_TRUE(
+            client.createCollection(NamespaceString::kRsOplogNamespace.ns(), 1024 * 1024, true));
     }
 
     void tearDown() override {
@@ -76,6 +84,23 @@ public:
         ServiceContextMongoDTest::tearDown();
     }
 
+    /**
+     * Helper method for inserting new entries to the oplog. This completely bypasses
+     * fixDocumentForInsert.
+     */
+    void insertOplogEntry(BSONObj entry) {
+        AutoGetCollection autoColl(opCtx(), NamespaceString::kRsOplogNamespace, MODE_IX);
+        auto coll = autoColl.getCollection();
+        ASSERT_TRUE(coll != nullptr);
+
+        auto status = coll->insertDocument(opCtx(),
+                                           InsertStatement(entry),
+                                           &CurOp::get(opCtx())->debug(),
+                                           /* enforceQuota */ false,
+                                           /* fromMigrate */ false);
+        ASSERT_OK(status);
+    }
+
     OperationContext* opCtx() {
         return _opCtx.get();
     }
@@ -472,5 +497,52 @@ TEST_F(SessionTest, TwoSessionsShouldBeIndependent) {
     ASSERT_FALSE(cursor->more());
 }
 
+TEST_F(SessionTest, CheckStatementExecuted) {
+    const auto sessionId = LogicalSessionId::gen();
+    const TxnNumber txnNum = 20;
+    const StmtId stmtId = 5;
+
+    opCtx()->setLogicalSessionId(sessionId);
+    opCtx()->setTxnNumber(txnNum);
+
+    Session session(sessionId);
+    session.begin(opCtx(), txnNum);
+
+    // Returns nothing if the statement has not been executed.
+    auto fetchedEntry = session.checkStatementExecuted(opCtx(), stmtId);
+    ASSERT_FALSE(fetchedEntry);
+
+    // Returns the correct oplog entry if the statement has completed.
+    auto optimeTs = Timestamp(50, 10);
+    insertOplogEntry(BSON("ts" << optimeTs << "t" << 1LL << "h" << 0LL << "op"
+                               << "i"
+                               << "ns"
+                               << "a.b"
+                               << "o"
+                               << BSON("_id" << 1 << "x" << 5)
+                               << "txnNumber"
+                               << txnNum
+                               << "stmtId"
+                               << stmtId
+                               << "prevTs"
+                               << Timestamp(0, 0)));
+    {
+        AutoGetCollection autoColl(opCtx(), NamespaceString("a.b"), MODE_IX);
+        WriteUnitOfWork wuow(opCtx());
+
+        session.saveTxnProgress(opCtx(), optimeTs);
+        wuow.commit();
+    }
+
+    fetchedEntry = session.checkStatementExecuted(opCtx(), stmtId);
+    ASSERT_TRUE(fetchedEntry);
+    ASSERT_EQ(fetchedEntry->getStatementId().get(), stmtId);
+
+    // Still returns nothing for uncompleted statements.
+    const StmtId uncompletedStmtId = 10;
+    fetchedEntry = session.checkStatementExecuted(opCtx(), uncompletedStmtId);
+    ASSERT_FALSE(fetchedEntry);
+}
+
 }  // namespace
 }  // namespace mongo
-- 
cgit v1.2.1