summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2017-04-18 19:47:12 -0400
committerBenety Goh <benety@mongodb.com>2017-04-21 10:20:37 -0400
commit1b386fef6773133a17f1fb1f33a254780c47ab04 (patch)
tree0cdf332babbcb1a1cec3f2cdddb38dd56b6d15bb
parente4f20f24ddbb9cea2255df6fe0cfd34ddeb0263e (diff)
downloadmongo-1b386fef6773133a17f1fb1f33a254780c47ab04.tar.gz
SERVER-28211 optimize single document operations
-rw-r--r--src/mongo/db/repl/rollback_fix_up_info.cpp48
-rw-r--r--src/mongo/db/repl/rollback_fix_up_info_test.cpp146
2 files changed, 183 insertions, 11 deletions
diff --git a/src/mongo/db/repl/rollback_fix_up_info.cpp b/src/mongo/db/repl/rollback_fix_up_info.cpp
index ffcc604b189..2e4855aff12 100644
--- a/src/mongo/db/repl/rollback_fix_up_info.cpp
+++ b/src/mongo/db/repl/rollback_fix_up_info.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/repl/rollback_fix_up_info.h"
#include "mongo/base/string_data.h"
+#include "mongo/bson/simple_bsonelement_comparator.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/rollback_fix_up_info_descriptions.h"
@@ -72,7 +73,39 @@ Status RollbackFixUpInfo::processSingleDocumentOplogEntry(OperationContext* opCt
SingleDocumentOpType opType,
const std::string& dbName) {
SingleDocumentOperationDescription desc(collectionUuid, docId, opType, dbName);
- return _upsertById(opCtx, kRollbackDocsNamespace, desc.toBSON());
+ auto doc = desc.toBSON();
+ if (SingleDocumentOpType::kInsert == opType) {
+ // If the existing document (that may or may not exist in the "kRollbackDocsNamespace"
+ // collection) has a 'delete' op type, this oplog entry will cancel out the previously
+ // processed 'delete" oplog entry. We should remove the existing document from the
+ // collection and not insert a new document.
+ auto deleteResult =
+ _storageInterface->deleteById(opCtx, kRollbackDocsNamespace, doc["_id"]);
+ if (deleteResult.isOK()) {
+ auto existingDoc = deleteResult.getValue();
+ if ("delete" == existingDoc["operationType"].String()) {
+ return Status::OK();
+ }
+ // Fall through and replace the 'update' op type in the existing document with 'insert'
+ // so that the document will be dropped when we actually do the rollback.
+ }
+ } else if (SingleDocumentOpType::kUpdate == opType) {
+ // If there is an existing document in the "kRollbackDocsNamespace" collection, it must
+ // have either a 'delete' or 'update' op type.
+ //
+ // For a 'delete' entry, we should not replace it with 'update' so that if we process an
+ // oplog entry with an 'insert' op type later, we can cancel out the existing entry with the
+ // 'delete' op type.
+ //
+ // For an 'update' entry, there is nothing further to do because this matches the current op
+ // type passed to this function.
+ auto findResult = _storageInterface->findById(opCtx, kRollbackDocsNamespace, doc["_id"]);
+ if (findResult.isOK()) {
+ return Status::OK();
+ }
+ // No existing document. Insert a new document with 'update' op type.
+ }
+ return _upsertById(opCtx, kRollbackDocsNamespace, doc);
}
Status RollbackFixUpInfo::processCreateCollectionOplogEntry(OperationContext* opCtx,
@@ -140,16 +173,9 @@ Status RollbackFixUpInfo::processCreateIndexOplogEntry(OperationContext* opCtx,
BSONObjBuilder bob;
bob.append("_id", desc.makeIdKey());
auto key = bob.obj();
- auto deleteResult =
- _storageInterface->deleteDocuments(opCtx,
- kRollbackIndexNamespace,
- "_id_"_sd,
- StorageInterface::ScanDirection::kForward,
- key,
- BoundInclusion::kIncludeStartKeyOnly,
- 1U);
- if (deleteResult.isOK() && !deleteResult.getValue().empty()) {
- auto doc = deleteResult.getValue().front();
+ auto deleteResult = _storageInterface->deleteById(opCtx, kRollbackIndexNamespace, key["_id"]);
+ if (deleteResult.isOK()) {
+ auto doc = deleteResult.getValue();
auto opTypeResult = IndexDescription::parseOpType(doc);
if (!opTypeResult.isOK()) {
invariant(ErrorCodes::FailedToParse == opTypeResult.getStatus());
diff --git a/src/mongo/db/repl/rollback_fix_up_info_test.cpp b/src/mongo/db/repl/rollback_fix_up_info_test.cpp
index e8e8b9e0217..83146a02f2c 100644
--- a/src/mongo/db/repl/rollback_fix_up_info_test.cpp
+++ b/src/mongo/db/repl/rollback_fix_up_info_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/rollback_fix_up_info.h"
+#include "mongo/db/repl/rollback_fix_up_info_descriptions.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
@@ -193,6 +194,75 @@ TEST_F(RollbackFixUpInfoTest,
}
TEST_F(RollbackFixUpInfoTest,
+ ProcessInsertDocumentOplogEntryWhenExistingDocumentHasDeleteOpTypeRemovesDocument) {
+ auto collectionUuid = UUID::gen();
+ NamespaceString nss("test.t");
+ auto doc = BSON("_id"
+ << "mydocid"
+ << "x"
+ << 1);
+ auto docId = doc["_id"];
+
+ // State of oplog:
+ // {op: 'i'}, ...., {op: 'd'}, ....
+ // (earliest optime) ---> (latest optime)
+ //
+ // Oplog entries are processed in reverse optime order.
+
+ // First, process document delete oplog entry.
+ auto opCtx = makeOpCtx();
+ RollbackFixUpInfo rollbackFixUpInfo(_storageInterface.get());
+ ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry(
+ opCtx.get(),
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kDelete,
+ nss.db().toString()));
+
+ RollbackFixUpInfo::SingleDocumentOperationDescription desc(
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kDelete,
+ nss.db().toString());
+ _assertDocumentsInCollectionEquals(
+ opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()});
+
+ // Next, process an unrelated document insert oplog entry with a different _id in the same
+ // collection. This is to ensure we do not remove unrelated documents from the
+ // "kRollbackDocsNamespace" collection.
+ auto doc2 = BSON("_id"
+ << "mydocid2"
+ << "x"
+ << 2);
+ auto docId2 = doc2["_id"];
+ ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry(
+ opCtx.get(),
+ collectionUuid,
+ docId2,
+ RollbackFixUpInfo::SingleDocumentOpType::kInsert,
+ nss.db().toString()));
+
+ RollbackFixUpInfo::SingleDocumentOperationDescription desc2(
+ collectionUuid,
+ docId2,
+ RollbackFixUpInfo::SingleDocumentOpType::kInsert,
+ nss.db().toString());
+ _assertDocumentsInCollectionEquals(
+ opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON(), desc2.toBSON()});
+
+ // Lastly, process document insert oplog entry. This should cancel out the existing delete oplog
+ // entry with the same _id.
+ ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry(
+ opCtx.get(),
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kInsert,
+ nss.db().toString()));
+ _assertDocumentsInCollectionEquals(
+ opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc2.toBSON()});
+}
+
+TEST_F(RollbackFixUpInfoTest,
ProcessDeleteDocumentOplogEntryInsertsDocumentIntoRollbackDocsCollectionWithDeleteOpType) {
auto operation = BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL << "op"
<< "d"
@@ -270,6 +340,82 @@ TEST_F(RollbackFixUpInfoTest,
_assertDocumentsInCollectionEquals(
opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {expectedDocument});
+
+ // Processing another 'update' oplog entry on the same document should not change the document
+ // in the rollback collection.
+ ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry(
+ opCtx.get(),
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kUpdate,
+ nss.db().toString()));
+ _assertDocumentsInCollectionEquals(
+ opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {expectedDocument});
+
+
+ // Processing an 'insert' oplog entry when the existing document has an 'update' op type should
+ // replace the existing document in the rollback collection with a new one with an 'insert' op
+ // type.
+ ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry(
+ opCtx.get(),
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kInsert,
+ nss.db().toString()));
+ RollbackFixUpInfo::SingleDocumentOperationDescription insertDesc(
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kInsert,
+ nss.db().toString());
+ _assertDocumentsInCollectionEquals(
+ opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {insertDesc.toBSON()});
+}
+
+TEST_F(
+ RollbackFixUpInfoTest,
+ ProcessUpdateDocumentOplogEntryWhenExistingDocumentHasDeleteOpTypeLeavesExistingDocumentIntact) {
+ auto collectionUuid = UUID::gen();
+ NamespaceString nss("test.t");
+ auto doc = BSON("_id"
+ << "mydocid"
+ << "x"
+ << 1);
+ auto docId = doc["_id"];
+
+ // State of oplog:
+ // {op: 'u'}, ...., {op: 'd'}, ....
+ // (earliest optime) ---> (latest optime)
+ //
+ // Oplog entries are processed in reverse optime order.
+
+ // First, process document delete oplog entry.
+ auto opCtx = makeOpCtx();
+ RollbackFixUpInfo rollbackFixUpInfo(_storageInterface.get());
+ ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry(
+ opCtx.get(),
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kDelete,
+ nss.db().toString()));
+
+ RollbackFixUpInfo::SingleDocumentOperationDescription desc(
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kDelete,
+ nss.db().toString());
+ _assertDocumentsInCollectionEquals(
+ opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()});
+
+ // Next, process document update oplog entry. This should be ignored since the existing entry
+ // has a delete op type.
+ ASSERT_OK(rollbackFixUpInfo.processSingleDocumentOplogEntry(
+ opCtx.get(),
+ collectionUuid,
+ docId,
+ RollbackFixUpInfo::SingleDocumentOpType::kUpdate,
+ nss.db().toString()));
+ _assertDocumentsInCollectionEquals(
+ opCtx.get(), RollbackFixUpInfo::kRollbackDocsNamespace, {desc.toBSON()});
}
TEST_F(