From 1b386fef6773133a17f1fb1f33a254780c47ab04 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Tue, 18 Apr 2017 19:47:12 -0400 Subject: SERVER-28211 optimize single document operations --- src/mongo/db/repl/rollback_fix_up_info.cpp | 48 ++++++-- src/mongo/db/repl/rollback_fix_up_info_test.cpp | 146 ++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 11 deletions(-) diff --git a/src/mongo/db/repl/rollback_fix_up_info.cpp b/src/mongo/db/repl/rollback_fix_up_info.cpp index ffcc604b189..2e4855aff12 100644 --- a/src/mongo/db/repl/rollback_fix_up_info.cpp +++ b/src/mongo/db/repl/rollback_fix_up_info.cpp @@ -33,6 +33,7 @@ #include "mongo/db/repl/rollback_fix_up_info.h" #include "mongo/base/string_data.h" +#include "mongo/bson/simple_bsonelement_comparator.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/rollback_fix_up_info_descriptions.h" @@ -72,7 +73,39 @@ Status RollbackFixUpInfo::processSingleDocumentOplogEntry(OperationContext* opCt SingleDocumentOpType opType, const std::string& dbName) { SingleDocumentOperationDescription desc(collectionUuid, docId, opType, dbName); - return _upsertById(opCtx, kRollbackDocsNamespace, desc.toBSON()); + auto doc = desc.toBSON(); + if (SingleDocumentOpType::kInsert == opType) { + // If the existing document (that may or may not exist in the "kRollbackDocsNamespace" + // collection) has a 'delete' op type, this oplog entry will cancel out the previously + // processed 'delete" oplog entry. We should remove the existing document from the + // collection and not insert a new document. + auto deleteResult = + _storageInterface->deleteById(opCtx, kRollbackDocsNamespace, doc["_id"]); + if (deleteResult.isOK()) { + auto existingDoc = deleteResult.getValue(); + if ("delete" == existingDoc["operationType"].String()) { + return Status::OK(); + } + // Fall through and replace the 'update' op type in the existing document with 'insert' + // so that the document will be dropped when we actually do the rollback. + } + } else if (SingleDocumentOpType::kUpdate == opType) { + // If there is an existing document in the "kRollbackDocsNamespace" collection, it must + // have either a 'delete' or 'update' op type. + // + // For a 'delete' entry, we should not replace it with 'update' so that if we process an + // oplog entry with an 'insert' op type later, we can cancel out the existing entry with the + // 'delete' op type. + // + // For an 'update' entry, there is nothing further to do because this matches the current op + // type passed to this function. + auto findResult = _storageInterface->findById(opCtx, kRollbackDocsNamespace, doc["_id"]); + if (findResult.isOK()) { + return Status::OK(); + } + // No existing document. Insert a new document with 'update' op type. + } + return _upsertById(opCtx, kRollbackDocsNamespace, doc); } Status RollbackFixUpInfo::processCreateCollectionOplogEntry(OperationContext* opCtx, @@ -140,16 +173,9 @@ Status RollbackFixUpInfo::processCreateIndexOplogEntry(OperationContext* opCtx, BSONObjBuilder bob; bob.append("_id", desc.makeIdKey()); auto key = bob.obj(); - auto deleteResult = - _storageInterface->deleteDocuments(opCtx, - kRollbackIndexNamespace, - "_id_"_sd, - StorageInterface::ScanDirection::kForward, - key, - BoundInclusion::kIncludeStartKeyOnly, - 1U); - if (deleteResult.isOK() && !deleteResult.getValue().empty()) { - auto doc = deleteResult.getValue().front(); + auto deleteResult = _storageInterface->deleteById(opCtx, kRollbackIndexNamespace, key["_id"]); + if (deleteResult.isOK()) { + auto doc = deleteResult.getValue(); auto opTypeResult = IndexDescription::parseOpType(doc); if (!opTypeResult.isOK()) { invariant(ErrorCodes::FailedToParse == opTypeResult.getStatus()); diff --git a/src/mongo/db/repl/rollback_fix_up_info_test.cpp b/src/mongo/db/repl/rollback_fix_up_info_test.cpp index e8e8b9e0217..83146a02f2c 100644 --- a/src/mongo/db/repl/rollback_fix_up_info_test.cpp +++ b/src/mongo/db/repl/rollback_fix_up_info_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/rollback_fix_up_info.h" +#include "mongo/db/repl/rollback_fix_up_info_descriptions.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/service_context_d_test_fixture.h" @@ -192,6 +193,75 @@ TEST_F(RollbackFixUpInfoTest, opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {expectedDocument}); } +TEST_F(RollbackFixUpInfoTest, + ProcessInsertDocumentOplogEntryWhenExistingDocumentHasDeleteOpTypeRemovesDocument) { + auto collectionUuid = UUID::gen(); + NamespaceString nss("test.t"); + auto doc = BSON("_id" + << "mydocid" + << "x" + << 1); + auto docId = doc["_id"]; + + // State of oplog: + // {op: 'i'}, ...., {op: 'd'}, .... + // (earliest optime) ---> (latest optime) + // + // Oplog entries are processed in reverse optime order. + + // First, process document delete oplog entry. + auto opCtx = makeOpCtx(); + RollbackFixUpInfo rollbackFixUpInfo(_storageInterface.get()); + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kDelete, + nss.db().toString())); + + RollbackFixUpInfo::SingleDocumentOperationDescription desc( + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kDelete, + nss.db().toString()); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()}); + + // Next, process an unrelated document insert oplog entry with a different _id in the same + // collection. This is to ensure we do not remove unrelated documents from the + // "kRollbackDocsNamespace" collection. + auto doc2 = BSON("_id" + << "mydocid2" + << "x" + << 2); + auto docId2 = doc2["_id"]; + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId2, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString())); + + RollbackFixUpInfo::SingleDocumentOperationDescription desc2( + collectionUuid, + docId2, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString()); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON(), desc2.toBSON()}); + + // Lastly, process document insert oplog entry. This should cancel out the existing delete oplog + // entry with the same _id. + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString())); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc2.toBSON()}); +} + TEST_F(RollbackFixUpInfoTest, ProcessDeleteDocumentOplogEntryInsertsDocumentIntoRollbackDocsCollectionWithDeleteOpType) { auto operation = BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL << "op" @@ -270,6 +340,82 @@ TEST_F(RollbackFixUpInfoTest, _assertDocumentsInCollectionEquals( opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {expectedDocument}); + + // Processing another 'update' oplog entry on the same document should not change the document + // in the rollback collection. + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kUpdate, + nss.db().toString())); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {expectedDocument}); + + + // Processing an 'insert' oplog entry when the existing document has an 'update' op type should + // replace the existing document in the rollback collection with a new one with an 'insert' op + // type. + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString())); + RollbackFixUpInfo::SingleDocumentOperationDescription insertDesc( + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString()); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {insertDesc.toBSON()}); +} + +TEST_F( + RollbackFixUpInfoTest, + ProcessUpdateDocumentOplogEntryWhenExistingDocumentHasDeleteOpTypeLeavesExistingDocumentIntact) { + auto collectionUuid = UUID::gen(); + NamespaceString nss("test.t"); + auto doc = BSON("_id" + << "mydocid" + << "x" + << 1); + auto docId = doc["_id"]; + + // State of oplog: + // {op: 'u'}, ...., {op: 'd'}, .... + // (earliest optime) ---> (latest optime) + // + // Oplog entries are processed in reverse optime order. + + // First, process document delete oplog entry. + auto opCtx = makeOpCtx(); + RollbackFixUpInfo rollbackFixUpInfo(_storageInterface.get()); + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kDelete, + nss.db().toString())); + + RollbackFixUpInfo::SingleDocumentOperationDescription desc( + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kDelete, + nss.db().toString()); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()}); + + // Next, process document update oplog entry. This should be ignored since the existing entry + // has a delete op type. + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kUpdate, + nss.db().toString())); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()}); } TEST_F( -- cgit v1.2.1