summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2020-12-14 23:21:50 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-12-16 17:36:05 +0000
commit704e695b7296e52bf47d999b8250a14809752051 (patch)
tree20cf5efeb80da7a4e67b28170443f4a683e47d1c /src/mongo/db
parent8244620f192af3292bf99f3f71e7f9a589a6bce4 (diff)
downloadmongo-704e695b7296e52bf47d999b8250a14809752051.tar.gz
SERVER-49909 Error if resharding's oplog application sees an unexpected oplog entry
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.cpp46
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.h7
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp9
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp83
4 files changed, 141 insertions, 4 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index bd256be2863..2752759ddc9 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/ops/update.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"
+#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/counters.h"
@@ -119,7 +120,7 @@ Status ReshardingOplogApplicationRules::applyOperation(
invariant(opCtx->writesAreReplicated());
auto op = opOrGroupedInserts.getOp();
- return writeConflictRetry(opCtx, "applyOplogEntryResharding", op.getNss().ns(), [&] {
+ return writeConflictRetry(opCtx, "applyOplogEntryCRUDOpResharding", op.getNss().ns(), [&] {
try {
WriteUnitOfWork wuow(opCtx);
@@ -183,6 +184,49 @@ Status ReshardingOplogApplicationRules::applyOperation(
});
}
+Status ReshardingOplogApplicationRules::ReshardingOplogApplicationRules::applyCommand(
+ OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) {
+ LOGV2_DEBUG(49909,
+ 3,
+ "Applying command op for resharding",
+ "opl"_attr = redact(opOrGroupedInserts.toBSON()));
+
+ auto op = opOrGroupedInserts.getOp();
+
+ invariant(op.getOpType() == repl::OpTypeEnum::kCommand);
+ invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+ invariant(opCtx->writesAreReplicated());
+
+ return writeConflictRetry(opCtx, "applyOplogEntryCommandOpResharding", op.getNss().ns(), [&] {
+ OpCounters* opCounters = &globalOpCounters;
+ opCounters->gotCommand();
+
+ invariant(op.getNss() == _outputNss);
+ BSONObj oField = op.getObject();
+
+ // Only applyOps, commitTransaction, and abortTransaction are allowed.
+ std::vector<std::string> supportedCmds{"applyOps", "commitTransaction", "abortTransaction"};
+ if (std::find(supportedCmds.begin(), supportedCmds.end(), oField.firstElementFieldName()) ==
+ supportedCmds.end()) {
+ if (oField.firstElementFieldName() == "drop"_sd) {
+ return Status(ErrorCodes::OplogOperationUnsupported,
+ str::stream()
+ << "Received drop command for resharding source collection "
+ << redact(op.toBSON()));
+ }
+
+ return Status(ErrorCodes::OplogOperationUnsupported,
+ str::stream() << "Command not supported during resharding: "
+ << redact(op.toBSON()));
+ }
+
+ // TODO SERVER-49907 implement applyOps write rule
+ // TODO SERVER-49905 handle commit and abort transaction rules
+ return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
+ opCtx, opOrGroupedInserts, repl::OplogApplication::Mode::kInitialSync, [] {}, nullptr);
+ });
+}
+
void ReshardingOplogApplicationRules::_applyInsert_inlock(
OperationContext* opCtx,
Database* db,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h
index 2a27f79ba3e..6b5730830fb 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.h
@@ -68,6 +68,13 @@ public:
Status applyOperation(OperationContext* opCtx,
const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts);
+ /**
+ * Wraps the command application in a writeConflictRetry loop. Will return an error on any
+ * command other than "applyOps", "commitTransaction", or "abortTransaction".
+ */
+ Status applyCommand(OperationContext* opCtx,
+ const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts);
+
private:
// Applies an insert operation
void _applyInsert_inlock(OperationContext* opCtx,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 6638dd4b697..a7d25a3ea5a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -536,9 +536,12 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts(
auto opType = op.getOpType();
if (opType == repl::OpTypeEnum::kNoop) {
return Status::OK();
- } else if (resharding::gUseReshardingOplogApplicationRules &&
- repl::OplogEntry::isCrudOpType(opType)) {
- return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts);
+ } else if (resharding::gUseReshardingOplogApplicationRules) {
+ if (repl::OplogEntry::isCrudOpType(opType)) {
+ return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts);
+ } else if (opType == repl::OpTypeEnum::kCommand) {
+ return _applicationRules.applyCommand(opCtx, entryOrGroupedInserts);
+ }
} else {
// We always use oplog application mode 'kInitialSync', because we're applying oplog entries
// to a cloned database the way initial sync does.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index cc45cc10926..2c16798e975 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -1619,6 +1619,89 @@ TEST_F(ReshardingOplogApplierTest, UpdateOutputCollUseReshardingApplicationRules
ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs());
}
+TEST_F(ReshardingOplogApplierTest, UnsupportedCommandOpsShouldErrorUseReshardingApplicationRules) {
+ setReshardingOplogApplicationServerParameterTrue();
+
+ std::queue<repl::OplogEntry> ops;
+ ops.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none));
+ ops.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
+ repl::OpTypeEnum::kCommand,
+ BSON("renameCollection" << appliedToNs().ns() << "to" << stashNs().ns()),
+ boost::none));
+ ops.push(makeOplog(repl::OpTime(Timestamp(7, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 2),
+ boost::none));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(ops));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ stashCollections(),
+ Timestamp(5, 3),
+ std::move(iterator),
+ 1 /* batchSize */,
+ chunkManager(),
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ DBDirectClient client(operationContext());
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);
+
+ future = applier.applyUntilDone();
+
+ ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::OplogOperationUnsupported);
+
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2));
+ ASSERT_BSONOBJ_EQ(BSONObj(), doc);
+
+ auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId());
+ ASSERT_TRUE(progressDoc);
+ ASSERT_EQ(Timestamp(5, 3), progressDoc->getProgress().getClusterTime());
+ ASSERT_EQ(Timestamp(5, 3), progressDoc->getProgress().getTs());
+}
+
+TEST_F(ReshardingOplogApplierTest,
+ DropSourceCollectionCmdShouldErrorUseReshardingApplicationRules) {
+ setReshardingOplogApplicationServerParameterTrue();
+
+ std::queue<repl::OplogEntry> ops;
+ ops.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kCommand,
+ BSON("drop" << appliedToNs().ns()),
+ boost::none));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(ops));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ stashCollections(),
+ Timestamp(5, 3),
+ std::move(iterator),
+ 1 /* batchSize */,
+ chunkManager(),
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+
+ ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::OplogOperationUnsupported);
+
+ auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId());
+ ASSERT_FALSE(progressDoc);
+}
+
class ReshardingOplogApplierRetryableTest : public ReshardingOplogApplierTest {
public:
void setUp() override {