summaryrefslogtreecommitdiff
path: root/src/mongo/db/ops
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-08-03 18:51:27 -0400
committerRandolph Tan <randolph@10gen.com>2017-08-17 16:58:40 -0400
commit86f8af9d40b294852399097daf80894e28c20adc (patch)
tree7062c4c7ce754ed1874b8a36e51b103a7f43a8cb /src/mongo/db/ops
parent1e11cda15ddae9972f9993a7d6b6cbf9d172bcb3 (diff)
downloadmongo-86f8af9d40b294852399097daf80894e28c20adc.tar.gz
SERVER-30407 Add retry logic in findAndModify
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r--src/mongo/db/ops/SConscript1
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp152
-rw-r--r--src/mongo/db/ops/write_ops_retryability.h11
-rw-r--r--src/mongo/db/ops/write_ops_retryability_test.cpp273
4 files changed, 437 insertions, 0 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index 6cd60f6c9e1..ec3fd359719 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -189,6 +189,7 @@ env.CppUnitTest(
source='write_ops_retryability_test.cpp',
LIBDEPS=[
'$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
'$BUILD_DIR/mongo/db/write_ops',
],
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 04c92c81213..706a73bc655 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -28,13 +28,123 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/ops/single_write_result_gen.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/query/find_and_modify_request.h"
+#include "mongo/logger/redaction.h"
namespace mongo {
+namespace {
+
+/**
+ * Validates that the request is retry-compatible with the operation that occurred.
+ */
+void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
+ const repl::OplogEntry& oplogEntry) {
+ auto opType = oplogEntry.getOpType();
+ auto ts = oplogEntry.getTimestamp();
+
+ if (opType == repl::OpTypeEnum::kDelete) {
+ uassert(
+ 40606,
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
+ << " is not compatible with previous write in the transaction of type: "
+ << OpType_serializer(oplogEntry.getOpType())
+ << ", oplogTs: "
+ << ts.toString()
+ << ", oplog: "
+ << redact(oplogEntry.toBSON()),
+ request.isRemove());
+ uassert(40607,
+ str::stream() << "No pre-image available for findAndModify retry request:"
+ << redact(request.toBSON()),
+ oplogEntry.getPreImageTs());
+ } else if (opType == repl::OpTypeEnum::kInsert) {
+ uassert(
+ 40608,
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
+ << " is not compatible with previous write in the transaction of type: "
+ << OpType_serializer(oplogEntry.getOpType())
+ << ", oplogTs: "
+ << ts.toString()
+ << ", oplog: "
+ << redact(oplogEntry.toBSON()),
+ request.isUpsert());
+ } else {
+ uassert(
+ 40609,
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
+ << " is not compatible with previous write in the transaction of type: "
+ << OpType_serializer(oplogEntry.getOpType())
+ << ", oplogTs: "
+ << ts.toString()
+ << ", oplog: "
+ << redact(oplogEntry.toBSON()),
+ opType == repl::OpTypeEnum::kUpdate);
+ uassert(
+ 40610,
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
+ << " is not compatible with previous write in the transaction of type: "
+ << OpType_serializer(oplogEntry.getOpType())
+ << ", oplogTs: "
+ << ts.toString()
+ << ", oplog: "
+ << redact(oplogEntry.toBSON()),
+ !request.isUpsert());
+
+ if (request.shouldReturnNew()) {
+ uassert(40611,
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
+ << " wants the document after update returned, but only before "
+ "update document is stored, oplogTs: "
+ << ts.toString()
+ << ", oplog: "
+ << redact(oplogEntry.toBSON()),
+ oplogEntry.getPostImageTs());
+ } else {
+ uassert(40612,
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
+ << " wants the document before update returned, but only after "
+ "update document is stored, oplogTs: "
+ << ts.toString()
+ << ", oplog: "
+ << redact(oplogEntry.toBSON()),
+ oplogEntry.getPreImageTs());
+ }
+ }
+}
+
+/**
+ * Extracts either the pre or post image (cannot be both) of the findAndModify operation from the
+ * oplog.
+ */
+BSONObj extractPreOrPostImage(OperationContext* opCtx, const repl::OplogEntry& oplog) {
+ invariant(oplog.getPreImageTs() || oplog.getPostImageTs());
+ auto ts =
+ oplog.getPreImageTs() ? oplog.getPreImageTs().value() : oplog.getPostImageTs().value();
+
+ DBDirectClient client(opCtx);
+ auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), BSON("ts" << ts));
+
+ uassert(40613,
+ str::stream() << "oplog no longer contains the complete write history of this "
+ "transaction, log with ts "
+ << ts.toString()
+ << " cannot be found",
+ !oplogDoc.isEmpty());
+ auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogDoc));
+
+ return oplogEntry.getObject().getOwned();
+}
+
+} // namespace
+
SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry) {
invariant(entry.getOpType() == repl::OpTypeEnum::kInsert);
@@ -72,4 +182,46 @@ SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry) {
return res;
}
+FindAndModifyResult parseOplogEntryForFindAndModify(OperationContext* opCtx,
+ const FindAndModifyRequest& request,
+ const repl::OplogEntry& oplogEntry) {
+ validateFindAndModifyRetryability(request, oplogEntry);
+
+ FindAndModifyResult result;
+
+ auto opType = oplogEntry.getOpType();
+
+ if (opType == repl::OpTypeEnum::kDelete) {
+ FindAndModifyLastError lastError;
+ lastError.setN(1);
+ result.setLastErrorObject(std::move(lastError));
+ result.setValue(extractPreOrPostImage(opCtx, oplogEntry));
+
+ return result;
+ }
+
+ // Upsert case
+ if (opType == repl::OpTypeEnum::kInsert) {
+ FindAndModifyLastError lastError;
+ lastError.setN(1);
+ lastError.setUpdatedExisting(false);
+ // TODO: SERVER-30532 set upserted
+
+ result.setLastErrorObject(std::move(lastError));
+ result.setValue(oplogEntry.getObject().getOwned());
+
+ return result;
+ }
+
+ // Update case
+ FindAndModifyLastError lastError;
+ lastError.setN(1);
+ lastError.setUpdatedExisting(true);
+
+ result.setLastErrorObject(std::move(lastError));
+ result.setValue(extractPreOrPostImage(opCtx, oplogEntry));
+
+ return result;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_retryability.h b/src/mongo/db/ops/write_ops_retryability.h
index f3de5c353fa..37f87a0d742 100644
--- a/src/mongo/db/ops/write_ops_retryability.h
+++ b/src/mongo/db/ops/write_ops_retryability.h
@@ -28,12 +28,16 @@
#pragma once
+#include "mongo/db/commands/find_and_modify_gen.h"
#include "mongo/db/ops/single_write_result_gen.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/oplog_entry.h"
namespace mongo {
+class FindAndModifyRequest;
+class OperationContext;
+
/**
* Returns the single write result corresponding to the given oplog entry for insert, update, and
* delete commands, i.e. the single write result that would have been returned by the statement that
@@ -44,4 +48,11 @@ SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry);
SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry);
SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry);
+/**
+ * Returns the result of a findAndModify based on the oplog entries generated by the operation.
+ */
+FindAndModifyResult parseOplogEntryForFindAndModify(OperationContext* opCtx,
+ const FindAndModifyRequest& request,
+ const repl::OplogEntry& oplogEntry);
+
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index 6e6d30fd491..86f581659ba 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -29,9 +29,16 @@
#include "mongo/platform/basic.h"
#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/query/find_and_modify_request.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/unittest/unittest.h"
@@ -110,5 +117,271 @@ TEST_F(WriteOpsRetryability, ParseOplogEntryForDelete) {
ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj());
}
+class FindAndModifyRetryability : public ServiceContextMongoDTest {
+public:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+
+ _opCtx = cc().makeOperationContext();
+
+ // Insert code path assumes existence of repl coordinator!
+ repl::ReplSettings replSettings;
+ replSettings.setReplSetString(
+ ConnectionString::forReplicaSet("sessionTxnStateTest", {HostAndPort("a:1")})
+ .toString());
+ replSettings.setMaster(true);
+
+ auto service = getServiceContext();
+ repl::ReplicationCoordinator::set(
+ service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service, replSettings));
+
+ // Note: internal code does not allow implicit creation of non-capped oplog collection.
+ DBDirectClient client(opCtx());
+ ASSERT_TRUE(
+ client.createCollection(NamespaceString::kRsOplogNamespace.ns(), 1024 * 1024, true));
+ }
+
+ void tearDown() override {
+ // ServiceContextMongoDTest::tearDown() will try to create it's own opCtx, and it's not
+ // allowed to have 2 present per client, so destroy this one.
+ _opCtx.reset();
+
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ /**
+ * Helper method for inserting new entries to the oplog. This completely bypasses
+ * fixDocumentForInsert.
+ */
+ void insertOplogEntry(BSONObj entry) {
+ AutoGetCollection autoColl(opCtx(), NamespaceString::kRsOplogNamespace, MODE_IX);
+ auto coll = autoColl.getCollection();
+ ASSERT_TRUE(coll != nullptr);
+
+ auto status = coll->insertDocument(opCtx(),
+ InsertStatement(entry),
+ &CurOp::get(opCtx())->debug(),
+ /* enforceQuota */ false,
+ /* fromMigrate */ false);
+ ASSERT_OK(status);
+ }
+
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
+private:
+ ServiceContext::UniqueOperationContext _opCtx;
+};
+
+NamespaceString kNs("test.user");
+
+TEST_F(FindAndModifyRetryability, BasicUpsert) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setUpsert(true);
+
+ repl::OplogEntry insertOplog(repl::OpTime(), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1));
+
+ auto result = parseOplogEntryForFindAndModify(nullptr, request, insertOplog);
+
+ auto lastError = result.getLastErrorObject();
+ ASSERT_EQ(1, lastError.getN());
+ ASSERT_TRUE(lastError.getUpdatedExisting());
+ ASSERT_FALSE(lastError.getUpdatedExisting().value());
+
+ ASSERT_BSONOBJ_EQ(BSON("x" << 1), result.getValue());
+}
+
+TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpsertButOplogIsUpdate) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setUpsert(true);
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry noteOplog(
+ repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+
+ insertOplogEntry(noteOplog.toBSON());
+
+ repl::OplogEntry oplog(
+ repl::OpTime(), 0, repl::OpTypeEnum::kUpdate, kNs, BSON("x" << 1), BSON("y" << 1));
+ oplog.setPreImageTs(imageTs);
+
+ ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException);
+}
+
+TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithUpdateWithoutUpsertErrors) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setUpsert(false);
+
+ repl::OplogEntry insertOplog(repl::OpTime(), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1));
+
+ ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, insertOplog),
+ AssertionException);
+}
+
+TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPostImageButOplogHasPre) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setShouldReturnNew(true);
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry noteOplog(
+ repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+
+ insertOplogEntry(noteOplog.toBSON());
+
+ repl::OplogEntry updateOplog(repl::OpTime(),
+ 0,
+ repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("x" << 1 << "y" << 1),
+ BSON("x" << 1));
+ updateOplog.setPreImageTs(imageTs);
+
+ ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog),
+ AssertionException);
+}
+
+TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpdateButOplogIsDelete) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setShouldReturnNew(true);
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry noteOplog(
+ repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+
+ insertOplogEntry(noteOplog.toBSON());
+
+ repl::OplogEntry oplog(repl::OpTime(), 0, repl::OpTypeEnum::kDelete, kNs, BSON("_id" << 1));
+ oplog.setPreImageTs(imageTs);
+
+ ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException);
+}
+
+TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPreImageButOplogHasPost) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setShouldReturnNew(false);
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry noteOplog(
+ repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+
+ insertOplogEntry(noteOplog.toBSON());
+
+ repl::OplogEntry updateOplog(repl::OpTime(),
+ 0,
+ repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("x" << 1 << "y" << 1),
+ BSON("x" << 1));
+ updateOplog.setPostImageTs(imageTs);
+
+ ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog),
+ AssertionException);
+}
+
+TEST_F(FindAndModifyRetryability, UpdateWithPreImage) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setShouldReturnNew(false);
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry noteOplog(
+ repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+
+ insertOplogEntry(noteOplog.toBSON());
+
+ repl::OplogEntry updateOplog(repl::OpTime(),
+ 0,
+ repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("x" << 1 << "y" << 1),
+ BSON("x" << 1));
+ updateOplog.setPreImageTs(imageTs);
+
+ auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog);
+
+ auto lastError = result.getLastErrorObject();
+ ASSERT_EQ(1, lastError.getN());
+ ASSERT_TRUE(lastError.getUpdatedExisting());
+ ASSERT_TRUE(lastError.getUpdatedExisting().value());
+
+ ASSERT_BSONOBJ_EQ(BSON("x" << 1 << "z" << 1), result.getValue());
+}
+
+TEST_F(FindAndModifyRetryability, UpdateWithPostImage) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setShouldReturnNew(true);
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry noteOplog(
+ repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1));
+
+ insertOplogEntry(noteOplog.toBSON());
+
+ repl::OplogEntry updateOplog(repl::OpTime(),
+ 0,
+ repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("x" << 1 << "y" << 1),
+ BSON("x" << 1));
+ updateOplog.setPostImageTs(imageTs);
+
+ auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog);
+
+ auto lastError = result.getLastErrorObject();
+ ASSERT_EQ(1, lastError.getN());
+ ASSERT_TRUE(lastError.getUpdatedExisting());
+ ASSERT_TRUE(lastError.getUpdatedExisting().value());
+
+ ASSERT_BSONOBJ_EQ(BSON("a" << 1 << "b" << 1), result.getValue());
+}
+
+TEST_F(FindAndModifyRetryability, UpdateWithPostImageButOplogDoesNotExistShouldError) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setShouldReturnNew(true);
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry updateOplog(repl::OpTime(),
+ 0,
+ repl::OpTypeEnum::kUpdate,
+ kNs,
+ BSON("x" << 1 << "y" << 1),
+ BSON("x" << 1));
+ updateOplog.setPostImageTs(imageTs);
+
+ ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog),
+ AssertionException);
+}
+
+TEST_F(FindAndModifyRetryability, BasicRemove) {
+ auto request = FindAndModifyRequest::makeRemove(kNs, BSONObj());
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry noteOplog(
+ repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1));
+
+ insertOplogEntry(noteOplog.toBSON());
+
+ repl::OplogEntry removeOplog(
+ repl::OpTime(), 0, repl::OpTypeEnum::kDelete, kNs, BSON("_id" << 20));
+ removeOplog.setPreImageTs(imageTs);
+
+ auto result = parseOplogEntryForFindAndModify(opCtx(), request, removeOplog);
+
+ auto lastError = result.getLastErrorObject();
+ ASSERT_EQ(1, lastError.getN());
+ ASSERT_FALSE(lastError.getUpdatedExisting());
+
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 20 << "a" << 1), result.getValue());
+}
+
+TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithRemoveErrors) {
+ auto request = FindAndModifyRequest::makeRemove(kNs, BSONObj());
+
+ repl::OplogEntry insertOplog(repl::OpTime(), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1));
+
+ ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, insertOplog),
+ AssertionException);
+}
+
} // namespace
} // namespace mongo