diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2017-07-17 16:54:00 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2017-07-24 11:22:04 -0400 |
commit | 3bc2d6c1c5782ab8456b40852930aef3e7e7f802 (patch) | |
tree | 36abd7bcbdb41e446b9a8baf3b7c650d51544876 /src/mongo/db/ops | |
parent | ffe425ee16d9c597732350dfe6de73b2fd9305d0 (diff) | |
download | mongo-3bc2d6c1c5782ab8456b40852930aef3e7e7f802.tar.gz |
SERVER-28918 Make CRUD paths retryable
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r-- | src/mongo/db/ops/SConscript | 10 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.cpp | 75 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.h | 47 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability_test.cpp | 114 |
5 files changed, 283 insertions, 9 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index d83ace7958c..fb9fb665355 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -205,3 +205,13 @@ env.CppIntegrationTest( '$BUILD_DIR/mongo/util/version_impl', ], ) + +env.CppUnitTest( + target='write_ops_retryability_test', + source='write_ops_retryability_test.cpp', + LIBDEPS=[ + '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + '$BUILD_DIR/mongo/db/write_ops', + ], +) diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index f8548e2a18d..c36f2126fc1 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -54,6 +54,7 @@ #include "mongo/db/ops/update_request.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_knobs.h" @@ -61,6 +62,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/db/write_concern.h" @@ -450,6 +452,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who const size_t maxBatchSize = internalInsertMaxBatchSize.load(); batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize)); + auto session = OperationContextSession::get(opCtx); + for (auto&& doc : wholeOp.getDocuments()) { const bool isLastDoc = (&doc == &wholeOp.getDocuments().back()); auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc); @@ -458,8 +462,16 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who // correct order. In an ordered insert, if one of the docs ahead of us fails, we should // behave as-if we never got to this document. } else { + auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + if (session) { + if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) { + out.results.emplace_back(parseOplogEntryForInsert(*entry)); + continue; + } + } + BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()); - batch.emplace_back(getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), toInsert); + batch.emplace_back(stmtId, toInsert); bytesInBatch += batch.back().doc.objsize(); if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes) continue; // Add more to batch before inserting. @@ -590,7 +602,18 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who size_t stmtIdIndex = 0; WriteResult out; out.results.reserve(wholeOp.getUpdates().size()); + + auto session = OperationContextSession::get(opCtx); + for (auto&& singleOp : wholeOp.getUpdates()) { + auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex); + if (session) { + if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) { + out.results.emplace_back(parseOplogEntryForUpdate(*entry)); + continue; + } + } + // TODO: don't create nested CurOp for legacy writes. // Add Command pointer to the nested CurOp. auto& parentCurOp = *CurOp::get(opCtx); @@ -604,10 +627,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who try { lastOpFixer.startingOp(); out.results.emplace_back( - performSingleUpdateOp(opCtx, - wholeOp.getNamespace(), - getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), - singleOp)); + performSingleUpdateOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = @@ -706,7 +726,18 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who size_t stmtIdIndex = 0; WriteResult out; out.results.reserve(wholeOp.getDeletes().size()); + + auto session = OperationContextSession::get(opCtx); + for (auto&& singleOp : wholeOp.getDeletes()) { + auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + if (session) { + if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) { + out.results.emplace_back(parseOplogEntryForDelete(*entry)); + continue; + } + } + // TODO: don't create nested CurOp for legacy writes. // Add Command pointer to the nested CurOp. auto& parentCurOp = *CurOp::get(opCtx); @@ -720,10 +751,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who try { lastOpFixer.startingOp(); out.results.emplace_back( - performSingleDeleteOp(opCtx, - wholeOp.getNamespace(), - getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), - singleOp)); + performSingleDeleteOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp new file mode 100644 index 00000000000..04c92c81213 --- /dev/null +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/ops/single_write_result_gen.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_retryability.h" + +namespace mongo { + +SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry) { + invariant(entry.getOpType() == repl::OpTypeEnum::kInsert); + + SingleWriteResult res; + res.setN(1); + res.setNModified(0); + return res; +} + +SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry) { + SingleWriteResult res; + // Upserts are stored as inserts. + if (entry.getOpType() == repl::OpTypeEnum::kInsert) { + res.setN(1); + res.setNModified(0); + + BSONObjBuilder upserted; + upserted.append(entry.getObject()["_id"]); + res.setUpsertedId(upserted.obj()); + } else if (entry.getOpType() == repl::OpTypeEnum::kUpdate) { + res.setN(1); + res.setNModified(1); + } else { + MONGO_UNREACHABLE; + } + return res; +} + +SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry) { + invariant(entry.getOpType() == repl::OpTypeEnum::kDelete); + + SingleWriteResult res; + res.setN(1); + res.setNModified(0); + return res; +} + +} // namespace mongo diff --git a/src/mongo/db/ops/write_ops_retryability.h b/src/mongo/db/ops/write_ops_retryability.h new file mode 100644 index 00000000000..f3de5c353fa --- /dev/null +++ b/src/mongo/db/ops/write_ops_retryability.h @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/ops/single_write_result_gen.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/repl/oplog_entry.h" + +namespace mongo { + +/** + * Returns the single write result corresponding to the given oplog entry for insert, update, and + * delete commands, i.e. the single write result that would have been returned by the statement that + * would have resulted in the given oplog entry. The oplog entries are assumed to be properly + * formed and have the correct op type. + */ +SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry); +SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry); +SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry); + +} // namespace mongo diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp new file mode 100644 index 00000000000..6e6d30fd491 --- /dev/null +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using WriteOpsRetryability = ServiceContextMongoDTest; + +TEST_F(WriteOpsRetryability, ParseOplogEntryForInsert) { + auto entry = + repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op" + << "i" + << "ns" + << "a.b" + << "o" + << BSON("_id" << 1 << "x" << 5))); + ASSERT(entry.isOK()); + + auto res = mongo::parseOplogEntryForInsert(entry.getValue()); + + ASSERT_EQ(res.getN(), 1); + ASSERT_EQ(res.getNModified(), 0); + ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj()); +} + +TEST_F(WriteOpsRetryability, ParseOplogEntryForUpdate) { + auto entry = + repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op" + << "u" + << "ns" + << "a.b" + << "o" + << BSON("_id" << 1 << "x" << 5) + << "o2" + << BSON("_id" << 1))); + ASSERT(entry.isOK()); + + auto res = mongo::parseOplogEntryForUpdate(entry.getValue()); + + ASSERT_EQ(res.getN(), 1); + ASSERT_EQ(res.getNModified(), 1); + ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj()); +} + +TEST_F(WriteOpsRetryability, ParseOplogEntryForUpsert) { + auto entry = + repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op" + << "i" + << "ns" + << "a.b" + << "o" + << BSON("_id" << 1 << "x" << 5))); + ASSERT(entry.isOK()); + + auto res = mongo::parseOplogEntryForUpdate(entry.getValue()); + + ASSERT_EQ(res.getN(), 1); + ASSERT_EQ(res.getNModified(), 0); + ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSON("_id" << 1)); +} + +TEST_F(WriteOpsRetryability, ParseOplogEntryForDelete) { + auto entry = + repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op" + << "d" + << "ns" + << "a.b" + << "o" + << BSON("_id" << 1 << "x" << 5))); + ASSERT(entry.isOK()); + + auto res = mongo::parseOplogEntryForDelete(entry.getValue()); + + ASSERT_EQ(res.getN(), 1); + ASSERT_EQ(res.getNModified(), 0); + ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj()); +} + +} // namespace +} // namespace mongo |