From 102c20ab166050ae7357732070bc8262aa61749c Mon Sep 17 00:00:00 2001 From: Lingzhi Deng Date: Mon, 24 Jun 2019 16:44:36 -0400 Subject: SERVER-37180: Use OplogEntry everywhere in oplog application --- src/mongo/db/catalog/create_collection.cpp | 7 +- src/mongo/db/catalog/create_collection.h | 3 +- src/mongo/db/catalog/create_collection_test.cpp | 24 +- src/mongo/db/catalog/rename_collection.cpp | 6 +- src/mongo/db/catalog/rename_collection.h | 2 +- src/mongo/db/catalog/rename_collection_test.cpp | 77 +- src/mongo/db/repl/SConscript | 3 +- src/mongo/db/repl/applier_helpers.cpp | 58 +- src/mongo/db/repl/apply_ops.cpp | 24 +- src/mongo/db/repl/dbcheck.cpp | 15 +- src/mongo/db/repl/dbcheck.h | 4 - src/mongo/db/repl/oplog.cpp | 851 +++++++++------------ src/mongo/db/repl/oplog.h | 8 +- src/mongo/db/repl/oplog_entry.h | 7 +- src/mongo/db/repl/oplog_entry_batch.cpp | 85 ++ src/mongo/db/repl/oplog_entry_batch.h | 86 +++ src/mongo/db/repl/sync_tail.cpp | 39 +- src/mongo/db/repl/sync_tail.h | 11 +- src/mongo/db/repl/sync_tail_test.cpp | 91 +-- src/mongo/db/repl/sync_tail_test_fixture.cpp | 8 +- src/mongo/db/repl/sync_tail_test_fixture.h | 2 +- .../db/repl/transaction_oplog_application.cpp | 2 +- src/mongo/dbtests/repltests.cpp | 3 +- src/mongo/dbtests/storage_timestamp_tests.cpp | 54 +- 24 files changed, 719 insertions(+), 751 deletions(-) create mode 100644 src/mongo/db/repl/oplog_entry_batch.cpp create mode 100644 src/mongo/db/repl/oplog_entry_batch.h (limited to 'src/mongo') diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp index d5fa352e829..b974ed83489 100644 --- a/src/mongo/db/catalog/create_collection.cpp +++ b/src/mongo/db/catalog/create_collection.cpp @@ -195,7 +195,7 @@ Status createCollection(OperationContext* opCtx, Status createCollectionForApplyOps(OperationContext* opCtx, const std::string& dbName, - const BSONElement& ui, + const OptionalCollectionUUID& ui, const BSONObj& cmdObj, const BSONObj& idIndex) { invariant(opCtx->lockState()->isDbLockedForMode(dbName, MODE_X)); @@ -212,15 +212,14 @@ Status createCollectionForApplyOps(OperationContext* opCtx, // We need to do the renaming part in a separate transaction, as we cannot transactionally // create a database on MMAPv1, which could result in createCollection failing if the database // does not yet exist. - if (ui.ok()) { + if (ui) { // Return an optional, indicating whether we need to early return (if the collection already // exists, or in case of an error). using Result = boost::optional; auto result = writeConflictRetry(opCtx, "createCollectionForApplyOps", newCollName.ns(), [&] { WriteUnitOfWork wunit(opCtx); - // Options need the field to be named "uuid", so parse/recreate. 
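// [Editor's note, not part of this patch] This hunk shows the central refactor of
// SERVER-37180: the oplog entry's "ui" field is parsed once, when the raw BSON is
// turned into an OplogEntry, and the downstream helpers now take the typed result
// instead of a raw BSONElement. A minimal sketch of the catalog aliases assumed
// here (they are not reproduced in this excerpt):
//
//     using CollectionUUID = UUID;
//     using OptionalCollectionUUID = boost::optional<CollectionUUID>;
//
// With a typed optional, the `ui.ok()` / `UUID::parse(ui)` dance below collapses
// to a plain `if (ui)` plus `ui.get()`.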
- auto uuid = uassertStatusOK(UUID::parse(ui)); + auto uuid = ui.get(); uassert(ErrorCodes::InvalidUUID, "Invalid UUID in applyOps create command: " + uuid.toString(), uuid.isRFC4122v4()); diff --git a/src/mongo/db/catalog/create_collection.h b/src/mongo/db/catalog/create_collection.h index 462de3ee3a7..645b9f62a64 100644 --- a/src/mongo/db/catalog/create_collection.h +++ b/src/mongo/db/catalog/create_collection.h @@ -31,6 +31,7 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" +#include "mongo/db/catalog/collection_catalog.h" namespace mongo { class BSONObj; @@ -55,7 +56,7 @@ Status createCollection(OperationContext* opCtx, */ Status createCollectionForApplyOps(OperationContext* opCtx, const std::string& dbName, - const BSONElement& ui, + const OptionalCollectionUUID& ui, const BSONObj& cmdObj, const BSONObj& idIndex = BSONObj()); diff --git a/src/mongo/db/catalog/create_collection_test.cpp b/src/mongo/db/catalog/create_collection_test.cpp index af99db09759..90aea925ffd 100644 --- a/src/mongo/db/catalog/create_collection_test.cpp +++ b/src/mongo/db/catalog/create_collection_test.cpp @@ -121,10 +121,8 @@ TEST_F(CreateCollectionTest, CreateCollectionForApplyOpsWithSpecificUuidNoExisti auto uuid = UUID::gen(); Lock::DBLock lock(opCtx.get(), newNss.db(), MODE_X); - ASSERT_OK(createCollectionForApplyOps(opCtx.get(), - newNss.db().toString(), - uuid.toBSON()["uuid"], - BSON("create" << newNss.coll()))); + ASSERT_OK(createCollectionForApplyOps( + opCtx.get(), newNss.db().toString(), uuid, BSON("create" << newNss.coll()))); ASSERT_TRUE(collectionExists(opCtx.get(), newNss)); } @@ -149,10 +147,8 @@ TEST_F(CreateCollectionTest, // This should rename the existing collection 'curNss' to the collection 'newNss' we are trying // to create. - ASSERT_OK(createCollectionForApplyOps(opCtx.get(), - newNss.db().toString(), - uuid.toBSON()["uuid"], - BSON("create" << newNss.coll()))); + ASSERT_OK(createCollectionForApplyOps( + opCtx.get(), newNss.db().toString(), uuid, BSON("create" << newNss.coll()))); ASSERT_FALSE(collectionExists(opCtx.get(), curNss)); ASSERT_TRUE(collectionExists(opCtx.get(), newNss)); @@ -177,10 +173,8 @@ TEST_F(CreateCollectionTest, ASSERT_NOT_EQUALS(uuid, getCollectionUuid(opCtx.get(), newNss)); // This should rename the existing collection 'newNss' to a randomly generated collection name. - ASSERT_OK(createCollectionForApplyOps(opCtx.get(), - newNss.db().toString(), - uuid.toBSON()["uuid"], - BSON("create" << newNss.coll()))); + ASSERT_OK(createCollectionForApplyOps( + opCtx.get(), newNss.db().toString(), uuid, BSON("create" << newNss.coll()))); ASSERT_TRUE(collectionExists(opCtx.get(), newNss)); ASSERT_EQUALS(uuid, getCollectionUuid(opCtx.get(), newNss)); @@ -217,10 +211,8 @@ TEST_F(CreateCollectionTest, // This should fail because we are not allowed to take a collection out of its drop-pending // state. 
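// [Editor's note] The uuid passed in below still identifies the collection that
// now sits at 'dropPendingNss', so the create must fail: resurrecting a
// drop-pending collection by renaming it back into place is not allowed, and the
// caller sees NamespaceExists rather than a successful create.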
ASSERT_EQUALS(ErrorCodes::NamespaceExists, - createCollectionForApplyOps(opCtx.get(), - newNss.db().toString(), - uuid.toBSON()["uuid"], - BSON("create" << newNss.coll()))); + createCollectionForApplyOps( + opCtx.get(), newNss.db().toString(), uuid, BSON("create" << newNss.coll()))); ASSERT_TRUE(collectionExists(opCtx.get(), dropPendingNss)); ASSERT_FALSE(collectionExists(opCtx.get(), newNss)); diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index 352f2ca73b8..8506619312b 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -747,7 +747,7 @@ Status renameCollection(OperationContext* opCtx, Status renameCollectionForApplyOps(OperationContext* opCtx, const std::string& dbName, - const BSONElement& ui, + const OptionalCollectionUUID& uuidToRename, const BSONObj& cmd, const repl::OpTime& renameOpTime) { @@ -769,9 +769,7 @@ Status renameCollectionForApplyOps(OperationContext* opCtx, NamespaceString sourceNss(sourceNsElt.valueStringData()); NamespaceString targetNss(targetNsElt.valueStringData()); - OptionalCollectionUUID uuidToRename; - if (!ui.eoo()) { - uuidToRename = uassertStatusOK(UUID::parse(ui)); + if (uuidToRename) { auto nss = CollectionCatalog::get(opCtx).lookupNSSByUUID(uuidToRename.get()); if (nss) sourceNss = *nss; diff --git a/src/mongo/db/catalog/rename_collection.h b/src/mongo/db/catalog/rename_collection.h index 20756aaaedd..8b4af732173 100644 --- a/src/mongo/db/catalog/rename_collection.h +++ b/src/mongo/db/catalog/rename_collection.h @@ -66,7 +66,7 @@ Status renameCollection(OperationContext* opCtx, */ Status renameCollectionForApplyOps(OperationContext* opCtx, const std::string& dbName, - const BSONElement& ui, + const OptionalCollectionUUID& uuidToRename, const BSONObj& cmd, const repl::OpTime& renameOpTime); diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp index 11da9388c03..4c7e994f061 100644 --- a/src/mongo/db/catalog/rename_collection_test.cpp +++ b/src/mongo/db/catalog/rename_collection_test.cpp @@ -535,7 +535,7 @@ TEST_F(RenameCollectionTest, auto dbName = _sourceNss.db().toString(); auto cmd = BSON("renameCollection" << dropPendingNss.ns() << "to" << _targetNss.ns()); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, - renameCollectionForApplyOps(_opCtx.get(), dbName, {}, cmd, {})); + renameCollectionForApplyOps(_opCtx.get(), dbName, boost::none, cmd, {})); // Source collections stays in drop-pending state. ASSERT_FALSE(_collectionExists(_opCtx.get(), _targetNss)); @@ -552,10 +552,9 @@ TEST_F( auto dbName = _sourceNss.db().toString(); NamespaceString ignoredSourceNss(dbName, "ignored"); - auto uuidDoc = options.uuid->toBSON(); auto cmd = BSON("renameCollection" << ignoredSourceNss.ns() << "to" << _targetNss.ns()); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, - renameCollectionForApplyOps(_opCtx.get(), dbName, uuidDoc["uuid"], cmd, {})); + renameCollectionForApplyOps(_opCtx.get(), dbName, options.uuid, cmd, {})); // Source collections stays in drop-pending state. 
ASSERT_FALSE(_collectionExists(_opCtx.get(), _targetNss)); @@ -566,20 +565,18 @@ TEST_F( TEST_F(RenameCollectionTest, RenameCollectionToItselfByNsForApplyOps) { auto dbName = _sourceNss.db().toString(); auto uuid = _createCollectionWithUUID(_opCtx.get(), _sourceNss); - auto uuidDoc = BSON("ui" << uuid); auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << _sourceNss.ns() << "dropTarget" << true); - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuidDoc["ui"], cmd, {})); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuid, cmd, {})); ASSERT_TRUE(_collectionExists(_opCtx.get(), _sourceNss)); } TEST_F(RenameCollectionTest, RenameCollectionToItselfByUUIDForApplyOps) { auto dbName = _targetNss.db().toString(); auto uuid = _createCollectionWithUUID(_opCtx.get(), _targetNss); - auto uuidDoc = BSON("ui" << uuid); auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << _targetNss.ns() << "dropTarget" << true); - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuidDoc["ui"], cmd, {})); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuid, cmd, {})); ASSERT_TRUE(_collectionExists(_opCtx.get(), _targetNss)); } @@ -587,10 +584,9 @@ TEST_F(RenameCollectionTest, RenameCollectionByUUIDRatherThanNsForApplyOps) { auto realRenameFromNss = NamespaceString("test.bar2"); auto dbName = realRenameFromNss.db().toString(); auto uuid = _createCollectionWithUUID(_opCtx.get(), realRenameFromNss); - auto uuidDoc = BSON("ui" << uuid); auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << _targetNss.ns() << "dropTarget" << true); - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuidDoc["ui"], cmd, {})); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuid, cmd, {})); ASSERT_TRUE(_collectionExists(_opCtx.get(), _targetNss)); } @@ -601,11 +597,10 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetByUUIDTargetDo auto dbName = collA.db().toString(); auto collAUUID = _createCollectionWithUUID(_opCtx.get(), collA); auto collCUUID = _createCollectionWithUUID(_opCtx.get(), collC); - auto uuidDoc = BSON("ui" << collAUUID); // Rename A to B, drop C, where B is not an existing collection auto cmd = BSON("renameCollection" << collA.ns() << "to" << collB.ns() << "dropTarget" << collCUUID); - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuidDoc["ui"], cmd, {})); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, collAUUID, cmd, {})); // A and C should be dropped ASSERT_FALSE(_collectionExists(_opCtx.get(), collA)); ASSERT_FALSE(_collectionExists(_opCtx.get(), collC)); @@ -625,12 +620,11 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetByUUIDTargetEx auto collAUUID = _createCollectionWithUUID(_opCtx.get(), collA); auto collBUUID = _createCollectionWithUUID(_opCtx.get(), collB); auto collCUUID = _createCollectionWithUUID(_opCtx.get(), collC); - auto uuidDoc = BSON("ui" << collAUUID); // Rename A to B, drop C, where B is an existing collection // B should be kept but with a temporary name auto cmd = BSON("renameCollection" << collA.ns() << "to" << collB.ns() << "dropTarget" << collCUUID); - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuidDoc["ui"], cmd, {})); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, collAUUID, cmd, {})); // A and C should be dropped ASSERT_FALSE(_collectionExists(_opCtx.get(), collA)); ASSERT_FALSE(_collectionExists(_opCtx.get(), collC)); @@ -658,12 +652,11 @@ TEST_F(RenameCollectionTest, auto dbName = 
collA.db().toString(); auto collAUUID = _createCollectionWithUUID(_opCtx.get(), collA); auto collCUUID = _createCollectionWithUUID(_opCtx.get(), collC); - auto uuidDoc = BSON("ui" << collAUUID); // Rename A to B, drop C, where B is an existing collection // B should be kept but with a temporary name auto cmd = BSON("renameCollection" << collA.ns() << "to" << collB.ns() << "dropTarget" << collCUUID); - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuidDoc["ui"], cmd, {})); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, collAUUID, cmd, {})); // A and C should be dropped ASSERT_FALSE(_collectionExists(_opCtx.get(), collA)); ASSERT_FALSE(_collectionExists(_opCtx.get(), collC)); @@ -685,12 +678,11 @@ TEST_F(RenameCollectionTest, auto collAUUID = _createCollectionWithUUID(_opCtx.get(), collA); auto collBUUID = _createCollectionWithUUID(_opCtx.get(), collB); auto collCUUID = UUID::gen(); - auto uuidDoc = BSON("ui" << collAUUID); // Rename A to B, drop C, where B is an existing collection // B should be kept but with a temporary name auto cmd = BSON("renameCollection" << collA.ns() << "to" << collB.ns() << "dropTarget" << collCUUID); - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, uuidDoc["ui"], cmd, {})); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, collAUUID, cmd, {})); // A and C should be dropped ASSERT_FALSE(_collectionExists(_opCtx.get(), collA)); // B (originally A) should exist @@ -759,8 +751,9 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsRejectsRenameOpTimeIfWri auto dbName = _sourceNss.db().toString(); auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << _targetNss.ns()); auto renameOpTime = _opObserver->renameOpTime; - ASSERT_EQUALS(ErrorCodes::BadValue, - renameCollectionForApplyOps(_opCtx.get(), dbName, {}, cmd, renameOpTime)); + ASSERT_EQUALS( + ErrorCodes::BadValue, + renameCollectionForApplyOps(_opCtx.get(), dbName, boost::none, cmd, renameOpTime)); } TEST_F(RenameCollectionTest, @@ -778,7 +771,7 @@ TEST_F(RenameCollectionTest, << true); repl::OpTime renameOpTime = {Timestamp(Seconds(200), 1U), 1LL}; - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, {}, cmd, renameOpTime)); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, boost::none, cmd, renameOpTime)); // Confirm that the target collection has been renamed to a drop-pending collection. 
auto dpns = _targetNss.makeDropPendingNamespace(renameOpTime); @@ -800,16 +793,16 @@ DEATH_TEST_F(RenameCollectionTest, << true); repl::OpTime renameOpTime = {Timestamp(Seconds(200), 1U), 1LL}; - ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, {}, cmd, renameOpTime)); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), dbName, boost::none, cmd, renameOpTime)); } TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsSourceAndTargetDoNotExist) { - auto uuidDoc = BSON("ui" << UUID::gen()); + auto uuid = UUID::gen(); auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << _targetNss.ns() << "dropTarget" << "true"); - ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, - renameCollectionForApplyOps( - _opCtx.get(), _sourceNss.db().toString(), uuidDoc["ui"], cmd, {})); + ASSERT_EQUALS( + ErrorCodes::NamespaceNotFound, + renameCollectionForApplyOps(_opCtx.get(), _sourceNss.db().toString(), uuid, cmd, {})); ASSERT_FALSE(_collectionExists(_opCtx.get(), _sourceNss)); ASSERT_FALSE(_collectionExists(_opCtx.get(), _targetNss)); } @@ -817,12 +810,12 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsSourceAndTargetDoNotExis TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetEvenIfSourceDoesNotExist) { _createCollectionWithUUID(_opCtx.get(), _targetNss); auto missingSourceNss = NamespaceString("test.bar2"); - auto uuidDoc = BSON("ui" << UUID::gen()); + auto uuid = UUID::gen(); auto cmd = BSON("renameCollection" << missingSourceNss.ns() << "to" << _targetNss.ns() << "dropTarget" << "true"); - ASSERT_OK(renameCollectionForApplyOps( - _opCtx.get(), missingSourceNss.db().toString(), uuidDoc["ui"], cmd, {})); + ASSERT_OK( + renameCollectionForApplyOps(_opCtx.get(), missingSourceNss.db().toString(), uuid, cmd, {})); ASSERT_FALSE(_collectionExists(_opCtx.get(), _targetNss)); } @@ -831,11 +824,11 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetByUUIDEvenIfSo auto dropTargetNss = NamespaceString("test.bar3"); _createCollectionWithUUID(_opCtx.get(), _targetNss); auto dropTargetUUID = _createCollectionWithUUID(_opCtx.get(), dropTargetNss); - auto uuidDoc = BSON("ui" << UUID::gen()); + auto uuid = UUID::gen(); auto cmd = BSON("renameCollection" << missingSourceNss.ns() << "to" << _targetNss.ns() << "dropTarget" << dropTargetUUID); - ASSERT_OK(renameCollectionForApplyOps( - _opCtx.get(), missingSourceNss.db().toString(), uuidDoc["ui"], cmd, {})); + ASSERT_OK( + renameCollectionForApplyOps(_opCtx.get(), missingSourceNss.db().toString(), uuid, cmd, {})); ASSERT_TRUE(_collectionExists(_opCtx.get(), _targetNss)); ASSERT_FALSE(_collectionExists(_opCtx.get(), dropTargetNss)); } @@ -845,7 +838,7 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetEvenIfSourceIs auto dropPendingNss = _sourceNss.makeDropPendingNamespace(dropOpTime); auto dropTargetUUID = _createCollectionWithUUID(_opCtx.get(), _targetNss); - auto uuidDoc = BSON("ui" << _createCollectionWithUUID(_opCtx.get(), dropPendingNss)); + auto uuid = _createCollectionWithUUID(_opCtx.get(), dropPendingNss); auto cmd = BSON("renameCollection" << dropPendingNss.ns() << "to" << _targetNss.ns() << "dropTarget" << "true"); @@ -853,7 +846,7 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetEvenIfSourceIs repl::UnreplicatedWritesBlock uwb(_opCtx.get()); repl::OpTime renameOpTime = {Timestamp(Seconds(200), 1U), 1LL}; ASSERT_OK(renameCollectionForApplyOps( - _opCtx.get(), dropPendingNss.db().toString(), uuidDoc["ui"], cmd, renameOpTime)); + _opCtx.get(), 
dropPendingNss.db().toString(), uuid, cmd, renameOpTime)); // Source collections stays in drop-pending state. ASSERT_TRUE(_collectionExists(_opCtx.get(), dropPendingNss)); @@ -870,14 +863,14 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetByUUIDEvenIfSo _createCollectionWithUUID(_opCtx.get(), _targetNss); auto dropTargetUUID = _createCollectionWithUUID(_opCtx.get(), dropTargetNss); - auto uuidDoc = BSON("ui" << _createCollectionWithUUID(_opCtx.get(), dropPendingNss)); + auto uuid = _createCollectionWithUUID(_opCtx.get(), dropPendingNss); auto cmd = BSON("renameCollection" << dropPendingNss.ns() << "to" << _targetNss.ns() << "dropTarget" << dropTargetUUID); repl::UnreplicatedWritesBlock uwb(_opCtx.get()); repl::OpTime renameOpTime = {Timestamp(Seconds(200), 1U), 1LL}; ASSERT_OK(renameCollectionForApplyOps( - _opCtx.get(), dropPendingNss.db().toString(), uuidDoc["ui"], cmd, renameOpTime)); + _opCtx.get(), dropPendingNss.db().toString(), uuid, cmd, renameOpTime)); // Source collections stays in drop-pending state. ASSERT_TRUE(_collectionExists(_opCtx.get(), dropPendingNss)); @@ -889,14 +882,14 @@ TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetByUUIDEvenIfSo TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsDropTargetByUUIDEvenIfSourceEqualsTarget) { auto dropTargetUUID = _createCollectionWithUUID(_opCtx.get(), _targetNss); - auto uuidDoc = BSON("ui" << _createCollectionWithUUID(_opCtx.get(), _sourceNss)); + auto uuid = _createCollectionWithUUID(_opCtx.get(), _sourceNss); auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << _sourceNss.ns() << "dropTarget" << dropTargetUUID); repl::UnreplicatedWritesBlock uwb(_opCtx.get()); repl::OpTime renameOpTime = {Timestamp(Seconds(200), 1U), 1LL}; auto dpns = _targetNss.makeDropPendingNamespace(renameOpTime); ASSERT_OK(renameCollectionForApplyOps( - _opCtx.get(), _sourceNss.db().toString(), uuidDoc["ui"], cmd, renameOpTime)); + _opCtx.get(), _sourceNss.db().toString(), uuid, cmd, renameOpTime)); ASSERT_TRUE(_collectionExists(_opCtx.get(), _sourceNss)); ASSERT_TRUE(_collectionExists(_opCtx.get(), dpns)); @@ -1005,7 +998,8 @@ void _testRenameCollectionAcrossDatabaseOplogEntries( if (forApplyOps) { auto cmd = BSON("renameCollection" << sourceNss.ns() << "to" << targetNss.ns() << "dropTarget" << true); - ASSERT_OK(renameCollectionForApplyOps(opCtx, sourceNss.db().toString(), {}, cmd, {})); + ASSERT_OK( + renameCollectionForApplyOps(opCtx, sourceNss.db().toString(), boost::none, cmd, {})); } else { RenameCollectionOptions options; options.dropTarget = true; @@ -1140,10 +1134,9 @@ TEST_F(RenameCollectionTest, CatalogPointersRenameValidThroughRenameForApplyOps) Collection* sourceColl = AutoGetCollectionForRead(_opCtx.get(), _sourceNss).getCollection(); ASSERT(sourceColl); - auto uuidDoc = BSON("ui" << UUID::gen()); + auto uuid = UUID::gen(); auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << _targetNss.ns()); - ASSERT_OK(renameCollectionForApplyOps( - _opCtx.get(), _sourceNss.db().toString(), uuidDoc["ui"], cmd, {})); + ASSERT_OK(renameCollectionForApplyOps(_opCtx.get(), _sourceNss.db().toString(), uuid, cmd, {})); ASSERT_FALSE(_collectionExists(_opCtx.get(), _sourceNss)); Collection* targetColl = AutoGetCollectionForRead(_opCtx.get(), _targetNss).getCollection(); @@ -1194,7 +1187,7 @@ TEST_F(RenameCollectionTest, auto cmd = BSON("renameCollection" << _sourceNss.ns() << "to" << invalidTargetNss.ns()); ASSERT_EQUALS(ErrorCodes::InvalidNamespace, - renameCollectionForApplyOps(_opCtx.get(), 
dbName, {}, cmd, {})); + renameCollectionForApplyOps(_opCtx.get(), dbName, boost::none, cmd, {})); } } // namespace diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index a5817517955..69086a7319f 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -33,6 +33,7 @@ env.Library( source=[ 'apply_ops.cpp', 'oplog.cpp', + 'oplog_entry_batch.cpp', 'transaction_oplog_application.cpp', env.Idlc('apply_ops.idl')[0], ], @@ -1394,4 +1395,4 @@ env.Library( '$BUILD_DIR/mongo/base', 'election_reason_counter', ], -) \ No newline at end of file +) diff --git a/src/mongo/db/repl/applier_helpers.cpp b/src/mongo/db/repl/applier_helpers.cpp index 82a4cade0ed..c12fdcbeaf1 100644 --- a/src/mongo/db/repl/applier_helpers.cpp +++ b/src/mongo/db/repl/applier_helpers.cpp @@ -136,58 +136,11 @@ StatusWith InsertGroup::groupAndApplyInserts(ConstIt "Not able to create a group with more than a single insert operation"); } - // Since we found more than one document, create grouped insert of many docs. - // We are going to group many 'i' ops into one big 'i' op, with array fields for - // 'ts', 't', and 'o', corresponding to each individual op. - // For example: - // { ts: Timestamp(1,1), t:1, ns: "test.foo", op:"i", o: {_id:1} } - // { ts: Timestamp(1,2), t:1, ns: "test.foo", op:"i", o: {_id:2} } - // become: - // { ts: [Timestamp(1, 1), Timestamp(1, 2)], - // t: [1, 1], - // o: [{_id: 1}, {_id: 2}], - // ns: "test.foo", - // op: "i" } - BSONObjBuilder groupedInsertBuilder; - - // Populate the "ts" field with an array of all the grouped inserts' timestamps. - { - BSONArrayBuilder tsArrayBuilder(groupedInsertBuilder.subarrayStart("ts")); - for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) { - tsArrayBuilder.append((*groupingIt)->getTimestamp()); - } - } - - // Populate the "t" (term) field with an array of all the grouped inserts' terms. - { - BSONArrayBuilder tArrayBuilder(groupedInsertBuilder.subarrayStart("t")); - for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) { - auto parsedTerm = (*groupingIt)->getTerm(); - long long term = OpTime::kUninitializedTerm; - // Term may not be present (pv0) - if (parsedTerm) { - term = parsedTerm.get(); - } - tArrayBuilder.append(term); - } - } - - // Populate the "o" field with an array of all the grouped inserts. - { - BSONArrayBuilder oArrayBuilder(groupedInsertBuilder.subarrayStart("o")); - for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) { - oArrayBuilder.append((*groupingIt)->getObject()); - } - } - - // Generate an op object of all elements except for "ts", "t", and "o", since we - // need to make those fields arrays of all the ts's, t's, and o's. - groupedInsertBuilder.appendElementsUnique(entry.getRaw()); - - auto groupedInsertObj = groupedInsertBuilder.done(); + // Create an oplog entry batch for grouped inserts. + OplogEntryBatch groupedInsertBatch(it, endOfGroupableOpsIterator); try { - // Apply the group of inserts. - uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertObj, _mode, boost::none)); + // Apply the group of inserts by passing in groupedInsertBatch. + uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertBatch, _mode, boost::none)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. 
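// [Editor's sketch] oplog_entry_batch.h/.cpp are new in this commit but their
// contents are not reproduced in this excerpt. Inferred from the call sites here
// and in oplog.cpp (getOp, isGroupedInserts, getGroupedInserts, toBSON), the
// interface is roughly the following; treat names and exact signatures as
// assumptions, not as the committed header:
//
// ---- begin editor sketch (illustrative only; not part of this patch) ----
// class OplogEntryBatch {
// public:
//     // A batch holding a single oplog entry.
//     explicit OplogEntryBatch(const OplogEntry* op) : _batch{op} {}
//
//     // A batch of grouped insert entries over [begin, end) of OplogEntry*.
//     template <typename Iterator>
//     OplogEntryBatch(Iterator begin, Iterator end) : _batch(begin, end) {}
//
//     // The single entry, or the first entry of the grouped inserts.
//     const OplogEntry& getOp() const { return *_batch.front(); }
//     bool isGroupedInserts() const { return _batch.size() > 1U; }
//     const std::vector<const OplogEntry*>& getGroupedInserts() const { return _batch; }
//     BSONObj toBSON() const;  // the entry's BSON, or an array form for groups
//
// private:
//     std::vector<const OplogEntry*> _batch;
// };
// ---- end editor sketch ----
//
// This replaces the hand-assembled grouped-insert document that the deleted code
// above built ({ op: "i", ns: ..., ts: [...], t: [...], o: [...] }).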
return endOfGroupableOpsIterator - 1; @@ -195,7 +148,8 @@ StatusWith InsertGroup::groupAndApplyInserts(ConstIt // The group insert failed, log an error and fall through to the // application of an individual op. auto status = exceptionToStatus().withContext( - str::stream() << "Error applying inserts in bulk: " << redact(groupedInsertObj) + str::stream() << "Error applying inserts in bulk: " + << redact(groupedInsertBatch.toBSON()) << ". Trying first insert as a lone insert: " << redact(entry.getRaw())); // It's not an error during initial sync to encounter DuplicateKey errors. diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 03e0c8ac566..76f87222aad 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -160,21 +160,26 @@ Status _applyOps(OperationContext* opCtx, << nss.ns() << " in atomic applyOps mode: " << redact(opObj)); } + BSONObjBuilder builder; + builder.appendElements(opObj); + if (!builder.hasField(OplogEntry::kTimestampFieldName)) { + builder.append(OplogEntry::kTimestampFieldName, Timestamp()); + } // Reject malformed operations in an atomic applyOps. - try { - ReplOperation::parse(IDLParserErrorContext("applyOps"), opObj); - } catch (...) { + auto entry = OplogEntry::parse(builder.done()); + if (!entry.isOK()) { uasserted(ErrorCodes::AtomicityFailure, str::stream() << "cannot apply a malformed operation in atomic applyOps mode: " - << redact(opObj) << "; will retry without atomicity: " - << exceptionToStatus().toString()); + << redact(opObj) + << "; will retry without atomicity: " << entry.getStatus()); } OldClientContext ctx(opCtx, nss.ns()); + const auto& op = entry.getValue(); status = repl::applyOperation_inlock( - opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode); + opCtx, ctx.db(), &op, alwaysUpsert, oplogApplicationMode); if (!status.isOK()) return status; @@ -207,12 +212,11 @@ Status _applyOps(OperationContext* opCtx, if (!builder.hasField(OplogEntry::kHashFieldName)) { builder.append(OplogEntry::kHashFieldName, 0LL); } - auto entryObj = builder.done(); - auto entry = uassertStatusOK(OplogEntry::parse(entryObj)); + auto entry = uassertStatusOK(OplogEntry::parse(builder.done())); if (*opType == 'c') { invariant(opCtx->lockState()->isW()); uassertStatusOK(applyCommand_inlock( - opCtx, opObj, entry, oplogApplicationMode, boost::none)); + opCtx, entry, oplogApplicationMode, boost::none)); return Status::OK(); } @@ -236,7 +240,7 @@ Status _applyOps(OperationContext* opCtx, // ops. This is to leave the door open to parallelizing CRUD op // application in the future. return repl::applyOperation_inlock( - opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode); + opCtx, ctx.db(), &entry, alwaysUpsert, oplogApplicationMode); }); } catch (const DBException& ex) { ab.append(false); diff --git a/src/mongo/db/repl/dbcheck.cpp b/src/mongo/db/repl/dbcheck.cpp index 289b831b795..b1d4bf2f84f 100644 --- a/src/mongo/db/repl/dbcheck.cpp +++ b/src/mongo/db/repl/dbcheck.cpp @@ -509,27 +509,28 @@ Status dbCheckDatabaseOnSecondary(OperationContext* opCtx, namespace repl { /* - * The corresponding command run on the secondary. + * The corresponding command run during command application. 
*/ Status dbCheckOplogCommand(OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const repl::OpTime& optime, const repl::OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) { + const auto& cmd = entry.getObject(); + OpTime opTime; + if (!opCtx->writesAreReplicated()) { + opTime = entry.getOpTime(); + } auto type = OplogEntries_parse(IDLParserErrorContext("type"), cmd.getStringField("type")); IDLParserErrorContext ctx("o"); switch (type) { case OplogEntriesEnum::Batch: { auto invocation = DbCheckOplogBatch::parse(ctx, cmd); - return dbCheckBatchOnSecondary(opCtx, optime, invocation); + return dbCheckBatchOnSecondary(opCtx, opTime, invocation); } case OplogEntriesEnum::Collection: { auto invocation = DbCheckOplogCollection::parse(ctx, cmd); - return dbCheckDatabaseOnSecondary(opCtx, optime, invocation); + return dbCheckDatabaseOnSecondary(opCtx, opTime, invocation); } } diff --git a/src/mongo/db/repl/dbcheck.h b/src/mongo/db/repl/dbcheck.h index 457087a9365..3f0a9f88e37 100644 --- a/src/mongo/db/repl/dbcheck.h +++ b/src/mongo/db/repl/dbcheck.h @@ -221,10 +221,6 @@ namespace repl { * errors (primarily by writing to the health log), so always returns `Status::OK`. */ Status dbCheckOplogCommand(OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const repl::OpTime& optime, const repl::OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 5757ddfc267..ec1fa40c985 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -404,7 +404,9 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { WriteUnitOfWork wuow(opCtx); if (slot.isNull()) { slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0]; - // TODO: make the oplogEntry a const reference instead of using the guard. + // It would be better to make the oplogEntry a const reference. But because in some cases, a + // new OpTime needs to be assigned within the WUOW as explained earlier, we instead pass + // oplogEntry by pointer and reset the OpTime to null using a ScopeGuard. 
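// [Editor's sketch] The guard referred to above is created earlier in logOp(),
// outside this hunk. Assuming the tree's usual scope-guard helper, it would look
// something like:
//
//     auto resetOpTimeGuard = makeGuard([&, resetOpTimeOnExit = slot.isNull()] {
//         if (resetOpTimeOnExit)
//             oplogEntry->setOpTime(OpTime());  // leave the caller's entry untouched
//     });
//
// i.e. an OpTime that was assigned inside this function is cleared again on exit,
// so the same MutableOplogEntry instance can be reused by the caller.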
oplogEntry->setOpTime(slot); } @@ -647,24 +649,25 @@ std::vector getNextOpTimes(OperationContext* opCtx, std::size_t count // ------------------------------------- namespace { -NamespaceString parseNs(const string& ns, const BSONObj& cmdObj) { +NamespaceString extractNs(const NamespaceString& ns, const BSONObj& cmdObj) { BSONElement first = cmdObj.firstElement(); uassert(40073, str::stream() << "collection name has invalid type " << typeName(first.type()), first.canonicalType() == canonicalizeBSONType(mongo::String)); std::string coll = first.valuestr(); uassert(28635, "no collection name specified", !coll.empty()); - return NamespaceString(NamespaceString(ns).db().toString(), coll); + return NamespaceString(ns.db().toString(), coll); } -std::pair parseCollModUUIDAndNss(OperationContext* opCtx, - const BSONElement& ui, - const char* ns, - BSONObj& cmd) { - if (ui.eoo()) { - return std::pair(boost::none, parseNs(ns, cmd)); +std::pair extractCollModUUIDAndNss( + OperationContext* opCtx, + const boost::optional& ui, + const NamespaceString& ns, + const BSONObj& cmd) { + if (!ui) { + return std::pair(boost::none, extractNs(ns, cmd)); } - CollectionUUID uuid = uassertStatusOK(UUID::parse(ui)); + CollectionUUID uuid = ui.get(); auto& catalog = CollectionCatalog::get(opCtx); const auto nsByUUID = catalog.lookupNSSByUUID(uuid); uassert(ErrorCodes::NamespaceNotFound, @@ -674,28 +677,23 @@ std::pair parseCollModUUIDAndNss(Operat return std::pair(uuid, *nsByUUID); } -NamespaceString parseUUID(OperationContext* opCtx, const BSONElement& ui) { - auto statusWithUUID = UUID::parse(ui); - uassertStatusOK(statusWithUUID); - auto uuid = statusWithUUID.getValue(); +NamespaceString extractNsFromUUID(OperationContext* opCtx, const boost::optional& ui) { + invariant(ui); + auto uuid = ui.get(); auto& catalog = CollectionCatalog::get(opCtx); auto nss = catalog.lookupNSSByUUID(uuid); uassert(ErrorCodes::NamespaceNotFound, "No namespace with UUID " + uuid.toString(), nss); return *nss; } -NamespaceString parseUUIDorNs(OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd) { - return ui.ok() ? parseUUID(opCtx, ui) : parseNs(ns, cmd); +NamespaceString extractNsFromUUIDorNs(OperationContext* opCtx, + const NamespaceString& ns, + const boost::optional& ui, + const BSONObj& cmd) { + return ui ? extractNsFromUUID(opCtx, ui) : extractNs(ns, cmd); } using OpApplyFn = std::function stableTimestampForRecovery)>; @@ -717,14 +715,12 @@ struct ApplyOpMetadata { const StringMap kOpsMap = { {"create", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { - const NamespaceString nss(parseNs(ns, cmd)); + const auto& ui = entry.getUuid(); + const auto& cmd = entry.getObject(); + const NamespaceString nss(extractNs(entry.getNss(), cmd)); Lock::DBLock dbXLock(opCtx, nss.db(), MODE_X); if (auto idIndexElem = cmd["idIndex"]) { // Remove "idIndex" field from command. 
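// [Editor's note] The HTML rendering this excerpt came from stripped the angle
// brackets from template arguments throughout this file (e.g. the remnant
// "std::function stableTimestampForRecovery)>" above, and "boost::optional
// stableTimestampForRecovery" in every handler). Reconstructed from the handler
// lambdas, the new dispatch-table types read approximately:
//
//     using OpApplyFn = std::function<Status(
//         OperationContext* opCtx,
//         const OplogEntry& entry,
//         OplogApplication::Mode mode,
//         boost::optional<Timestamp> stableTimestampForRecovery)>;
//
//     struct ApplyOpMetadata {
//         OpApplyFn applyFunc;
//         std::set<ErrorCodes::Error> acceptableErrors;
//     };
//
//     const StringMap<ApplyOpMetadata> kOpsMap = { /* one entry per command */ };
//
// The old signature's loose ns/ui/cmd/opTime parameters are gone; each handler
// now derives all of them from the single OplogEntry argument.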
@@ -746,14 +742,12 @@ const StringMap kOpsMap = { {ErrorCodes::NamespaceExists}}}, {"createIndexes", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { - const NamespaceString nss(parseUUIDorNs(opCtx, ns, ui, cmd)); + const auto& cmd = entry.getObject(); + const NamespaceString nss( + extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd)); BSONElement first = cmd.firstElement(); invariant(first.fieldNameStringData() == "createIndexes"); uassert(ErrorCodes::InvalidNamespace, @@ -772,10 +766,6 @@ const StringMap kOpsMap = { ErrorCodes::NamespaceNotFound}}}, {"startIndexBuild", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { @@ -805,7 +795,9 @@ const StringMap kOpsMap = { "The startIndexBuild operation is not supported in applyOps mode"}; } - const NamespaceString nss(parseUUIDorNs(opCtx, ns, ui, cmd)); + const auto& ui = entry.getUuid(); + const auto& cmd = entry.getObject(); + const NamespaceString nss(extractNsFromUUIDorNs(opCtx, entry.getNss(), ui, cmd)); auto buildUUIDElem = cmd.getField("indexBuildUUID"); uassert(ErrorCodes::BadValue, @@ -822,16 +814,15 @@ const StringMap kOpsMap = { "Error parsing 'startIndexBuild' oplog entry, field 'indexes' must be an array.", indexesElem.type() == Array); - auto collUUID = uassertStatusOK(UUID::parse(ui)); + uassert(ErrorCodes::BadValue, + "Error parsing 'startIndexBuild' oplog entry, missing required field 'uuid'.", + ui); + auto collUUID = ui.get(); return startIndexBuild(opCtx, nss, collUUID, indexBuildUUID, indexesElem, mode); }}}, {"commitIndexBuild", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { @@ -861,6 +852,7 @@ const StringMap kOpsMap = { "The commitIndexBuild operation is not supported in applyOps mode"}; } + const auto& cmd = entry.getObject(); // Ensure the collection name is specified BSONElement first = cmd.firstElement(); invariant(first.fieldNameStringData() == "commitIndexBuild"); @@ -868,7 +860,8 @@ const StringMap kOpsMap = { "commitIndexBuild value must be a string", first.type() == mongo::String); - const NamespaceString nss(parseUUIDorNs(opCtx, ns, ui, cmd)); + const NamespaceString nss( + extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd)); auto buildUUIDElem = cmd.getField("indexBuildUUID"); uassert(ErrorCodes::BadValue, @@ -889,10 +882,6 @@ const StringMap kOpsMap = { }}}, {"abortIndexBuild", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTme, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { @@ -922,6 +911,7 @@ const StringMap kOpsMap = { "The abortIndexBuild operation is not supported in applyOps mode"}; } + const auto& cmd = entry.getObject(); // Ensure that the first element is the 'abortIndexBuild' field. 
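// [Editor's note] As in the other rewritten handlers, `cmd` is now simply a view
// of the entry's 'o' field (entry.getObject()); the checks that follow validate
// the command document taken from the parsed OplogEntry rather than a separately
// threaded BSONObj& parameter.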
BSONElement first = cmd.firstElement(); invariant(first.fieldNameStringData() == "abortIndexBuild"); @@ -951,50 +941,47 @@ const StringMap kOpsMap = { }}}, {"collMod", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { NamespaceString nss; BSONObjBuilder resultWeDontCareAbout; - std::tie(std::ignore, nss) = parseCollModUUIDAndNss(opCtx, ui, ns, cmd); + const auto& cmd = entry.getObject(); + std::tie(std::ignore, nss) = + extractCollModUUIDAndNss(opCtx, entry.getUuid(), entry.getNss(), cmd); return collMod(opCtx, nss, cmd, &resultWeDontCareAbout); }, {ErrorCodes::IndexNotFound, ErrorCodes::NamespaceNotFound}}}, {"dbCheck", {dbCheckOplogCommand, {}}}, {"dropDatabase", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { - return dropDatabase(opCtx, NamespaceString(ns).db().toString()); + return dropDatabase(opCtx, entry.getNss().db().toString()); }, {ErrorCodes::NamespaceNotFound}}}, {"drop", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { BSONObjBuilder resultWeDontCareAbout; - auto nss = parseUUIDorNs(opCtx, ns, ui, cmd); + const auto& cmd = entry.getObject(); + auto nss = extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd); if (nss.isDropPendingNamespace()) { log() - << "applyCommand: " << nss << " (UUID: " << ui.toString(false) - << "): collection is already in a drop-pending state: ignoring collection drop: " + << "applyCommand: " << nss + << " : collection is already in a drop-pending state: ignoring collection drop: " << redact(cmd); return Status::OK(); } + // Parse optime from oplog entry unless we are applying this command in standalone or on a + // primary (replicated writes enabled). 
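// [Editor's note] "Replicated writes enabled" means a standalone or a primary,
// where dropCollection generates its own optime when it logs the drop. Only
// during secondary/recovery application (writesAreReplicated() == false) is the
// entry's own OpTime reused, so that the two-phase drop names the drop-pending
// namespace consistently with the primary (cf. makeDropPendingNamespace in the
// rename_collection tests above). The "renameCollection" handler below repeats
// this pattern for its rename optime, which a primary rejects with BadValue, as
// one of the tests above asserts.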
+ OpTime opTime; + if (!opCtx->writesAreReplicated()) { + opTime = entry.getOpTime(); + } return dropCollection(opCtx, nss, resultWeDontCareAbout, @@ -1005,74 +992,73 @@ const StringMap kOpsMap = { // deleteIndex(es) is deprecated but still works as of April 10, 2015 {"deleteIndex", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); + const auto& cmd = entry.getObject(); + return dropIndexes(opCtx, + extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd), + cmd, + &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"deleteIndexes", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); + const auto& cmd = entry.getObject(); + return dropIndexes(opCtx, + extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd), + cmd, + &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"dropIndex", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); + const auto& cmd = entry.getObject(); + return dropIndexes(opCtx, + extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd), + cmd, + &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"dropIndexes", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout); + const auto& cmd = entry.getObject(); + return dropIndexes(opCtx, + extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd), + cmd, + &resultWeDontCareAbout); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, {"renameCollection", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { - return renameCollectionForApplyOps(opCtx, nsToDatabase(ns), ui, cmd, opTime); + // Parse optime from oplog entry unless we are applying this command in standalone or on a + // primary (replicated writes enabled). 
+ OpTime opTime; + if (!opCtx->writesAreReplicated()) { + opTime = entry.getOpTime(); + } + return renameCollectionForApplyOps( + opCtx, entry.getNss().db().toString(), entry.getUuid(), entry.getObject(), opTime); }, {ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}}, {"applyOps", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { @@ -1085,35 +1071,28 @@ const StringMap kOpsMap = { }}}, {"convertToCapped", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { - convertToCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd["size"].number()); + const auto& cmd = entry.getObject(); + convertToCapped(opCtx, + extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd), + cmd["size"].number()); return Status::OK(); }, {ErrorCodes::NamespaceNotFound}}}, {"emptycapped", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { - return emptyCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd)); + return emptyCapped( + opCtx, + extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), entry.getObject())); }, {ErrorCodes::NamespaceNotFound}}}, {"commitTransaction", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { @@ -1121,10 +1100,6 @@ const StringMap kOpsMap = { }}}, {"abortTransaction", {[](OperationContext* opCtx, - const char* ns, - const BSONElement& ui, - BSONObj& cmd, - const OpTime& opTime, const OplogEntry& entry, OplogApplication::Mode mode, boost::optional stableTimestampForRecovery) -> Status { @@ -1173,11 +1148,13 @@ StatusWith OplogApplication::parseMode(const std::string // See replset initial sync code. Status applyOperation_inlock(OperationContext* opCtx, Database* db, - const BSONObj& op, + const OplogEntryBatch& opOrGroupedInserts, bool alwaysUpsert, OplogApplication::Mode mode, IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { - LOG(3) << "applying op: " << redact(op) + // Get the single oplog entry to be applied or the first oplog entry of grouped inserts. + auto op = opOrGroupedInserts.getOp(); + LOG(3) << "applying op (or grouped inserts): " << redact(opOrGroupedInserts.toBSON()) << ", oplog application mode: " << OplogApplication::modeToString(mode); // Choose opCounters based on running on standalone/primary or secondary by checking @@ -1187,26 +1164,8 @@ Status applyOperation_inlock(OperationContext* opCtx, mode == repl::OplogApplication::Mode::kApplyOpsCmd || opCtx->writesAreReplicated(); OpCounters* opCounters = shouldUseGlobalOpCounters ? 
&globalOpCounters : &replOpCounters; - std::array names = {"ts", "t", "o", "ui", "ns", "op", "b", "o2"}; - std::array fields; - op.getFields(names, &fields); - BSONElement& fieldTs = fields[0]; - BSONElement& fieldT = fields[1]; - BSONElement& fieldO = fields[2]; - BSONElement& fieldUI = fields[3]; - BSONElement& fieldNs = fields[4]; - BSONElement& fieldOp = fields[5]; - BSONElement& fieldB = fields[6]; - BSONElement& fieldO2 = fields[7]; - - BSONObj o; - if (fieldO.isABSONObj()) - o = fieldO.embeddedObject(); - - // operation type -- see logOp() comments for types - const char* opType = fieldOp.valuestrsafe(); - - if (*opType == 'n') { + auto opType = op.getOpType(); + if (opType == OpTypeEnum::kNoop) { // no op if (incrementOpsAppliedStats) { incrementOpsAppliedStats(); @@ -1217,23 +1176,18 @@ Status applyOperation_inlock(OperationContext* opCtx, NamespaceString requestNss; Collection* collection = nullptr; - if (fieldUI) { + if (auto uuid = op.getUuid()) { CollectionCatalog& catalog = CollectionCatalog::get(opCtx); - auto uuid = uassertStatusOK(UUID::parse(fieldUI)); - collection = catalog.lookupCollectionByUUID(uuid); + collection = catalog.lookupCollectionByUUID(uuid.get()); uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Failed to apply operation due to missing collection (" << uuid - << "): " << redact(op.toString()), + str::stream() << "Failed to apply operation due to missing collection (" + << uuid.get() << "): " << redact(opOrGroupedInserts.toBSON()), collection); requestNss = collection->ns(); dassert(opCtx->lockState()->isCollectionLockedForMode( requestNss, supportsDocLocking() ? MODE_IX : MODE_X)); } else { - uassert(ErrorCodes::InvalidNamespace, - "'ns' must be of type String", - fieldNs.type() == BSONType::String); - const StringData ns = fieldNs.valueStringDataSafe(); - requestNss = NamespaceString(ns); + requestNss = op.getNss(); invariant(requestNss.coll().size()); dassert(opCtx->lockState()->isCollectionLockedForMode( requestNss, supportsDocLocking() ? MODE_IX : MODE_X), @@ -1241,6 +1195,8 @@ Status applyOperation_inlock(OperationContext* opCtx, collection = db->getCollection(opCtx, requestNss); } + BSONObj o = op.getObject(); + // The feature compatibility version in the server configuration collection must not change // during initial sync. if ((mode == OplogApplication::Mode::kInitialSync) && @@ -1251,15 +1207,13 @@ Status applyOperation_inlock(OperationContext* opCtx, return Status(ErrorCodes::OplogOperationUnsupported, str::stream() << "Applying operation on feature compatibility version " "document not supported in initial sync: " - << redact(op)); + << redact(opOrGroupedInserts.toBSON())); } } BSONObj o2; - if (fieldO2.isABSONObj()) - o2 = fieldO2.Obj(); - - bool valueB = fieldB.booleanSafe(); + if (op.getObject2()) + o2 = op.getObject2().get(); IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog(); const bool haveWrappingWriteUnitOfWork = opCtx->lockState()->inAWriteUnitOfWork(); @@ -1322,377 +1276,331 @@ Status applyOperation_inlock(OperationContext* opCtx, } MONGO_UNREACHABLE; }(); - invariant(!assignOperationTimestamp || !fieldTs.eoo(), - str::stream() << "Oplog entry did not have 'ts' field when expected: " << redact(op)); - - if (*opType == 'i') { - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Failed to apply insert due to missing collection: " - << op.toString(), - collection); - - if (fieldO.type() == Array) { - // Batched inserts. - - // Cannot apply an array insert with applyOps command. 
No support for wiping out - // the provided timestamps and using new ones for oplog. - uassert(ErrorCodes::OperationFailed, - "Cannot apply an array insert with applyOps", - !opCtx->writesAreReplicated()); - - uassert(ErrorCodes::BadValue, - "Expected array for field 'ts'", - fieldTs.ok() && fieldTs.type() == Array); - uassert(ErrorCodes::BadValue, - "Expected array for field 't'", - fieldT.ok() && fieldT.type() == Array); - - uassert(ErrorCodes::OperationFailed, - str::stream() << "Failed to apply insert due to empty array element: " - << op.toString(), - !fieldO.Obj().isEmpty() && !fieldTs.Obj().isEmpty() && !fieldT.Obj().isEmpty()); - - std::vector insertObjs; - auto fieldOIt = BSONObjIterator(fieldO.Obj()); - auto fieldTsIt = BSONObjIterator(fieldTs.Obj()); - auto fieldTIt = BSONObjIterator(fieldT.Obj()); - - while (true) { - auto oElem = fieldOIt.next(); - auto tsElem = fieldTsIt.next(); - auto tElem = fieldTIt.next(); - - // Note: we don't care about statement ids here since the secondaries don't create - // their own oplog entries. - insertObjs.emplace_back(oElem.Obj(), tsElem.timestamp(), tElem.Long()); - if (!fieldOIt.more()) { - // Make sure arrays are the same length. - uassert(ErrorCodes::OperationFailed, - str::stream() - << "Failed to apply insert due to invalid array elements: " - << op.toString(), - !fieldTsIt.more()); - break; - } - // Make sure arrays are the same length. + invariant(!assignOperationTimestamp || !op.getTimestamp().isNull(), + str::stream() << "Oplog entry did not have 'ts' field when expected: " + << redact(opOrGroupedInserts.toBSON())); + + switch (opType) { + case OpTypeEnum::kInsert: { + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Failed to apply insert due to missing collection: " + << redact(opOrGroupedInserts.toBSON()), + collection); + + if (opOrGroupedInserts.isGroupedInserts()) { + // Grouped inserts. + + // Cannot apply an array insert with applyOps command. No support for wiping out the + // provided timestamps and using new ones for oplog. uassert(ErrorCodes::OperationFailed, - str::stream() << "Failed to apply insert due to invalid array elements: " - << op.toString(), - fieldTsIt.more()); - } + "Cannot apply an array insert with applyOps", + !opCtx->writesAreReplicated()); + + std::vector insertObjs; + const auto insertOps = opOrGroupedInserts.getGroupedInserts(); + for (const auto iOp : insertOps) { + invariant(iOp->getTerm()); + insertObjs.emplace_back( + iOp->getObject(), iOp->getTimestamp(), iOp->getTerm().get()); + } - WriteUnitOfWork wuow(opCtx); - OpDebug* const nullOpDebug = nullptr; - Status status = collection->insertDocuments( - opCtx, insertObjs.begin(), insertObjs.end(), nullOpDebug, true); - if (!status.isOK()) { - return status; - } - wuow.commit(); - for (auto entry : insertObjs) { + WriteUnitOfWork wuow(opCtx); + OpDebug* const nullOpDebug = nullptr; + Status status = collection->insertDocuments( + opCtx, insertObjs.begin(), insertObjs.end(), nullOpDebug, true); + if (!status.isOK()) { + return status; + } + wuow.commit(); + for (auto entry : insertObjs) { + opCounters->gotInsert(); + if (shouldUseGlobalOpCounters) { + ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert( + opCtx->getWriteConcern()); + } + if (incrementOpsAppliedStats) { + incrementOpsAppliedStats(); + } + } + } else { + // Single insert. opCounters->gotInsert(); if (shouldUseGlobalOpCounters) { ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert( opCtx->getWriteConcern()); } + + // No _id. 
+ // This indicates an issue with the upstream server: + // The oplog entry is corrupted; or + // The version of the upstream server is obsolete. + uassert(ErrorCodes::NoSuchKey, + str::stream() + << "Failed to apply insert due to missing _id: " << redact(op.toBSON()), + o.hasField("_id")); + + // 1. Insert if + // a) we do not have a wrapping WriteUnitOfWork, which implies we are not part of + // an "applyOps" command, OR + // b) we are part of a multi-document transaction[1]. + // + // 2. Upsert[2] if + // a) we have a wrapping WriteUnitOfWork AND we are not part of a transaction, + // which implies we are part of an "applyOps" command, OR + // b) the previous insert failed with a DuplicateKey error AND we are not part of + // a transaction. + // + // [1] Transactions should not convert inserts to upserts because on secondaries + // they will perform a lookup that never occurred on the primary. This may cause + // an unintended prepare conflict and block replication. For this reason, + // transactions should always fail with DuplicateKey errors and never retry + // inserts as upserts. + // [2] This upsert behavior exists to support idempotency guarantees outside + // steady-state replication and existing users of applyOps. + + const auto txnParticipant = TransactionParticipant::get(opCtx); + const bool inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction(); + bool needToDoUpsert = haveWrappingWriteUnitOfWork && !inTxn; + + Timestamp timestamp; + long long term = OpTime::kUninitializedTerm; + if (assignOperationTimestamp) { + timestamp = op.getTimestamp(); + invariant(op.getTerm()); + term = op.getTerm().get(); + } + + if (!needToDoUpsert) { + WriteUnitOfWork wuow(opCtx); + + // Do not use supplied timestamps if running through applyOps, as that would + // allow a user to dictate what timestamps appear in the oplog. + if (assignOperationTimestamp) { + timestamp = op.getTimestamp(); + invariant(op.getTerm()); + term = op.getTerm().get(); + } + + OpDebug* const nullOpDebug = nullptr; + Status status = collection->insertDocument( + opCtx, InsertStatement(o, timestamp, term), nullOpDebug, true); + + if (status.isOK()) { + wuow.commit(); + } else if (status == ErrorCodes::DuplicateKey) { + // Transactions cannot be retried as upserts once they fail with a duplicate + // key error. + if (inTxn) { + return status; + } + // Continue to the next block to retry the operation as an upsert. + needToDoUpsert = true; + } else { + return status; + } + } + + // Now see if we need to do an upsert. + if (needToDoUpsert) { + // Do update on DuplicateKey errors. + // This will only be on the _id field in replication, + // since we disable non-_id unique constraint violations. + BSONObjBuilder b; + b.append(o.getField("_id")); + + UpdateRequest request(requestNss); + request.setQuery(b.done()); + request.setUpdateModification(o); + request.setUpsert(); + request.setFromOplogApplication(true); + + const StringData ns = op.getNss().ns(); + writeConflictRetry(opCtx, "applyOps_upsert", ns, [&] { + WriteUnitOfWork wuow(opCtx); + // If this is an atomic applyOps (i.e: `haveWrappingWriteUnitOfWork` is + // true), do not timestamp the write. 
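// [Editor's note] In the atomic case the entire applyOps executes inside the
// caller's WriteUnitOfWork and receives a single commit timestamp, so stamping
// this individual upsert would conflict with it, and honoring user-supplied
// per-op timestamps would let a user dictate what appears in the oplog (the
// same rationale as the insert path's comment above); hence the write is only
// timestamped when a real (non-min) timestamp was assigned from the entry.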
+ if (assignOperationTimestamp && timestamp != Timestamp::min()) { + uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); + } + + UpdateResult res = update(opCtx, db, request); + if (res.numMatched == 0 && res.upserted.isEmpty()) { + error() << "No document was updated even though we got a DuplicateKey " + "error when inserting"; + fassertFailedNoTrace(28750); + } + wuow.commit(); + }); + } + if (incrementOpsAppliedStats) { incrementOpsAppliedStats(); } } - } else { - // Single insert. - opCounters->gotInsert(); + break; + } + case OpTypeEnum::kUpdate: { + opCounters->gotUpdate(); if (shouldUseGlobalOpCounters) { - ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert( + ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForUpdate( opCtx->getWriteConcern()); } - // No _id. - // This indicates an issue with the upstream server: - // The oplog entry is corrupted; or - // The version of the upstream server is obsolete. + auto idField = o2["_id"]; uassert(ErrorCodes::NoSuchKey, - str::stream() << "Failed to apply insert due to missing _id: " << op.toString(), - o.hasField("_id")); - - // 1. Insert if - // a) we do not have a wrapping WriteUnitOfWork, which implies we are not part of an - // "applyOps" command, OR - // b) we are part of a multi-document transaction[1]. - // - // 2. Upsert[2] if - // a) we have a wrapping WriteUnitOfWork AND we are not part of a transaction, which - // implies we are part of an "applyOps" command, OR - // b) the previous insert failed with a DuplicateKey error AND we are not part of a - // transaction. - // - // [1] Transactions should not convert inserts to upserts because on secondaries they - // will perform a lookup that never occurred on the primary. This may cause an - // unintended prepare conflict and block replication. For this reason, transactions - // should always fail with DuplicateKey errors and never retry inserts as upserts. - // [2] This upsert behavior exists to support idempotency guarantees outside - // steady-state replication and existing users of applyOps. - - const auto txnParticipant = TransactionParticipant::get(opCtx); - const bool inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction(); - bool needToDoUpsert = haveWrappingWriteUnitOfWork && !inTxn; + str::stream() << "Failed to apply update due to missing _id: " + << redact(op.toBSON()), + !idField.eoo()); + + // The o2 field may contain additional fields besides the _id (like the shard key + // fields), but we want to do the update by just _id so we can take advantage of the + // IDHACK. + BSONObj updateCriteria = idField.wrap(); + + const bool upsert = alwaysUpsert || op.getUpsert().value_or(false); + UpdateRequest request(requestNss); + request.setQuery(updateCriteria); + request.setUpdateModification(o); + request.setUpsert(upsert); + request.setFromOplogApplication(true); Timestamp timestamp; - long long term = OpTime::kUninitializedTerm; if (assignOperationTimestamp) { - if (fieldTs) { - timestamp = fieldTs.timestamp(); - } - if (fieldT) { - term = fieldT.Long(); - } + timestamp = op.getTimestamp(); } - if (!needToDoUpsert) { + const StringData ns = op.getNss().ns(); + auto status = writeConflictRetry(opCtx, "applyOps_update", ns, [&] { WriteUnitOfWork wuow(opCtx); - - // Do not use supplied timestamps if running through applyOps, as that would allow - // a user to dictate what timestamps appear in the oplog. 
- if (assignOperationTimestamp) {
- if (fieldTs.ok()) {
- timestamp = fieldTs.timestamp();
- }
- if (fieldT.ok()) {
- term = fieldT.Long();
- }
+ if (timestamp != Timestamp::min()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
 }

- OpDebug* const nullOpDebug = nullptr;
- Status status = collection->insertDocument(
- opCtx, InsertStatement(o, timestamp, term), nullOpDebug, true);
-
- if (status.isOK()) {
- wuow.commit();
- } else if (status == ErrorCodes::DuplicateKey) {
- // Transactions cannot be retried as upserts once they fail with a duplicate key
- // error.
- if (inTxn) {
- return status;
+ UpdateResult ur = update(opCtx, db, request);
+ if (ur.numMatched == 0 && ur.upserted.isEmpty()) {
+ if (ur.modifiers) {
+ if (updateCriteria.nFields() == 1) {
+ // was a simple { _id : ... } update criteria
+ string msg = str::stream()
+ << "failed to apply update: " << redact(op.toBSON());
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
+
+ // Need to check to see if it isn't present so we can exit early with a
+ // failure. Note that this adds some overhead for this extra check in some
+ // cases, such as an updateCriteria of the form
+ // { _id:..., x : {$size:...} }
+ // thus this is not ideal.
+ if (collection == nullptr ||
+ (indexCatalog->haveIdIndex(opCtx) &&
+ Helpers::findById(opCtx, collection, updateCriteria).isNull()) ||
+ // capped collections won't have an _id index
+ (!indexCatalog->haveIdIndex(opCtx) &&
+ Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) {
+ string msg = str::stream()
+ << "couldn't find doc: " << redact(op.toBSON());
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
+
+ // Otherwise, it's present; zero objects were updated because of additional
+ // specifiers in the query for idempotence.
+ } else {
+ // This could happen benignly on an oplog duplicate replay of an upsert
+ // (because we are idempotent); if a regular non-mod update fails, the item
+ // is (presumably) missing.
+ if (!upsert) {
+ string msg = str::stream()
+ << "update of non-mod failed: " << redact(op.toBSON());
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
 }
- // Continue to the next block to retry the operation as an upsert.
- needToDoUpsert = true;
- } else {
- return status;
 }
- }

- // Now see if we need to do an upsert.
- if (needToDoUpsert) {
- // Do update on DuplicateKey errors.
- // This will only be on the _id field in replication,
- // since we disable non-_id unique constraint violations.
- BSONObjBuilder b;
- b.append(o.getField("_id"));
-
- UpdateRequest request(requestNss);
- request.setQuery(b.done());
- request.setUpdateModification(o);
- request.setUpsert();
- request.setFromOplogApplication(true);
-
- const StringData ns = fieldNs.valueStringDataSafe();
- writeConflictRetry(opCtx, "applyOps_upsert", ns, [&] {
- WriteUnitOfWork wuow(opCtx);
- // If this is an atomic applyOps (i.e: `haveWrappingWriteUnitOfWork` is true),
- // do not timestamp the write.
- if (assignOperationTimestamp && timestamp != Timestamp::min()) { - uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); - } + wuow.commit(); + return Status::OK(); + }); - UpdateResult res = update(opCtx, db, request); - if (res.numMatched == 0 && res.upserted.isEmpty()) { - error() << "No document was updated even though we got a DuplicateKey " - "error when inserting"; - fassertFailedNoTrace(28750); - } - wuow.commit(); - }); + if (!status.isOK()) { + return status; } if (incrementOpsAppliedStats) { incrementOpsAppliedStats(); } + break; } - } else if (*opType == 'u') { - opCounters->gotUpdate(); - if (shouldUseGlobalOpCounters) { - ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForUpdate( - opCtx->getWriteConcern()); - } - - auto idField = o2["_id"]; - uassert(ErrorCodes::NoSuchKey, - str::stream() << "Failed to apply update due to missing _id: " << op.toString(), - !idField.eoo()); - - // The o2 field may contain additional fields besides the _id (like the shard key fields), - // but we want to do the update by just _id so we can take advantage of the IDHACK. - BSONObj updateCriteria = idField.wrap(); - const bool upsert = valueB || alwaysUpsert; - - UpdateRequest request(requestNss); - request.setQuery(updateCriteria); - request.setUpdateModification(o); - request.setUpsert(upsert); - request.setFromOplogApplication(true); - - Timestamp timestamp; - if (assignOperationTimestamp) { - timestamp = fieldTs.timestamp(); - } - - const StringData ns = fieldNs.valueStringDataSafe(); - auto status = writeConflictRetry(opCtx, "applyOps_update", ns, [&] { - WriteUnitOfWork wuow(opCtx); - if (timestamp != Timestamp::min()) { - uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); + case OpTypeEnum::kDelete: { + opCounters->gotDelete(); + if (shouldUseGlobalOpCounters) { + ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForDelete( + opCtx->getWriteConcern()); } - UpdateResult ur = update(opCtx, db, request); - if (ur.numMatched == 0 && ur.upserted.isEmpty()) { - if (ur.modifiers) { - if (updateCriteria.nFields() == 1) { - // was a simple { _id : ... } update criteria - string msg = str::stream() << "failed to apply update: " << redact(op); - error() << msg; - return Status(ErrorCodes::UpdateOperationFailed, msg); - } + auto idField = o["_id"]; + uassert(ErrorCodes::NoSuchKey, + str::stream() << "Failed to apply delete due to missing _id: " + << redact(op.toBSON()), + !idField.eoo()); - // Need to check to see if it isn't present so we can exit early with a - // failure. Note that adds some overhead for this extra check in some cases, - // such as an updateCriteria of the form - // { _id:..., { x : {$size:...} } - // thus this is not ideal. - if (collection == nullptr || - (indexCatalog->haveIdIndex(opCtx) && - Helpers::findById(opCtx, collection, updateCriteria).isNull()) || - // capped collections won't have an _id index - (!indexCatalog->haveIdIndex(opCtx) && - Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) { - string msg = str::stream() << "couldn't find doc: " << redact(op); - error() << msg; - return Status(ErrorCodes::UpdateOperationFailed, msg); - } + // The o field may contain additional fields besides the _id (like the shard key + // fields), but we want to do the delete by just _id so we can take advantage of the + // IDHACK. 
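+ // For example, if o is { _id: 5, region: "EU" }, idField.wrap() below
+ // yields the criteria { _id: 5 }: an exact-match _id predicate that the
+ // planner can serve through the _id index fast path.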
+ BSONObj deleteCriteria = idField.wrap();

- // Otherwise, it's present; zero objects were updated because of additional
- // specifiers in the query for idempotence
- } else {
- // this could happen benignly on an oplog duplicate replay of an upsert
- // (because we are idempotent),
- // if an regular non-mod update fails the item is (presumably) missing.
- if (!upsert) {
- string msg = str::stream() << "update of non-mod failed: " << redact(op);
- error() << msg;
- return Status(ErrorCodes::UpdateOperationFailed, msg);
- }
- }
+ Timestamp timestamp;
+ if (assignOperationTimestamp) {
+ timestamp = op.getTimestamp();
 }
- wuow.commit();
- return Status::OK();
- });
-
- if (!status.isOK()) {
- return status;
- }
-
- if (incrementOpsAppliedStats) {
- incrementOpsAppliedStats();
- }
- } else if (*opType == 'd') {
- opCounters->gotDelete();
- if (shouldUseGlobalOpCounters) {
- ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForDelete(
- opCtx->getWriteConcern());
- }
-
- auto idField = o["_id"];
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "Failed to apply delete due to missing _id: " << op.toString(),
- !idField.eoo());
-
- // The o field may contain additional fields besides the _id (like the shard key fields),
- // but we want to do the delete by just _id so we can take advantage of the IDHACK.
- BSONObj deleteCriteria = idField.wrap();
-
- Timestamp timestamp;
- if (assignOperationTimestamp) {
- timestamp = fieldTs.timestamp();
- }
+ const StringData ns = op.getNss().ns();
+ writeConflictRetry(opCtx, "applyOps_delete", ns, [&] {
+ WriteUnitOfWork wuow(opCtx);
+ if (timestamp != Timestamp::min()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
+ }
+ deleteObjects(opCtx, collection, requestNss, deleteCriteria, true /* justOne */);
+ wuow.commit();
+ });

- const StringData ns = fieldNs.valueStringDataSafe();
- writeConflictRetry(opCtx, "applyOps_delete", ns, [&] {
- WriteUnitOfWork wuow(opCtx);
- if (timestamp != Timestamp::min()) {
- uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
+ if (incrementOpsAppliedStats) {
+ incrementOpsAppliedStats();
 }
-
- if (opType[1] == 0) {
- const auto justOne = true;
- deleteObjects(opCtx, collection, requestNss, deleteCriteria, justOne);
- } else
- verify(opType[1] == 'b'); // "db" advertisement
- wuow.commit();
- });
-
- if (incrementOpsAppliedStats) {
- incrementOpsAppliedStats();
+ break;
+ }
+ default: {
+ // Commands are processed in applyCommand_inlock().
+ invariant(false, str::stream() << "Unsupported opType " << OpType_serializer(opType));
 }
- } else {
- invariant(*opType != 'c'); // commands are processed in applyCommand_inlock()
- uasserted(14825, str::stream() << "error in applyOperation : unknown opType " << *opType);
 }

 return Status::OK();
}

Status applyCommand_inlock(OperationContext* opCtx,
- const BSONObj& op,
 const OplogEntry& entry,
 OplogApplication::Mode mode,
 boost::optional<Timestamp> stableTimestampForRecovery) {
 // We should only have a stableTimestampForRecovery during replication recovery.
 invariant(stableTimestampForRecovery == boost::none ||
 mode == OplogApplication::Mode::kRecovering);

- LOG(3) << "applying command op: " << redact(op)
+ LOG(3) << "applying command op: " << redact(entry.toBSON())
 << ", oplog application mode: " << OplogApplication::modeToString(mode)
 << ", stable timestamp for recovery: " << stableTimestampForRecovery;

- std::array<StringData, 4> names = {"o", "ui", "ns", "op"};
- std::array<BSONElement, 4> fields;
- op.getFields(names, &fields);
- BSONElement& fieldO = fields[0];
- BSONElement& fieldUI = fields[1];
- BSONElement& fieldNs = fields[2];
- BSONElement& fieldOp = fields[3];
-
- const char* opType = fieldOp.valuestrsafe();
- invariant(*opType == 'c'); // only commands are processed here
+ // Only commands are processed here.
+ invariant(entry.getOpType() == OpTypeEnum::kCommand);

 // Choose opCounters based on running on standalone/primary or secondary by checking
 // whether writes are replicated.
 OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
 opCounters->gotCommand();

- if (fieldO.eoo()) {
- return Status(ErrorCodes::NoSuchKey, "Missing expected field 'o'");
- }
-
- if (!fieldO.isABSONObj()) {
- return Status(ErrorCodes::BadValue, "Expected object for field 'o'");
- }
+ BSONObj o = entry.getObject();

- BSONObj o = fieldO.embeddedObject();
-
- uassert(ErrorCodes::InvalidNamespace,
- "'ns' must be of type String",
- fieldNs.type() == BSONType::String);
- const NamespaceString nss(fieldNs.valueStringData());
+ const auto& nss = entry.getNss();
 if (!nss.isValid()) {
 return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(nss.ns())};
 }
@@ -1719,21 +1627,18 @@ Status applyCommand_inlock(OperationContext* opCtx,
 if ((mode == OplogApplication::Mode::kInitialSync) &&
 (std::find(whitelistedOps.begin(), whitelistedOps.end(), o.firstElementFieldName()) ==
 whitelistedOps.end()) &&
- parseNs(nss.ns(), o) == NamespaceString::kServerConfigurationNamespace) {
+ extractNs(nss, o) == NamespaceString::kServerConfigurationNamespace) {
 return Status(ErrorCodes::OplogOperationUnsupported,
 str::stream() << "Applying command to feature compatibility version "
 "collection not supported in initial sync: "
- << redact(op));
+ << redact(entry.toBSON()));
 }

 // Parse optime from oplog entry unless we are applying this command in standalone or on a
 // primary (replicated writes enabled).
 OpTime opTime;
 if (!opCtx->writesAreReplicated()) {
- auto opTimeResult = OpTime::parseFromOplogEntry(op);
- if (opTimeResult.isOK()) {
- opTime = opTimeResult.getValue();
- }
+ opTime = entry.getOpTime();
 }

 const bool assignCommandTimestamp = [&] {
@@ -1769,7 +1674,8 @@ Status applyCommand_inlock(OperationContext* opCtx,
 MONGO_UNREACHABLE;
 }();
 invariant(!assignCommandTimestamp || !opTime.isNull(),
- str::stream() << "Oplog entry did not have 'ts' field when expected: " << redact(op));
+ str::stream() << "Oplog entry did not have 'ts' field when expected: "
+ << redact(entry.toBSON()));

 const Timestamp writeTime = (assignCommandTimestamp ? opTime.getTimestamp() : Timestamp());

@@ -1789,14 +1695,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
 // If 'writeTime' is not null, any writes in this scope will be given 'writeTime' as
 // their timestamp at commit.
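 // For instance, a "create" command replayed with mode == kRecovering gets a
 // non-null 'writeTime' from the entry's opTime, so the catalog writes the
 // command performs inside this scope commit at the oplog entry's own
 // timestamp rather than at whatever moment recovery happens to run.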
 TimestampBlock tsBlock(opCtx, writeTime);
- return curOpToApply.applyFunc(opCtx,
- nss.ns().c_str(),
- fieldUI,
- o,
- opTime,
- entry,
- mode,
- stableTimestampForRecovery);
+ return curOpToApply.applyFunc(opCtx, entry, mode, stableTimestampForRecovery);
 } catch (const DBException& ex) {
 return ex.toStatus();
 }
@@ -1826,14 +1725,14 @@ Status applyCommand_inlock(OperationContext* opCtx,
 // TODO: This parse could be expensive and not worth it.
 auto ns = cmd->parse(opCtx, OpMsgRequest::fromDBAndBody(nss.db(), o))->ns().toString();
- auto swUUID = UUID::parse(fieldUI);
- if (!swUUID.isOK()) {
- error() << "Failed command " << redact(o) << " on " << ns << " with status "
- << swUUID.getStatus() << "during oplog application. Expected a UUID.";
+ auto swUUID = entry.getUuid();
+ if (!swUUID) {
+ error() << "Failed command " << redact(o) << " on " << ns
+ << " during oplog application. Expected a UUID.";
 }
 BackgroundOperation::awaitNoBgOpInProgForNs(ns);
 IndexBuildsCoordinator::get(opCtx)->awaitNoIndexBuildInProgressForCollection(
- swUUID.getValue());
+ swUUID.get());

 opCtx->recoveryUnit()->abandonSnapshot();
 opCtx->checkForInterrupt();
@@ -1853,7 +1752,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
 }
 }

- AuthorizationManager::get(opCtx->getServiceContext())->logOp(opCtx, opType, nss, o, nullptr);
+ AuthorizationManager::get(opCtx->getServiceContext())->logOp(opCtx, "c", nss, o, nullptr);
 return Status::OK();
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 7bf42539624..34da8931085 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -39,6 +39,7 @@
 #include "mongo/db/catalog/collection_options.h"
 #include "mongo/db/logical_session_id.h"
 #include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/oplog_entry_batch.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/db/repl/replication_coordinator.h"

@@ -192,8 +193,8 @@ inline std::ostream& operator<<(std::ostream& s, OplogApplication::Mode mode) {
 }

 /**
- * Take a non-command op and apply it locally
- * Used for applying from an oplog
+ * Used for applying from an oplog entry or grouped inserts.
+ * @param opOrGroupedInserts a single oplog entry or grouped inserts to be applied.
 * @param alwaysUpsert convert some updates to upserts for idempotency reasons
 * @param mode specifies what oplog application mode we are in
 * @param incrementOpsAppliedStats is called whenever an op is applied.
 */
Status applyOperation_inlock(OperationContext* opCtx,
 Database* db,
- const BSONObj& op,
+ const OplogEntryBatch& opOrGroupedInserts,
 bool alwaysUpsert,
 OplogApplication::Mode mode,
 IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {});
@@ -212,7 +213,6 @@ Status applyOperation_inlock(OperationContext* opCtx,
 * Returns failure status if the op could not be applied.
 */
Status applyCommand_inlock(OperationContext* opCtx,
- const BSONObj& op,
 const OplogEntry& entry,
 OplogApplication::Mode mode,
 boost::optional<Timestamp> stableTimestampForRecovery);
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 24c47432508..f1379307027 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -170,7 +170,7 @@ public:
 using MutableOplogEntry::kVersionFieldName;
 using MutableOplogEntry::kWallClockTimeFieldName;

- // Make serialize(), toBSON() and getters accessible.
+ // Make serialize() and getters accessible.
using MutableOplogEntry::get_id; using MutableOplogEntry::getDurableReplOperation; using MutableOplogEntry::getFromMigrate; @@ -193,7 +193,6 @@ public: using MutableOplogEntry::getVersion; using MutableOplogEntry::getWallClockTime; using MutableOplogEntry::serialize; - using MutableOplogEntry::toBSON; // Make helper functions accessible. using MutableOplogEntry::getOpTime; @@ -323,6 +322,10 @@ public: */ std::string toString() const; + BSONObj toBSON() const { + return _raw; + } + private: BSONObj _raw; // Owned. CommandType _commandType = CommandType::kNotCommand; diff --git a/src/mongo/db/repl/oplog_entry_batch.cpp b/src/mongo/db/repl/oplog_entry_batch.cpp new file mode 100644 index 00000000000..ff079e8f2b0 --- /dev/null +++ b/src/mongo/db/repl/oplog_entry_batch.cpp @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/oplog_entry_batch.h" +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { +namespace repl { +BSONObj OplogEntryBatch::toBSON() const { + if (!isGroupedInserts()) + return getOp().toBSON(); + + // Since we found more than one document, create grouped insert of many docs. + // We are going to group many 'i' ops into one big 'i' op, with array fields for + // 'ts', 't', and 'o', corresponding to each individual op. + // For example: + // { ts: Timestamp(1,1), t:1, ns: "test.foo", op:"i", o: {_id:1} } + // { ts: Timestamp(1,2), t:1, ns: "test.foo", op:"i", o: {_id:2} } + // become: + // { ts: [Timestamp(1, 1), Timestamp(1, 2)], + // t: [1, 1], + // o: [{_id: 1}, {_id: 2}], + // ns: "test.foo", + // op: "i" + // } + // This BSONObj is used for error messages logging only. + BSONObjBuilder groupedInsertBuilder; + // Populate the "ts" field with an array of all the grouped inserts' timestamps. + { + BSONArrayBuilder tsArrayBuilder(groupedInsertBuilder.subarrayStart("ts")); + for (auto op : _batch) { + tsArrayBuilder.append(op->getTimestamp()); + } + } + // Populate the "t" (term) field with an array of all the grouped inserts' terms. 
+ {
+ BSONArrayBuilder tArrayBuilder(groupedInsertBuilder.subarrayStart("t"));
+ for (auto op : _batch) {
+ long long term = OpTime::kUninitializedTerm;
+ auto parsedTerm = op->getTerm();
+ if (parsedTerm)
+ term = parsedTerm.get();
+ tArrayBuilder.append(term);
+ }
+ }
+ // Populate the "o" field with an array of all the grouped inserts.
+ {
+ BSONArrayBuilder oArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
+ for (auto op : _batch) {
+ oArrayBuilder.append(op->getObject());
+ }
+ }
+ // Generate an op object of all elements except for "ts", "t", and "o", since we
+ // need to make those fields arrays of all the ts's, t's, and o's.
+ groupedInsertBuilder.appendElementsUnique(getOp().toBSON());
+ return groupedInsertBuilder.obj();
+}
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry_batch.h b/src/mongo/db/repl/oplog_entry_batch.h
new file mode 100644
index 00000000000..444087c0a45
--- /dev/null
+++ b/src/mongo/db/repl/oplog_entry_batch.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/repl/oplog_entry.h"
+
+namespace mongo {
+namespace repl {
+/**
+ * This is a class for a single oplog entry or grouped inserts to be applied in syncApply. This
+ * class is immutable and can only be initialized using either a single oplog entry or a range of
+ * grouped inserts.
+ */
+class OplogEntryBatch {
+public:
+ using OperationPtrs = std::vector<const OplogEntry*>;
+ using ConstIterator = OperationPtrs::const_iterator;
+
+ OplogEntryBatch() = delete;
+
+ // This initializes it as a single oplog entry.
+ OplogEntryBatch(const OplogEntry* op) : _batch({op}) {}
+
+ // This initializes it as grouped inserts.
+ OplogEntryBatch(ConstIterator begin, ConstIterator end) : _batch(begin, end) {
+ // Performs sanity checks to confirm that the batch is valid.
+ invariant(!_batch.empty());
+ for (auto op : _batch) {
+ // Every oplog entry must be an insert.
+ invariant(op->getOpType() == OpTypeEnum::kInsert);
+ // Every oplog entry must be in the same namespace.
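+ // For example, grouping { op: "i", ns: "test.foo", o: { _id: 1 } } with
+ // { op: "i", ns: "test.bar", o: { _id: 2 } } would trip the namespace
+ // invariant below: grouped inserts must all target a single collection.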
+ invariant(op->getNss() == _batch.front()->getNss()); + } + } + + // Return the oplog entry to be applied or the first oplog entry of the grouped inserts. + const OplogEntry& getOp() const { + return *(_batch.front()); + } + + bool isGroupedInserts() const { + return _batch.size() > 1; + } + + const OperationPtrs& getGroupedInserts() const { + invariant(isGroupedInserts()); + return _batch; + } + + // Returns a BSONObj for message logging purpose. + BSONObj toBSON() const; + +private: + // A single oplog entry or a batch of grouped insert oplog entries to be applied. + OperationPtrs _batch; +}; +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index ef05c77289a..8acfbc9d001 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -226,9 +226,9 @@ NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEn return *nss; } -NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const BSONObj& op) { - if (auto ui = op["ui"]) { - return {nss.db().toString(), uassertStatusOK(UUID::parse(ui))}; +NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const OplogEntry& op) { + if (auto ui = op.getUuid()) { + return {nss.db().toString(), ui.get()}; } return nss; } @@ -240,8 +240,7 @@ NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const BSONObj& op) Status finishAndLogApply(ClockSource* clockSource, Status finalStatus, Date_t applyStartTime, - OpTypeEnum opType, - const BSONObj& op) { + const OplogEntryBatch& batch) { if (finalStatus.isOK()) { auto applyEndTime = clockSource->now(); @@ -253,13 +252,13 @@ Status finishAndLogApply(ClockSource* clockSource, StringBuilder s; s << "applied op: "; - if (opType == OpTypeEnum::kCommand) { + if (batch.getOp().getOpType() == OpTypeEnum::kCommand) { s << "command "; } else { s << "CRUD "; } - s << redact(op); + s << redact(batch.toBSON()); s << ", took " << diffMS << "ms"; log() << s.str(); @@ -276,13 +275,14 @@ LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMod // static Status SyncTail::syncApply(OperationContext* opCtx, - const BSONObj& op, + const OplogEntryBatch& batch, OplogApplication::Mode oplogApplicationMode, boost::optional stableTimestampForRecovery) { + auto op = batch.getOp(); // Count each log op application as a separate operation, for reporting purposes CurOp individualOp(opCtx); - const NamespaceString nss(op.getStringField("ns")); + const NamespaceString nss(op.getNss()); auto incrementOpsAppliedStats = [] { opsAppliedStats.increment(1); }; @@ -302,7 +302,7 @@ Status SyncTail::syncApply(OperationContext* opCtx, // mode (similar to initial sync) instead so we do not accidentally ignore real errors. 
 bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync);
 Status status = applyOperation_inlock(
- opCtx, db, op, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats);
+ opCtx, db, batch, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats);
 if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
 throw WriteConflictException();
 }
@@ -318,10 +318,10 @@ Status SyncTail::syncApply(OperationContext* opCtx,
 MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterRecordingOpApplicationStartTime);
 }

- auto opType = OpType_parse(IDLParserErrorContext("syncApply"), op["op"].valuestrsafe());
+ auto opType = op.getOpType();

 auto finishApply = [&](Status status) {
- return finishAndLogApply(clockSource, status, applyStartTime, opType, op);
+ return finishAndLogApply(clockSource, status, applyStartTime, batch);
 };

 if (opType == OpTypeEnum::kNoop) {
@@ -353,19 +353,16 @@ Status SyncTail::syncApply(OperationContext* opCtx,
 return Status::OK();
 }

- ex.addContext(str::stream() << "Failed to apply operation: " << redact(op));
+ ex.addContext(str::stream()
+ << "Failed to apply operation: " << redact(batch.toBSON()));
 throw;
 }
 }));
 } else if (opType == OpTypeEnum::kCommand) {
 return finishApply(writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
- // TODO SERVER-37180 Remove this double-parsing.
- // The command entry has been parsed before, so it must be valid.
- auto entry = uassertStatusOK(OplogEntry::parse(op));
-
 // A special case apply for commands to avoid implicit database creation.
- Status status = applyCommand_inlock(
- opCtx, op, entry, oplogApplicationMode, stableTimestampForRecovery);
+ Status status =
+ applyCommand_inlock(opCtx, op, oplogApplicationMode, stableTimestampForRecovery);
 incrementOpsAppliedStats();
 return status;
 }));
@@ -647,7 +644,7 @@ private:
 auto oplogEntries =
 fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits));
 for (const auto& oplogEntry : oplogEntries) {
- ops.emplace_back(oplogEntry.getRaw());
+ ops.emplace_back(oplogEntry);
 }

 // If we don't have anything in the queue, wait a bit for something to appear.
@@ -1052,7 +1049,7 @@ Status multiSyncApply(OperationContext* opCtx,
 try {
 auto stableTimestampForRecovery = st->getOptions().stableTimestampForRecovery;
 const Status status = SyncTail::syncApply(
- opCtx, entry.getRaw(), oplogApplicationMode, stableTimestampForRecovery);
+ opCtx, &entry, oplogApplicationMode, stableTimestampForRecovery);

 if (!status.isOK()) {
 // In initial sync, update operations can cause documents to be missed during
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 2bbdfbd71c8..6a156a630aa 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -73,15 +73,14 @@ public:
 WorkerMultikeyPathInfo* workerMultikeyPathInfo)>;

 /**
- * Applies the operation that is in param o.
+ * Applies the single oplog entry or grouped inserts in param batch.
 * Functions for applying operations/commands and incrementing server status counters may
 * be overridden for testing.
 */
 static Status syncApply(OperationContext* opCtx,
- const BSONObj& o,
+ const OplogEntryBatch& batch,
 OplogApplication::Mode oplogApplicationMode,
 boost::optional<Timestamp> stableTimestampForRecovery);
-
 /**
 *
 * Constructs a SyncTail.
@@ -151,10 +150,10 @@ public: return _batch; } - void emplace_back(BSONObj obj) { + void emplace_back(OplogEntry oplog) { invariant(!_mustShutdown); - _bytes += obj.objsize(); - _batch.emplace_back(std::move(obj)); + _bytes += oplog.getRawObjSizeBytes(); + _batch.emplace_back(std::move(oplog)); } void pop_back() { _bytes -= back().getRawObjSizeBytes(); diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 933f23f4667..79fbd315d3b 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -234,44 +234,18 @@ auto parseFromOplogEntryArray(const BSONObj& obj, int elem) { return OpTime(tsArray.Array()[elem].timestamp(), termArray.Array()[elem].Long()); }; -TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) { - const BSONObj op = BSON("op" - << "x"); - ASSERT_THROWS( - SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, boost::none), - ExceptionFor); -} - -TEST_F(SyncTailTest, SyncApplyNoNamespaceNoOp) { - ASSERT_OK(SyncTail::syncApply(_opCtx.get(), - BSON("op" - << "n"), - OplogApplication::Mode::kInitialSync, - boost::none)); -} - -TEST_F(SyncTailTest, SyncApplyBadOp) { - const BSONObj op = BSON("op" - << "x" - << "ns" - << "test.t"); - ASSERT_THROWS( - SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, boost::none), - ExceptionFor); -} - TEST_F(SyncTailTest, SyncApplyInsertDocumentDatabaseMissing) { NamespaceString nss("test.t"); auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - ASSERT_THROWS(SyncTail::syncApply( - _opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary, boost::none), - ExceptionFor); + ASSERT_THROWS( + SyncTail::syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary, boost::none), + ExceptionFor); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentDatabaseMissing) { NamespaceString otherNss("test.othername"); auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, {}); - _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false); + _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); } TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) { @@ -279,9 +253,9 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) { createDatabase(_opCtx.get(), nss.db()); NamespaceString otherNss(nss.getSisterNS("othername")); auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, kUuid); - ASSERT_THROWS(SyncTail::syncApply( - _opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary, boost::none), - ExceptionFor); + ASSERT_THROWS( + SyncTail::syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary, boost::none), + ExceptionFor); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLookupByUUIDFails) { @@ -289,7 +263,7 @@ TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLookupByUUIDFails) { createDatabase(_opCtx.get(), nss.db()); NamespaceString otherNss(nss.getSisterNS("othername")); auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, kUuid); - _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false); + _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); } TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) { @@ -299,9 +273,9 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) { // which in the case of this test just ignores such errors. This tests mostly that we don't // implicitly create the collection and lock the database in MODE_X. 
auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - ASSERT_THROWS(SyncTail::syncApply( - _opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary, boost::none), - ExceptionFor); + ASSERT_THROWS( + SyncTail::syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary, boost::none), + ExceptionFor); ASSERT_FALSE(collectionExists(_opCtx.get(), nss)); } @@ -312,7 +286,7 @@ TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionMissing) { // which in the case of this test just ignores such errors. This tests mostly that we don't // implicitly create the collection and lock the database in MODE_X. auto op = makeOplogEntry(OpTypeEnum::kDelete, nss, {}); - _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false); + _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); ASSERT_FALSE(collectionExists(_opCtx.get(), nss)); } @@ -320,14 +294,14 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionExists) { const NamespaceString nss("test.t"); createCollection(_opCtx.get(), nss, {}); auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), true); + _testSyncApplyCrudOperation(ErrorCodes::OK, op, true); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionExists) { const NamespaceString nss("test.t"); createCollection(_opCtx.get(), nss, {}); auto op = makeOplogEntry(OpTypeEnum::kDelete, nss, {}); - _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false); + _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); } TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLockedByUUID) { @@ -336,7 +310,7 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLockedByUUID) { // Test that the collection to lock is determined by the UUID and not the 'ns' field. NamespaceString otherNss(nss.getSisterNS("othername")); auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, uuid); - _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), true); + _testSyncApplyCrudOperation(ErrorCodes::OK, op, true); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLockedByUUID) { @@ -348,7 +322,7 @@ TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLockedByUUID) { // Test that the collection to lock is determined by the UUID and not the 'ns' field. NamespaceString otherNss(nss.getSisterNS("othername")); auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, options.uuid); - _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false); + _testSyncApplyCrudOperation(ErrorCodes::OK, op, false); } TEST_F(SyncTailTest, SyncApplyCommand) { @@ -373,24 +347,12 @@ TEST_F(SyncTailTest, SyncApplyCommand) { }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); - ASSERT_OK( - SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, boost::none)); + auto entry = OplogEntry(op); + ASSERT_OK(SyncTail::syncApply( + _opCtx.get(), &entry, OplogApplication::Mode::kInitialSync, boost::none)); ASSERT_TRUE(applyCmdCalled); } -TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { - const BSONObj op = BSON("op" - << "c" - << "ns" << 12345 << "o" - << BSON("create" - << "t") - << "ts" << Timestamp(1, 1)); - // This test relies on the namespace type check of IDL. 
- ASSERT_THROWS( - SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, boost::none), - ExceptionFor); -} - DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") { auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail(nullptr, @@ -2364,13 +2326,13 @@ TEST_F(SyncTailTest, LogSlowOpApplicationWhenSuccessful) { auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); startCapturingLogMessages(); - ASSERT_OK(SyncTail::syncApply( - _opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary, boost::none)); + ASSERT_OK( + SyncTail::syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary, boost::none)); // Use a builder for easier escaping. We expect the operation to be logged. StringBuilder expected; - expected << "applied op: CRUD { op: \"i\", ns: \"test.t\", o: { _id: 0 }, ts: Timestamp(1, 1), " - "t: 1, v: 2 }, took " + expected << "applied op: CRUD { ts: Timestamp(1, 1), t: 1, v: 2, op: \"i\", ns: \"test.t\", o: " + "{ _id: 0 } }, took " << applyDuration << "ms"; ASSERT_EQUALS(1, countLogLinesContaining(expected.str())); } @@ -2387,8 +2349,7 @@ TEST_F(SyncTailTest, DoNotLogSlowOpApplicationWhenFailed) { startCapturingLogMessages(); ASSERT_THROWS( - SyncTail::syncApply( - _opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary, boost::none), + SyncTail::syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary, boost::none), ExceptionFor); // Use a builder for easier escaping. We expect the operation to *not* be logged @@ -2412,8 +2373,8 @@ TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) { auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); startCapturingLogMessages(); - ASSERT_OK(SyncTail::syncApply( - _opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary, boost::none)); + ASSERT_OK( + SyncTail::syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary, boost::none)); // Use a builder for easier escaping. We expect the operation to *not* be logged, // since it wasn't slow to apply. 
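The rewritten tests above share a single pattern: construct a real OplogEntry up front and hand syncApply its address, letting OplogEntryBatch's implicit single-entry constructor do the wrapping. A minimal sketch of that pattern (a hypothetical standalone test body, assuming this file's namespace and fixture; the BSON shape mirrors the entry asserted in LogSlowOpApplicationWhenSuccessful):

    // Build the entry once from BSON; OplogEntry owns the raw object.
    auto entry = OplogEntry(BSON("ts" << Timestamp(1, 1) << "t" << 1LL << "v" << 2
                                      << "op"
                                      << "i"
                                      << "ns"
                                      << "test.t"
                                      << "o" << BSON("_id" << 0)));
    // '&entry' converts to an OplogEntryBatch via the single-entry constructor.
    ASSERT_OK(SyncTail::syncApply(
        _opCtx.get(), &entry, OplogApplication::Mode::kSecondary, boost::none));
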
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 6ebcfa0f6fd..59860e000c2 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -154,7 +154,7 @@ StorageInterface* SyncTailTest::getStorageInterface() const { } void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError, - const BSONObj& op, + const OplogEntry& op, bool expectedApplyOpCalled) { bool applyOpCalled = false; @@ -174,7 +174,7 @@ void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError, checkOpCtx(opCtx); ASSERT_EQUALS(NamespaceString("test.t"), nss); ASSERT_EQUALS(1U, docs.size()); - ASSERT_BSONOBJ_EQ(op["o"].Obj(), docs[0]); + ASSERT_BSONOBJ_EQ(op.getObject(), docs[0]); return Status::OK(); }; @@ -188,13 +188,13 @@ void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError, checkOpCtx(opCtx); ASSERT_EQUALS(NamespaceString("test.t"), nss); ASSERT(deletedDoc); - ASSERT_BSONOBJ_EQ(op["o"].Obj(), *deletedDoc); + ASSERT_BSONOBJ_EQ(op.getObject(), *deletedDoc); return Status::OK(); }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); ASSERT_EQ( - SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kSecondary, boost::none), + SyncTail::syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary, boost::none), expectedError); ASSERT_EQ(applyOpCalled, expectedApplyOpCalled); } diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h index 9b1adaea94f..6ce29c072f8 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ b/src/mongo/db/repl/sync_tail_test_fixture.h @@ -115,7 +115,7 @@ public: protected: void _testSyncApplyCrudOperation(ErrorCodes::Error expectedError, - const BSONObj& op, + const OplogEntry& op, bool expectedApplyOpCalled); ServiceContext::UniqueOperationContext _opCtx; diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index c23415f61e4..fe2ee9fbc18 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -66,7 +66,7 @@ Status _applyOperationsForTransaction(OperationContext* opCtx, try { AutoGetCollection coll(opCtx, op.getNss(), MODE_IX); auto status = repl::applyOperation_inlock( - opCtx, coll.getDb(), op.toBSON(), false /*alwaysUpsert*/, oplogApplicationMode); + opCtx, coll.getDb(), &op, false /*alwaysUpsert*/, oplogApplicationMode); if (!status.isOK()) { return status; } diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 8a7edd7d7b2..95d016a523d 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -249,8 +249,9 @@ protected: mongo::unittest::log() << "op: " << *i << endl; } repl::UnreplicatedWritesBlock uwb(&_opCtx); + auto entry = uassertStatusOK(OplogEntry::parse(*i)); uassertStatusOK(applyOperation_inlock( - &_opCtx, ctx.db(), *i, false, OplogApplication::Mode::kSecondary)); + &_opCtx, ctx.db(), &entry, false, OplogApplication::Mode::kSecondary)); } } } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index a42955f4646..60d10e576f3 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -745,36 +745,34 @@ public: const std::int32_t docsToInsert = 10; const LogicalTime firstInsertTime = 
 _clock->reserveTicks(docsToInsert);

- BSONObjBuilder oplogEntryBuilder;
-
- // Populate the "ts" field with an array of all the grouped inserts' timestamps.
- BSONArrayBuilder tsArrayBuilder(oplogEntryBuilder.subarrayStart("ts"));
- for (std::int32_t idx = 0; idx < docsToInsert; ++idx) {
- tsArrayBuilder.append(firstInsertTime.addTicks(idx).asTimestamp());
- }
- tsArrayBuilder.done();
-
- // Populate the "t" (term) field with an array of all the grouped inserts' terms.
- BSONArrayBuilder tArrayBuilder(oplogEntryBuilder.subarrayStart("t"));
- for (std::int32_t idx = 0; idx < docsToInsert; ++idx) {
- tArrayBuilder.append(1LL);
- }
- tArrayBuilder.done();
-
- // Populate the "o" field with an array of all the grouped inserts.
- BSONArrayBuilder oArrayBuilder(oplogEntryBuilder.subarrayStart("o"));
+ BSONObjBuilder oplogCommonBuilder;
+ oplogCommonBuilder << "v" << 2 << "op"
+ << "i"
+ << "ns" << nss.ns() << "ui" << autoColl.getCollection()->uuid();
+ auto oplogCommon = oplogCommonBuilder.done();
+
+ std::vector<repl::OplogEntry> oplogEntries;
+ oplogEntries.reserve(docsToInsert);
+ std::vector<const repl::OplogEntry*> opPtrs;
+ BSONObjBuilder oplogEntryBuilders[docsToInsert];
 for (std::int32_t idx = 0; idx < docsToInsert; ++idx) {
- oArrayBuilder.append(BSON("_id" << idx));
- }
- oArrayBuilder.done();
-
- oplogEntryBuilder << "v" << 2 << "op"
- << "i"
- << "ns" << nss.ns() << "ui" << autoColl.getCollection()->uuid();
-
- auto oplogEntry = oplogEntryBuilder.done();
+ auto o = BSON("_id" << idx);
+ // Populate the "ts" field.
+ oplogEntryBuilders[idx] << "ts" << firstInsertTime.addTicks(idx).asTimestamp();
+ // Populate the "t" (term) field.
+ oplogEntryBuilders[idx] << "t" << 1LL;
+ // Populate the "o" field.
+ oplogEntryBuilders[idx] << "o" << o;
+ // Populate the other common fields.
+ oplogEntryBuilders[idx].appendElementsUnique(oplogCommon);
+ // Insert ops to be applied.
+ oplogEntries.push_back(repl::OplogEntry(oplogEntryBuilders[idx].done()));
+ opPtrs.push_back(&(oplogEntries.back()));
+ }
+
+ repl::OplogEntryBatch groupedInsertBatch(opPtrs.cbegin(), opPtrs.cend());
 ASSERT_OK(repl::SyncTail::syncApply(
- _opCtx, oplogEntry, repl::OplogApplication::Mode::kSecondary, boost::none));
+ _opCtx, groupedInsertBatch, repl::OplogApplication::Mode::kSecondary, boost::none));

 for (std::int32_t idx = 0; idx < docsToInsert; ++idx) {
 OneOffRead oor(_opCtx, firstInsertTime.addTicks(idx).asTimestamp());
-- 
cgit v1.2.1
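Taken together, the patch makes parsed OplogEntry objects flow end to end through oplog application. A rough sketch of the resulting applier-side call pattern (hypothetical names opCtx and rawOp; assumes insert-type entries and elides error handling):

    // Parse once at the boundary; everything downstream reuses the entry.
    auto entry = uassertStatusOK(repl::OplogEntry::parse(rawOp));

    // Single entry: '&entry' implicitly builds a one-element OplogEntryBatch.
    Status single = repl::SyncTail::syncApply(
        opCtx, &entry, repl::OplogApplication::Mode::kSecondary, boost::none);

    // Grouped inserts: a contiguous run of same-namespace 'i' entries is
    // wrapped as one batch; the constructor invariants enforce both rules.
    std::vector<const repl::OplogEntry*> run = {&entry};
    repl::OplogEntryBatch grouped(run.cbegin(), run.cend());
    Status batched = repl::SyncTail::syncApply(
        opCtx, grouped, repl::OplogApplication::Mode::kSecondary, boost::none);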