diff options
author | jannaerin <golden.janna@gmail.com> | 2020-12-14 23:21:50 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-16 17:36:05 +0000 |
commit | 704e695b7296e52bf47d999b8250a14809752051 (patch) | |
tree | 20cf5efeb80da7a4e67b28170443f4a683e47d1c /src/mongo/db | |
parent | 8244620f192af3292bf99f3f71e7f9a589a6bce4 (diff) | |
download | mongo-704e695b7296e52bf47d999b8250a14809752051.tar.gz |
SERVER-49909 Error if resharding's oplog application sees an unexpected oplog entry
Diffstat (limited to 'src/mongo/db')
4 files changed, 141 insertions, 4 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index bd256be2863..2752759ddc9 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -44,6 +44,7 @@ #include "mongo/db/ops/update.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor.h" +#include "mongo/db/repl/oplog_applier_utils.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/stats/counters.h" @@ -119,7 +120,7 @@ Status ReshardingOplogApplicationRules::applyOperation( invariant(opCtx->writesAreReplicated()); auto op = opOrGroupedInserts.getOp(); - return writeConflictRetry(opCtx, "applyOplogEntryResharding", op.getNss().ns(), [&] { + return writeConflictRetry(opCtx, "applyOplogEntryCRUDOpResharding", op.getNss().ns(), [&] { try { WriteUnitOfWork wuow(opCtx); @@ -183,6 +184,49 @@ Status ReshardingOplogApplicationRules::applyOperation( }); } +Status ReshardingOplogApplicationRules::ReshardingOplogApplicationRules::applyCommand( + OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) { + LOGV2_DEBUG(49909, + 3, + "Applying command op for resharding", + "opl"_attr = redact(opOrGroupedInserts.toBSON())); + + auto op = opOrGroupedInserts.getOp(); + + invariant(op.getOpType() == repl::OpTypeEnum::kCommand); + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + invariant(opCtx->writesAreReplicated()); + + return writeConflictRetry(opCtx, "applyOplogEntryCommandOpResharding", op.getNss().ns(), [&] { + OpCounters* opCounters = &globalOpCounters; + opCounters->gotCommand(); + + invariant(op.getNss() == _outputNss); + BSONObj oField = op.getObject(); + + // Only applyOps, commitTransaction, and abortTransaction are allowed. + std::vector<std::string> supportedCmds{"applyOps", "commitTransaction", "abortTransaction"}; + if (std::find(supportedCmds.begin(), supportedCmds.end(), oField.firstElementFieldName()) == + supportedCmds.end()) { + if (oField.firstElementFieldName() == "drop"_sd) { + return Status(ErrorCodes::OplogOperationUnsupported, + str::stream() + << "Received drop command for resharding source collection " + << redact(op.toBSON())); + } + + return Status(ErrorCodes::OplogOperationUnsupported, + str::stream() << "Command not supported during resharding: " + << redact(op.toBSON())); + } + + // TODO SERVER-49907 implement applyOps write rule + // TODO SERVER-49905 handle commit and abort transaction rules + return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( + opCtx, opOrGroupedInserts, repl::OplogApplication::Mode::kInitialSync, [] {}, nullptr); + }); +} + void ReshardingOplogApplicationRules::_applyInsert_inlock( OperationContext* opCtx, Database* db, diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h index 2a27f79ba3e..6b5730830fb 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.h +++ b/src/mongo/db/s/resharding/resharding_oplog_application.h @@ -68,6 +68,13 @@ public: Status applyOperation(OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts); + /** + * Wraps the command application in a writeConflictRetry loop. Will return an error on any + * command other than "applyOps", "commitTransaction", or "abortTransaction". + */ + Status applyCommand(OperationContext* opCtx, + const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts); + private: // Applies an insert operation void _applyInsert_inlock(OperationContext* opCtx, diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 6638dd4b697..a7d25a3ea5a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -536,9 +536,12 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts( auto opType = op.getOpType(); if (opType == repl::OpTypeEnum::kNoop) { return Status::OK(); - } else if (resharding::gUseReshardingOplogApplicationRules && - repl::OplogEntry::isCrudOpType(opType)) { - return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts); + } else if (resharding::gUseReshardingOplogApplicationRules) { + if (repl::OplogEntry::isCrudOpType(opType)) { + return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts); + } else if (opType == repl::OpTypeEnum::kCommand) { + return _applicationRules.applyCommand(opCtx, entryOrGroupedInserts); + } } else { // We always use oplog application mode 'kInitialSync', because we're applying oplog entries // to a cloned database the way initial sync does. diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index cc45cc10926..2c16798e975 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -1619,6 +1619,89 @@ TEST_F(ReshardingOplogApplierTest, UpdateOutputCollUseReshardingApplicationRules ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs()); } +TEST_F(ReshardingOplogApplierTest, UnsupportedCommandOpsShouldErrorUseReshardingApplicationRules) { + setReshardingOplogApplicationServerParameterTrue(); + + std::queue<repl::OplogEntry> ops; + ops.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none)); + ops.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1), + repl::OpTypeEnum::kCommand, + BSON("renameCollection" << appliedToNs().ns() << "to" << stashNs().ns()), + boost::none)); + ops.push(makeOplog(repl::OpTime(Timestamp(7, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 2), + boost::none)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(ops)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + stashCollections(), + Timestamp(5, 3), + std::move(iterator), + 1 /* batchSize */, + chunkManager(), + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); + + future = applier.applyUntilDone(); + + ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::OplogOperationUnsupported); + + doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2)); + ASSERT_BSONOBJ_EQ(BSONObj(), doc); + + auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId()); + ASSERT_TRUE(progressDoc); + ASSERT_EQ(Timestamp(5, 3), progressDoc->getProgress().getClusterTime()); + ASSERT_EQ(Timestamp(5, 3), progressDoc->getProgress().getTs()); +} + +TEST_F(ReshardingOplogApplierTest, + DropSourceCollectionCmdShouldErrorUseReshardingApplicationRules) { + setReshardingOplogApplicationServerParameterTrue(); + + std::queue<repl::OplogEntry> ops; + ops.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kCommand, + BSON("drop" << appliedToNs().ns()), + boost::none)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(ops)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + stashCollections(), + Timestamp(5, 3), + std::move(iterator), + 1 /* batchSize */, + chunkManager(), + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + + ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::OplogOperationUnsupported); + + auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId()); + ASSERT_FALSE(progressDoc); +} + class ReshardingOplogApplierRetryableTest : public ReshardingOplogApplierTest { public: void setUp() override { |