diff options
author | Randolph Tan <randolph@10gen.com> | 2017-08-03 18:51:27 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-08-17 16:58:40 -0400 |
commit | 86f8af9d40b294852399097daf80894e28c20adc (patch) | |
tree | 7062c4c7ce754ed1874b8a36e51b103a7f43a8cb /src/mongo/db/ops | |
parent | 1e11cda15ddae9972f9993a7d6b6cbf9d172bcb3 (diff) | |
download | mongo-86f8af9d40b294852399097daf80894e28c20adc.tar.gz |
SERVER-30407 Add retry logic in findAndModify
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r-- | src/mongo/db/ops/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.cpp | 152 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.h | 11 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability_test.cpp | 273 |
4 files changed, 437 insertions, 0 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index 6cd60f6c9e1..ec3fd359719 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -189,6 +189,7 @@ env.CppUnitTest( source='write_ops_retryability_test.cpp', LIBDEPS=[ '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', '$BUILD_DIR/mongo/db/write_ops', ], diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index 04c92c81213..706a73bc655 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -28,13 +28,123 @@ #include "mongo/platform/basic.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" #include "mongo/db/ops/single_write_result_gen.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/query/find_and_modify_request.h" +#include "mongo/logger/redaction.h" namespace mongo { +namespace { + +/** + * Validates that the request is retry-compatible with the operation that occurred. 
+ */ +void validateFindAndModifyRetryability(const FindAndModifyRequest& request, + const repl::OplogEntry& oplogEntry) { + auto opType = oplogEntry.getOpType(); + auto ts = oplogEntry.getTimestamp(); + + if (opType == repl::OpTypeEnum::kDelete) { + uassert( + 40606, + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) + << " is not compatible with previous write in the transaction of type: " + << OpType_serializer(oplogEntry.getOpType()) + << ", oplogTs: " + << ts.toString() + << ", oplog: " + << redact(oplogEntry.toBSON()), + request.isRemove()); + uassert(40607, + str::stream() << "No pre-image available for findAndModify retry request:" + << redact(request.toBSON()), + oplogEntry.getPreImageTs()); + } else if (opType == repl::OpTypeEnum::kInsert) { + uassert( + 40608, + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) + << " is not compatible with previous write in the transaction of type: " + << OpType_serializer(oplogEntry.getOpType()) + << ", oplogTs: " + << ts.toString() + << ", oplog: " + << redact(oplogEntry.toBSON()), + request.isUpsert()); + } else { + uassert( + 40609, + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) + << " is not compatible with previous write in the transaction of type: " + << OpType_serializer(oplogEntry.getOpType()) + << ", oplogTs: " + << ts.toString() + << ", oplog: " + << redact(oplogEntry.toBSON()), + opType == repl::OpTypeEnum::kUpdate); + uassert( + 40610, + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) + << " is not compatible with previous write in the transaction of type: " + << OpType_serializer(oplogEntry.getOpType()) + << ", oplogTs: " + << ts.toString() + << ", oplog: " + << redact(oplogEntry.toBSON()), + !request.isUpsert()); + + if (request.shouldReturnNew()) { + uassert(40611, + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) + << " wants the document after update returned, 
but only before " + "update document is stored, oplogTs: " + << ts.toString() + << ", oplog: " + << redact(oplogEntry.toBSON()), + oplogEntry.getPostImageTs()); + } else { + uassert(40612, + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) + << " wants the document before update returned, but only after " + "update document is stored, oplogTs: " + << ts.toString() + << ", oplog: " + << redact(oplogEntry.toBSON()), + oplogEntry.getPreImageTs()); + } + } +} + +/** + * Extracts either the pre or post image (cannot be both) of the findAndModify operation from the + * oplog. + */ +BSONObj extractPreOrPostImage(OperationContext* opCtx, const repl::OplogEntry& oplog) { + invariant(oplog.getPreImageTs() || oplog.getPostImageTs()); + auto ts = + oplog.getPreImageTs() ? oplog.getPreImageTs().value() : oplog.getPostImageTs().value(); + + DBDirectClient client(opCtx); + auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), BSON("ts" << ts)); + + uassert(40613, + str::stream() << "oplog no longer contains the complete write history of this " + "transaction, log with ts " + << ts.toString() + << " cannot be found", + !oplogDoc.isEmpty()); + auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogDoc)); + + return oplogEntry.getObject().getOwned(); +} + +} // namespace + SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry) { invariant(entry.getOpType() == repl::OpTypeEnum::kInsert); @@ -72,4 +182,46 @@ SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry) { return res; } +FindAndModifyResult parseOplogEntryForFindAndModify(OperationContext* opCtx, + const FindAndModifyRequest& request, + const repl::OplogEntry& oplogEntry) { + validateFindAndModifyRetryability(request, oplogEntry); + + FindAndModifyResult result; + + auto opType = oplogEntry.getOpType(); + + if (opType == repl::OpTypeEnum::kDelete) { + FindAndModifyLastError lastError; + lastError.setN(1); + 
result.setLastErrorObject(std::move(lastError)); + result.setValue(extractPreOrPostImage(opCtx, oplogEntry)); + + return result; + } + + // Upsert case + if (opType == repl::OpTypeEnum::kInsert) { + FindAndModifyLastError lastError; + lastError.setN(1); + lastError.setUpdatedExisting(false); + // TODO: SERVER-30532 set upserted + + result.setLastErrorObject(std::move(lastError)); + result.setValue(oplogEntry.getObject().getOwned()); + + return result; + } + + // Update case + FindAndModifyLastError lastError; + lastError.setN(1); + lastError.setUpdatedExisting(true); + + result.setLastErrorObject(std::move(lastError)); + result.setValue(extractPreOrPostImage(opCtx, oplogEntry)); + + return result; +} + } // namespace mongo diff --git a/src/mongo/db/ops/write_ops_retryability.h b/src/mongo/db/ops/write_ops_retryability.h index f3de5c353fa..37f87a0d742 100644 --- a/src/mongo/db/ops/write_ops_retryability.h +++ b/src/mongo/db/ops/write_ops_retryability.h @@ -28,12 +28,16 @@ #pragma once +#include "mongo/db/commands/find_and_modify_gen.h" #include "mongo/db/ops/single_write_result_gen.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/oplog_entry.h" namespace mongo { +class FindAndModifyRequest; +class OperationContext; + /** * Returns the single write result corresponding to the given oplog entry for insert, update, and * delete commands, i.e. the single write result that would have been returned by the statement that @@ -44,4 +48,11 @@ SingleWriteResult parseOplogEntryForInsert(const repl::OplogEntry& entry); SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry); SingleWriteResult parseOplogEntryForDelete(const repl::OplogEntry& entry); +/** + * Returns the result of a findAndModify based on the oplog entries generated by the operation. 
+ */ +FindAndModifyResult parseOplogEntryForFindAndModify(OperationContext* opCtx, + const FindAndModifyRequest& request, + const repl::OplogEntry& oplogEntry); + } // namespace mongo diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index 6e6d30fd491..86f581659ba 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -29,9 +29,16 @@ #include "mongo/platform/basic.h" #include "mongo/bson/bsonmisc.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/query/find_and_modify_request.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/unittest/unittest.h" @@ -110,5 +117,271 @@ TEST_F(WriteOpsRetryability, ParseOplogEntryForDelete) { ASSERT_BSONOBJ_EQ(res.getUpsertedId(), BSONObj()); } +class FindAndModifyRetryability : public ServiceContextMongoDTest { +public: + void setUp() override { + ServiceContextMongoDTest::setUp(); + + _opCtx = cc().makeOperationContext(); + + // Insert code path assumes existence of repl coordinator! + repl::ReplSettings replSettings; + replSettings.setReplSetString( + ConnectionString::forReplicaSet("sessionTxnStateTest", {HostAndPort("a:1")}) + .toString()); + replSettings.setMaster(true); + + auto service = getServiceContext(); + repl::ReplicationCoordinator::set( + service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service, replSettings)); + + // Note: internal code does not allow implicit creation of non-capped oplog collection. 
+ DBDirectClient client(opCtx()); + ASSERT_TRUE( + client.createCollection(NamespaceString::kRsOplogNamespace.ns(), 1024 * 1024, true)); + } + + void tearDown() override { + // ServiceContextMongoDTest::tearDown() will try to create its own opCtx, and it's not + // allowed to have 2 present per client, so destroy this one. + _opCtx.reset(); + + ServiceContextMongoDTest::tearDown(); + } + + /** + * Helper method for inserting new entries to the oplog. This completely bypasses + * fixDocumentForInsert. + */ + void insertOplogEntry(BSONObj entry) { + AutoGetCollection autoColl(opCtx(), NamespaceString::kRsOplogNamespace, MODE_IX); + auto coll = autoColl.getCollection(); + ASSERT_TRUE(coll != nullptr); + + auto status = coll->insertDocument(opCtx(), + InsertStatement(entry), + &CurOp::get(opCtx())->debug(), + /* enforceQuota */ false, + /* fromMigrate */ false); + ASSERT_OK(status); + } + + OperationContext* opCtx() { + return _opCtx.get(); + } + +private: + ServiceContext::UniqueOperationContext _opCtx; +}; + +NamespaceString kNs("test.user"); + +TEST_F(FindAndModifyRetryability, BasicUpsert) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setUpsert(true); + + repl::OplogEntry insertOplog(repl::OpTime(), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1)); + + auto result = parseOplogEntryForFindAndModify(nullptr, request, insertOplog); + + auto lastError = result.getLastErrorObject(); + ASSERT_EQ(1, lastError.getN()); + ASSERT_TRUE(lastError.getUpdatedExisting()); + ASSERT_FALSE(lastError.getUpdatedExisting().value()); + + ASSERT_BSONOBJ_EQ(BSON("x" << 1), result.getValue()); +} + +TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpsertButOplogIsUpdate) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setUpsert(true); + + Timestamp imageTs(120, 3); + repl::OplogEntry noteOplog( + repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + + 
insertOplogEntry(noteOplog.toBSON()); + + repl::OplogEntry oplog( + repl::OpTime(), 0, repl::OpTypeEnum::kUpdate, kNs, BSON("x" << 1), BSON("y" << 1)); + oplog.setPreImageTs(imageTs); + + ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException); +} + +TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithUpdateWithoutUpsertErrors) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setUpsert(false); + + repl::OplogEntry insertOplog(repl::OpTime(), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1)); + + ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, insertOplog), + AssertionException); +} + +TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPostImageButOplogHasPre) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setShouldReturnNew(true); + + Timestamp imageTs(120, 3); + repl::OplogEntry noteOplog( + repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + + insertOplogEntry(noteOplog.toBSON()); + + repl::OplogEntry updateOplog(repl::OpTime(), + 0, + repl::OpTypeEnum::kUpdate, + kNs, + BSON("x" << 1 << "y" << 1), + BSON("x" << 1)); + updateOplog.setPreImageTs(imageTs); + + ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog), + AssertionException); +} + +TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpdateButOplogIsDelete) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setShouldReturnNew(true); + + Timestamp imageTs(120, 3); + repl::OplogEntry noteOplog( + repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + + insertOplogEntry(noteOplog.toBSON()); + + repl::OplogEntry oplog(repl::OpTime(), 0, repl::OpTypeEnum::kDelete, kNs, BSON("_id" << 1)); + oplog.setPreImageTs(imageTs); + + ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException); +} + 
+TEST_F(FindAndModifyRetryability, ErrorIfRequestIsPreImageButOplogHasPost) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setShouldReturnNew(false); + + Timestamp imageTs(120, 3); + repl::OplogEntry noteOplog( + repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + + insertOplogEntry(noteOplog.toBSON()); + + repl::OplogEntry updateOplog(repl::OpTime(), + 0, + repl::OpTypeEnum::kUpdate, + kNs, + BSON("x" << 1 << "y" << 1), + BSON("x" << 1)); + updateOplog.setPostImageTs(imageTs); + + ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog), + AssertionException); +} + +TEST_F(FindAndModifyRetryability, UpdateWithPreImage) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setShouldReturnNew(false); + + Timestamp imageTs(120, 3); + repl::OplogEntry noteOplog( + repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1)); + + insertOplogEntry(noteOplog.toBSON()); + + repl::OplogEntry updateOplog(repl::OpTime(), + 0, + repl::OpTypeEnum::kUpdate, + kNs, + BSON("x" << 1 << "y" << 1), + BSON("x" << 1)); + updateOplog.setPreImageTs(imageTs); + + auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog); + + auto lastError = result.getLastErrorObject(); + ASSERT_EQ(1, lastError.getN()); + ASSERT_TRUE(lastError.getUpdatedExisting()); + ASSERT_TRUE(lastError.getUpdatedExisting().value()); + + ASSERT_BSONOBJ_EQ(BSON("x" << 1 << "z" << 1), result.getValue()); +} + +TEST_F(FindAndModifyRetryability, UpdateWithPostImage) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setShouldReturnNew(true); + + Timestamp imageTs(120, 3); + repl::OplogEntry noteOplog( + repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("a" << 1 << "b" << 1)); + + insertOplogEntry(noteOplog.toBSON()); + + repl::OplogEntry updateOplog(repl::OpTime(), + 0, + 
repl::OpTypeEnum::kUpdate, + kNs, + BSON("x" << 1 << "y" << 1), + BSON("x" << 1)); + updateOplog.setPostImageTs(imageTs); + + auto result = parseOplogEntryForFindAndModify(opCtx(), request, updateOplog); + + auto lastError = result.getLastErrorObject(); + ASSERT_EQ(1, lastError.getN()); + ASSERT_TRUE(lastError.getUpdatedExisting()); + ASSERT_TRUE(lastError.getUpdatedExisting().value()); + + ASSERT_BSONOBJ_EQ(BSON("a" << 1 << "b" << 1), result.getValue()); +} + +TEST_F(FindAndModifyRetryability, UpdateWithPostImageButOplogDoesNotExistShouldError) { + auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj()); + request.setShouldReturnNew(true); + + Timestamp imageTs(120, 3); + repl::OplogEntry updateOplog(repl::OpTime(), + 0, + repl::OpTypeEnum::kUpdate, + kNs, + BSON("x" << 1 << "y" << 1), + BSON("x" << 1)); + updateOplog.setPostImageTs(imageTs); + + ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, updateOplog), + AssertionException); +} + +TEST_F(FindAndModifyRetryability, BasicRemove) { + auto request = FindAndModifyRequest::makeRemove(kNs, BSONObj()); + + Timestamp imageTs(120, 3); + repl::OplogEntry noteOplog( + repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("_id" << 20 << "a" << 1)); + + insertOplogEntry(noteOplog.toBSON()); + + repl::OplogEntry removeOplog( + repl::OpTime(), 0, repl::OpTypeEnum::kDelete, kNs, BSON("_id" << 20)); + removeOplog.setPreImageTs(imageTs); + + auto result = parseOplogEntryForFindAndModify(opCtx(), request, removeOplog); + + auto lastError = result.getLastErrorObject(); + ASSERT_EQ(1, lastError.getN()); + ASSERT_FALSE(lastError.getUpdatedExisting()); + + ASSERT_BSONOBJ_EQ(BSON("_id" << 20 << "a" << 1), result.getValue()); +} + +TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithRemoveErrors) { + auto request = FindAndModifyRequest::makeRemove(kNs, BSONObj()); + + repl::OplogEntry insertOplog(repl::OpTime(), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1)); + + 
ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, insertOplog), + AssertionException); +} + } // namespace } // namespace mongo |