summaryrefslogtreecommitdiff
path: root/src/mongo/db/ops
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2017-07-17 16:54:00 -0400
committerJack Mulrow <jack.mulrow@mongodb.com>2017-07-24 11:22:04 -0400
commit3bc2d6c1c5782ab8456b40852930aef3e7e7f802 (patch)
tree36abd7bcbdb41e446b9a8baf3b7c650d51544876 /src/mongo/db/ops
parentffe425ee16d9c597732350dfe6de73b2fd9305d0 (diff)
downloadmongo-3bc2d6c1c5782ab8456b40852930aef3e7e7f802.tar.gz
SERVER-28918 Make CRUD paths retryable
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r--src/mongo/db/ops/SConscript10
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp46
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp75
-rw-r--r--src/mongo/db/ops/write_ops_retryability.h47
-rw-r--r--src/mongo/db/ops/write_ops_retryability_test.cpp114
5 files changed, 283 insertions, 9 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index d83ace7958c..fb9fb665355 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -205,3 +205,13 @@ env.CppIntegrationTest(
'$BUILD_DIR/mongo/util/version_impl',
],
)
+
+env.CppUnitTest(
+ target='write_ops_retryability_test',
+ source='write_ops_retryability_test.cpp',
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
+ '$BUILD_DIR/mongo/db/write_ops',
+ ],
+)
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index f8548e2a18d..c36f2126fc1 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/ops/update_request.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_knobs.h"
@@ -61,6 +62,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/write_concern.h"
@@ -450,6 +452,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
const size_t maxBatchSize = internalInsertMaxBatchSize.load();
batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize));
+ auto session = OperationContextSession::get(opCtx);
+
for (auto&& doc : wholeOp.getDocuments()) {
const bool isLastDoc = (&doc == &wholeOp.getDocuments().back());
auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc);
@@ -458,8 +462,16 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
// correct order. In an ordered insert, if one of the docs ahead of us fails, we should
// behave as-if we never got to this document.
} else {
+ auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
+ if (session) {
+ if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) {
+ out.results.emplace_back(parseOplogEntryForInsert(*entry));
+ continue;
+ }
+ }
+
BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());
- batch.emplace_back(getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), toInsert);
+ batch.emplace_back(stmtId, toInsert);
bytesInBatch += batch.back().doc.objsize();
if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes)
continue; // Add more to batch before inserting.
@@ -590,7 +602,18 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
size_t stmtIdIndex = 0;
WriteResult out;
out.results.reserve(wholeOp.getUpdates().size());
+
+ auto session = OperationContextSession::get(opCtx);
+
for (auto&& singleOp : wholeOp.getUpdates()) {
+ auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex);
+ if (session) {
+ if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) {
+ out.results.emplace_back(parseOplogEntryForUpdate(*entry));
+ continue;
+ }
+ }
+
// TODO: don't create nested CurOp for legacy writes.
// Add Command pointer to the nested CurOp.
auto& parentCurOp = *CurOp::get(opCtx);
@@ -604,10 +627,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
try {
lastOpFixer.startingOp();
out.results.emplace_back(
- performSingleUpdateOp(opCtx,
- wholeOp.getNamespace(),
- getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++),
- singleOp));
+ performSingleUpdateOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp));
lastOpFixer.finishedOpSuccessfully();
} catch (const DBException& ex) {
const bool canContinue =
@@ -706,7 +726,18 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
size_t stmtIdIndex = 0;
WriteResult out;
out.results.reserve(wholeOp.getDeletes().size());
+
+ auto session = OperationContextSession::get(opCtx);
+
for (auto&& singleOp : wholeOp.getDeletes()) {
+ auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
+ if (session) {
+ if (auto entry = session->checkStatementExecuted(opCtx, stmtId)) {
+ out.results.emplace_back(parseOplogEntryForDelete(*entry));
+ continue;
+ }
+ }
+
// TODO: don't create nested CurOp for legacy writes.
// Add Command pointer to the nested CurOp.
auto& parentCurOp = *CurOp::get(opCtx);
@@ -720,10 +751,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
try {
lastOpFixer.startingOp();
out.results.emplace_back(
- performSingleDeleteOp(opCtx,
- wholeOp.getNamespace(),
- getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++),
- singleOp));
+ performSingleDeleteOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp));
lastOpFixer.finishedOpSuccessfully();
} catch (const DBException& ex) {
const bool canContinue =
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
new file mode 100644
index 00000000000..04c92c81213
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/ops/single_write_result_gen.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_retryability.h"
+
+namespace mongo {
+
+SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry) {
+ invariant(entry.getOpType() == repl::OpTypeEnum::kInsert);
+
+ SingleWriteResult res;
+ res.setN(1);
+ res.setNModified(0);
+ return res;
+}
+
+SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry) {
+ SingleWriteResult res;
+ // Upserts are stored as inserts.
+ if (entry.getOpType() == repl::OpTypeEnum::kInsert) {
+ res.setN(1);
+ res.setNModified(0);
+
+ BSONObjBuilder upserted;
+ upserted.append(entry.getObject()["_id"]);
+ res.setUpsertedId(upserted.obj());
+ } else if (entry.getOpType() == repl::OpTypeEnum::kUpdate) {
+ res.setN(1);
+ res.setNModified(1);
+ } else {
+ MONGO_UNREACHABLE;
+ }
+ return res;
+}
+
+SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry) {
+ invariant(entry.getOpType() == repl::OpTypeEnum::kDelete);
+
+ SingleWriteResult res;
+ res.setN(1);
+ res.setNModified(0);
+ return res;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_retryability.h b/src/mongo/db/ops/write_ops_retryability.h
new file mode 100644
index 00000000000..f3de5c353fa
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_retryability.h
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/ops/single_write_result_gen.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/repl/oplog_entry.h"
+
+namespace mongo {
+
+/**
+ * Returns the single write result corresponding to the given oplog entry for insert, update, and
+ * delete commands, i.e. the single write result that would have been returned by the statement that
+ * would have resulted in the given oplog entry. The oplog entries are assumed to be properly
+ * formed and have the correct op type.
+ */
+SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry);
+SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry);
+SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry);
+
+} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
new file mode 100644
index 00000000000..6e6d30fd491
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using WriteOpsRetryability = ServiceContextMongoDTest;
+
+TEST_F(WriteOpsRetryability, ParseOplogEntryForInsert) {
+ auto entry =
+ repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op"
+ << "i"
+ << "ns"
+ << "a.b"
+ << "o"
+ << BSON("_id" << 1 << "x" << 5)));
+ ASSERT(entry.isOK());
+
+ auto res = mongo::parseOplogEntryForInsert(entry.getValue());
+
+ ASSERT_EQ(res.getN(), 1);
+ ASSERT_EQ(res.getNModified(), 0);
+ ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj());
+}
+
+TEST_F(WriteOpsRetryability, ParseOplogEntryForUpdate) {
+ auto entry =
+ repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op"
+ << "u"
+ << "ns"
+ << "a.b"
+ << "o"
+ << BSON("_id" << 1 << "x" << 5)
+ << "o2"
+ << BSON("_id" << 1)));
+ ASSERT(entry.isOK());
+
+ auto res = mongo::parseOplogEntryForUpdate(entry.getValue());
+
+ ASSERT_EQ(res.getN(), 1);
+ ASSERT_EQ(res.getNModified(), 1);
+ ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj());
+}
+
+TEST_F(WriteOpsRetryability, ParseOplogEntryForUpsert) {
+ auto entry =
+ repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op"
+ << "i"
+ << "ns"
+ << "a.b"
+ << "o"
+ << BSON("_id" << 1 << "x" << 5)));
+ ASSERT(entry.isOK());
+
+ auto res = mongo::parseOplogEntryForUpdate(entry.getValue());
+
+ ASSERT_EQ(res.getN(), 1);
+ ASSERT_EQ(res.getNModified(), 0);
+ ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSON("_id" << 1));
+}
+
+TEST_F(WriteOpsRetryability, ParseOplogEntryForDelete) {
+ auto entry =
+ repl::OplogEntry::parse(BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "h" << 0LL << "op"
+ << "d"
+ << "ns"
+ << "a.b"
+ << "o"
+ << BSON("_id" << 1 << "x" << 5)));
+ ASSERT(entry.isOK());
+
+ auto res = mongo::parseOplogEntryForDelete(entry.getValue());
+
+ ASSERT_EQ(res.getN(), 1);
+ ASSERT_EQ(res.getNModified(), 0);
+ ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj());
+}
+
+} // namespace
+} // namespace mongo