diff options
author | Benety Goh <benety@mongodb.com> | 2017-04-18 19:47:12 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2017-04-21 10:20:37 -0400 |
commit | 1b386fef6773133a17f1fb1f33a254780c47ab04 (patch) | |
tree | 0cdf332babbcb1a1cec3f2cdddb38dd56b6d15bb | |
parent | e4f20f24ddbb9cea2255df6fe0cfd34ddeb0263e (diff) | |
download | mongo-1b386fef6773133a17f1fb1f33a254780c47ab04.tar.gz |
SERVER-28211 optimize single document operations
-rw-r--r-- | src/mongo/db/repl/rollback_fix_up_info.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_fix_up_info_test.cpp | 146 |
2 files changed, 183 insertions, 11 deletions
diff --git a/src/mongo/db/repl/rollback_fix_up_info.cpp b/src/mongo/db/repl/rollback_fix_up_info.cpp index ffcc604b189..2e4855aff12 100644 --- a/src/mongo/db/repl/rollback_fix_up_info.cpp +++ b/src/mongo/db/repl/rollback_fix_up_info.cpp @@ -33,6 +33,7 @@ #include "mongo/db/repl/rollback_fix_up_info.h" #include "mongo/base/string_data.h" +#include "mongo/bson/simple_bsonelement_comparator.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/rollback_fix_up_info_descriptions.h" @@ -72,7 +73,39 @@ Status RollbackFixUpInfo::processSingleDocumentOplogEntry(OperationContext* opCt SingleDocumentOpType opType, const std::string& dbName) { SingleDocumentOperationDescription desc(collectionUuid, docId, opType, dbName); - return _upsertById(opCtx, kRollbackDocsNamespace, desc.toBSON()); + auto doc = desc.toBSON(); + if (SingleDocumentOpType::kInsert == opType) { + // If the existing document (that may or may not exist in the "kRollbackDocsNamespace" + // collection) has a 'delete' op type, this oplog entry will cancel out the previously + // processed 'delete" oplog entry. We should remove the existing document from the + // collection and not insert a new document. + auto deleteResult = + _storageInterface->deleteById(opCtx, kRollbackDocsNamespace, doc["_id"]); + if (deleteResult.isOK()) { + auto existingDoc = deleteResult.getValue(); + if ("delete" == existingDoc["operationType"].String()) { + return Status::OK(); + } + // Fall through and replace the 'update' op type in the existing document with 'insert' + // so that the document will be dropped when we actually do the rollback. + } + } else if (SingleDocumentOpType::kUpdate == opType) { + // If there is an existing document in the "kRollbackDocsNamespace" collection, it must + // have either a 'delete' or 'update' op type. + // + // For a 'delete' entry, we should not replace it with 'update' so that if we process an + // oplog entry with an 'insert' op type later, we can cancel out the existing entry with the + // 'delete' op type. + // + // For an 'update' entry, there is nothing further to do because this matches the current op + // type passed to this function. + auto findResult = _storageInterface->findById(opCtx, kRollbackDocsNamespace, doc["_id"]); + if (findResult.isOK()) { + return Status::OK(); + } + // No existing document. Insert a new document with 'update' op type. + } + return _upsertById(opCtx, kRollbackDocsNamespace, doc); } Status RollbackFixUpInfo::processCreateCollectionOplogEntry(OperationContext* opCtx, @@ -140,16 +173,9 @@ Status RollbackFixUpInfo::processCreateIndexOplogEntry(OperationContext* opCtx, BSONObjBuilder bob; bob.append("_id", desc.makeIdKey()); auto key = bob.obj(); - auto deleteResult = - _storageInterface->deleteDocuments(opCtx, - kRollbackIndexNamespace, - "_id_"_sd, - StorageInterface::ScanDirection::kForward, - key, - BoundInclusion::kIncludeStartKeyOnly, - 1U); - if (deleteResult.isOK() && !deleteResult.getValue().empty()) { - auto doc = deleteResult.getValue().front(); + auto deleteResult = _storageInterface->deleteById(opCtx, kRollbackIndexNamespace, key["_id"]); + if (deleteResult.isOK()) { + auto doc = deleteResult.getValue(); auto opTypeResult = IndexDescription::parseOpType(doc); if (!opTypeResult.isOK()) { invariant(ErrorCodes::FailedToParse == opTypeResult.getStatus()); diff --git a/src/mongo/db/repl/rollback_fix_up_info_test.cpp b/src/mongo/db/repl/rollback_fix_up_info_test.cpp index e8e8b9e0217..83146a02f2c 100644 --- a/src/mongo/db/repl/rollback_fix_up_info_test.cpp +++ b/src/mongo/db/repl/rollback_fix_up_info_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/rollback_fix_up_info.h" +#include "mongo/db/repl/rollback_fix_up_info_descriptions.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/service_context_d_test_fixture.h" @@ -193,6 +194,75 @@ TEST_F(RollbackFixUpInfoTest, } TEST_F(RollbackFixUpInfoTest, + ProcessInsertDocumentOplogEntryWhenExistingDocumentHasDeleteOpTypeRemovesDocument) { + auto collectionUuid = UUID::gen(); + NamespaceString nss("test.t"); + auto doc = BSON("_id" + << "mydocid" + << "x" + << 1); + auto docId = doc["_id"]; + + // State of oplog: + // {op: 'i'}, ...., {op: 'd'}, .... + // (earliest optime) ---> (latest optime) + // + // Oplog entries are processed in reverse optime order. + + // First, process document delete oplog entry. + auto opCtx = makeOpCtx(); + RollbackFixUpInfo rollbackFixUpInfo(_storageInterface.get()); + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kDelete, + nss.db().toString())); + + RollbackFixUpInfo::SingleDocumentOperationDescription desc( + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kDelete, + nss.db().toString()); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()}); + + // Next, process an unrelated document insert oplog entry with a different _id in the same + // collection. This is to ensure we do not remove unrelated documents from the + // "kRollbackDocsNamespace" collection. + auto doc2 = BSON("_id" + << "mydocid2" + << "x" + << 2); + auto docId2 = doc2["_id"]; + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId2, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString())); + + RollbackFixUpInfo::SingleDocumentOperationDescription desc2( + collectionUuid, + docId2, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString()); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON(), desc2.toBSON()}); + + // Lastly, process document insert oplog entry. This should cancel out the existing delete oplog + // entry with the same _id. + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString())); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc2.toBSON()}); +} + +TEST_F(RollbackFixUpInfoTest, ProcessDeleteDocumentOplogEntryInsertsDocumentIntoRollbackDocsCollectionWithDeleteOpType) { auto operation = BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL << "op" << "d" @@ -270,6 +340,82 @@ TEST_F(RollbackFixUpInfoTest, _assertDocumentsInCollectionEquals( opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {expectedDocument}); + + // Processing another 'update' oplog entry on the same document should not change the document + // in the rollback collection. + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kUpdate, + nss.db().toString())); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {expectedDocument}); + + + // Processing an 'insert' oplog entry when the existing document has an 'update' op type should + // replace the existing document in the rollback collection with a new one with an 'insert' op + // type. + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString())); + RollbackFixUpInfo::SingleDocumentOperationDescription insertDesc( + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kInsert, + nss.db().toString()); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {insertDesc.toBSON()}); +} + +TEST_F( + RollbackFixUpInfoTest, + ProcessUpdateDocumentOplogEntryWhenExistingDocumentHasDeleteOpTypeLeavesExistingDocumentIntact) { + auto collectionUuid = UUID::gen(); + NamespaceString nss("test.t"); + auto doc = BSON("_id" + << "mydocid" + << "x" + << 1); + auto docId = doc["_id"]; + + // State of oplog: + // {op: 'u'}, ...., {op: 'd'}, .... + // (earliest optime) ---> (latest optime) + // + // Oplog entries are processed in reverse optime order. + + // First, process document delete oplog entry. + auto opCtx = makeOpCtx(); + RollbackFixUpInfo rollbackFixUpInfo(_storageInterface.get()); + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kDelete, + nss.db().toString())); + + RollbackFixUpInfo::SingleDocumentOperationDescription desc( + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kDelete, + nss.db().toString()); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()}); + + // Next, process document update oplog entry. This should be ignored since the existing entry + // has a delete op type. + ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry( + opCtx.get(), + collectionUuid, + docId, + RollbackFixUpInfo::SingleDocumentOpType::kUpdate, + nss.db().toString())); + _assertDocumentsInCollectionEquals( + opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()}); } TEST_F( |