diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-01-09 11:47:35 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-09 12:53:37 +0000 |
commit | 54e87285f802a49456c02c80cb0ddb0fbb54c88a (patch) | |
tree | 868f5632832b7859f9842fb4f551a6b545fa271c /src | |
parent | 8b6ac29a0a5133fde5dbff8d39347ca35d187eae (diff) | |
download | mongo-54e87285f802a49456c02c80cb0ddb0fbb54c88a.tar.gz |
Revert "SERVER-49907 Unroll applyOps entry and do resharding CRUD application."
This reverts commit 4363473d75cab2a487c6a6066b601d52230c7e1a.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_application.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_application.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_applier.cpp | 199 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_applier.h | 25 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp | 214 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h | 113 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp | 372 |
10 files changed, 251 insertions, 748 deletions
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index cfc2e635030..508dd5addd9 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -511,11 +511,7 @@ BSONObj OplogEntry::toBSONForLogging() const { builder.append("oplogEntry", entry); if (_isForCappedCollection) { - builder.append("isForCappedCollection", _isForCappedCollection); - } - - if (_isForReshardingSessionApplication) { - builder.append("isForReshardingSessionApplication", _isForReshardingSessionApplication); + builder.append("isForCappedCollection", *_isForCappedCollection); } if (_preImageOp) { @@ -542,7 +538,7 @@ BSONObj OplogEntry::toBSONForLogging() const { } bool OplogEntry::isForCappedCollection() const { - return _isForCappedCollection; + return _isForCappedCollection.get_value_or(false); } void OplogEntry::setIsForCappedCollection(bool isForCappedCollection) { @@ -575,14 +571,6 @@ void OplogEntry::setPostImageOp(const BSONObj& postImageOp) { uassertStatusOK(DurableOplogEntry::parse(postImageOp)))); } -bool OplogEntry::isForReshardingSessionApplication() const { - return _isForReshardingSessionApplication; -} - -void OplogEntry::setIsForReshardingSessionApplication(bool isForReshardingSessionApplication) { - _isForReshardingSessionApplication = isForReshardingSessionApplication; -} - const boost::optional<mongo::Value>& OplogEntry::get_id() const& { return _entry.get_id(); } diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index e093b526a3c..b901c4ae291 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -491,9 +491,6 @@ public: void setPostImageOp(std::shared_ptr<DurableOplogEntry> postImageOp); void setPostImageOp(const BSONObj& postImageOp); - bool isForReshardingSessionApplication() const; - void setIsForReshardingSessionApplication(bool isForReshardingSessionApplication = true); - std::string toStringForLogging() const; /** @@ -546,15 +543,12 @@ public: private: DurableOplogEntry _entry; + boost::optional<bool> _isForCappedCollection; // We use std::shared_ptr<DurableOplogEntry> rather than boost::optional<DurableOplogEntry> here // so that OplogEntries are cheaper to copy. std::shared_ptr<DurableOplogEntry> _preImageOp; std::shared_ptr<DurableOplogEntry> _postImageOp; - - bool _isForCappedCollection = false; - - bool _isForReshardingSessionApplication = false; }; std::ostream& operator<<(std::ostream& s, const DurableOplogEntry& o); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 18df1b1aac1..706007d79d9 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -81,7 +81,6 @@ env.Library( 'resharding/resharding_oplog_applier.cpp', 'resharding/resharding_oplog_applier_progress.idl', 'resharding/resharding_oplog_application.cpp', - 'resharding/resharding_oplog_batch_preparer.cpp', 'resharding/resharding_oplog_fetcher.cpp', 'resharding/resharding_recipient_service.cpp', 'resharding/resharding_server_parameters.idl', @@ -470,7 +469,6 @@ env.CppUnitTest( 'resharding/resharding_donor_recipient_common_test.cpp', 'resharding/resharding_metrics_test.cpp', 'resharding/resharding_oplog_applier_test.cpp', - 'resharding/resharding_oplog_batch_preparer_test.cpp', 'resharding/resharding_oplog_fetcher_test.cpp', 'resharding/resharding_recipient_service_test.cpp', 'resharding/resharding_donor_service_test.cpp', diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index e97965d5dad..d3065b25dd5 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -184,6 +184,49 @@ Status ReshardingOplogApplicationRules::applyOperation( }); } +Status ReshardingOplogApplicationRules::ReshardingOplogApplicationRules::applyCommand( + OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) { + LOGV2_DEBUG(49909, + 3, + "Applying command op for resharding", + "opl"_attr = redact(opOrGroupedInserts.toBSON())); + + auto op = opOrGroupedInserts.getOp(); + + invariant(op.getOpType() == repl::OpTypeEnum::kCommand); + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + invariant(opCtx->writesAreReplicated()); + + return writeConflictRetry(opCtx, "applyOplogEntryCommandOpResharding", op.getNss().ns(), [&] { + OpCounters* opCounters = &globalOpCounters; + opCounters->gotCommand(); + + invariant(op.getNss() == _outputNss); + BSONObj oField = op.getObject(); + + // Only applyOps, commitTransaction, and abortTransaction are allowed. + std::vector<std::string> supportedCmds{"applyOps", "commitTransaction", "abortTransaction"}; + if (std::find(supportedCmds.begin(), supportedCmds.end(), oField.firstElementFieldName()) == + supportedCmds.end()) { + if (oField.firstElementFieldName() == "drop"_sd) { + return Status(ErrorCodes::OplogOperationUnsupported, + str::stream() + << "Received drop command for resharding source collection " + << redact(op.toBSONForLogging())); + } + + return Status(ErrorCodes::OplogOperationUnsupported, + str::stream() << "Command not supported during resharding: " + << redact(op.toBSONForLogging())); + } + + // TODO SERVER-49907 implement applyOps write rule + // TODO SERVER-49905 handle commit and abort transaction rules + return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( + opCtx, opOrGroupedInserts, repl::OplogApplication::Mode::kInitialSync, [] {}, nullptr); + }); +} + void ReshardingOplogApplicationRules::_applyInsert_inlock( OperationContext* opCtx, Database* db, diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h index d6f4d0d6169..a9e63c78201 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.h +++ b/src/mongo/db/s/resharding/resharding_oplog_application.h @@ -68,6 +68,13 @@ public: Status applyOperation(OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts); + /** + * Wraps the command application in a writeConflictRetry loop. Will return an error on any + * command other than "applyOps", "commitTransaction", or "abortTransaction". + */ + Status applyCommand(OperationContext* opCtx, + const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts); + private: // Applies an insert operation void _applyInsert_inlock(OperationContext* opCtx, diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 20ad9be0a97..61438c523f7 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -42,7 +42,6 @@ #include "mongo/db/db_raii.h" #include "mongo/db/namespace_string.h" #include "mongo/db/persistent_task_store.h" -#include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/repl/oplog_applier_utils.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h" @@ -62,6 +61,11 @@ using namespace fmt::literals; namespace { +// Used for marking intermediate oplog entries created by the resharding applier that will require +// special handling in the repl writer thread. These intermediate oplog entries serve as a message +// container and will never be written to an actual collection. +const BSONObj kReshardingOplogTag(BSON("$resharding" << 1)); + /** * Insert a no-op oplog entry that contains the pre/post image document from a retryable write. */ @@ -209,6 +213,23 @@ Status insertOplogAndUpdateConfigForRetryable(OperationContext* opCtx, return Status::OK(); } +/** + * Returns true if the given oplog is a special no-op oplog entry that contains the information for + * retryable writes. + */ +bool isRetryableNoOp(const repl::OplogEntryOrGroupedInserts& oplogOrGroupedInserts) { + if (oplogOrGroupedInserts.isGroupedInserts()) { + return false; + } + + const auto& op = oplogOrGroupedInserts.getOp(); + if (op.getOpType() != repl::OpTypeEnum::kNoop) { + return false; + } + + return op.getObject().woCompare(kReshardingOplogTag) == 0; +} + ServiceContext::UniqueClient makeKillableClient(ServiceContext* serviceContext, StringData name) { auto client = serviceContext->makeClient(name.toString()); stdx::lock_guard<Client> lk(*client); @@ -244,7 +265,6 @@ ReshardingOplogApplier::ReshardingOplogApplier( _outputNs(_nsBeingResharded.db(), "system.resharding.{}"_format(_uuidBeingResharded.toString())), _reshardingCloneFinishedTs(std::move(reshardingCloneFinishedTs)), - _batchPreparer{CollatorInterface::cloneCollator(sourceChunkMgr.getDefaultCollator())}, _applicationRules(ReshardingOplogApplicationRules( _outputNs, std::move(allStashNss), myStashIdx, _sourceId.getShardId(), sourceChunkMgr)), _service(service), @@ -283,20 +303,16 @@ ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch() { return _oplogIter->getNextBatch(_executor); }) .then([this](OplogBatch batch) { - _currentBatchToApply = std::move(batch); - - auto applyBatchClient = makeKillableClient(_service, kClientName); - AlternativeClientRegion acr(applyBatchClient); - auto applyBatchOpCtx = makeInterruptibleOperationContext(); - - return _applyBatch(applyBatchOpCtx.get(), false /* isForSessionApplication */); + for (auto&& entry : batch) { + _preProcessAndPushOpsToBuffer(entry); + } }) .then([this] { auto applyBatchClient = makeKillableClient(_service, kClientName); AlternativeClientRegion acr(applyBatchClient); auto applyBatchOpCtx = makeInterruptibleOperationContext(); - return _applyBatch(applyBatchOpCtx.get(), true /* isForSessionApplication */); + return _applyBatch(applyBatchOpCtx.get()); }) .then([this] { if (_currentBatchToApply.empty()) { @@ -335,14 +351,11 @@ ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch() { }); } -Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx, - bool isForSessionApplication) { - if (isForSessionApplication) { - _currentWriterVectors = _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply); - } else { - _currentWriterVectors = - _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps); - } +Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx) { + // TODO: handle config.transaction updates with derivedOps + + auto writerVectors = _fillWriterVectors(opCtx, &_currentBatchToApply, &_currentDerivedOps); + _currentWriterVectors.swap(writerVectors); auto pf = makePromiseFuture<void>(); @@ -371,6 +384,107 @@ Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx, return std::move(pf.future); } +/** + * Convert the given oplog entry into a pseudo oplog that is going to be used for updating the + * config.transactions and inserting the necessary oplog entries during resharing oplog application + * for this oplog. + */ +repl::OplogEntry convertToNoOpWithReshardingTag(const repl::OplogEntry& oplog) { + repl::OplogEntry newOplog(repl::DurableOplogEntry(oplog.getOpTime(), + oplog.getHash(), + repl::OpTypeEnum::kNoop, + oplog.getNss(), + boost::none /* uuid */, + oplog.getFromMigrate(), + oplog.getVersion(), + kReshardingOplogTag, + // Set the o2 field with the original oplog. + oplog.getEntry().toBSON(), + oplog.getOperationSessionInfo(), + oplog.getUpsert(), + oplog.getWallClockTime(), + oplog.getStatementId(), + oplog.getPrevWriteOpTimeInTransaction(), + oplog.getPreImageOpTime(), + oplog.getPostImageOpTime(), + oplog.getDestinedRecipient(), + oplog.get_id())); + + newOplog.setPreImageOp(oplog.getPreImageOp()); + newOplog.setPostImageOp(oplog.getPostImageOp()); + + return newOplog; +} + +void addDerivedOpsToWriterVector(std::vector<std::vector<const repl::OplogEntry*>>* writerVectors, + const std::vector<repl::OplogEntry>& derivedOps) { + for (auto&& op : derivedOps) { + invariant(op.getObject().woCompare(kReshardingOplogTag) == 0); + uassert(4990403, + "expected resharding derived oplog to have session id: {}"_format( + redact(op.toBSONForLogging()).toString()), + op.getSessionId()); + + LogicalSessionIdHash hasher; + auto writerId = hasher(*op.getSessionId()) % writerVectors->size(); + (*writerVectors)[writerId].push_back(&op); + } +} + +std::vector<std::vector<const repl::OplogEntry*>> ReshardingOplogApplier::_fillWriterVectors( + OperationContext* opCtx, OplogBatch* batch, OplogBatch* derivedOps) { + std::vector<std::vector<const repl::OplogEntry*>> writerVectors( + _writerPool->getStats().numThreads); + repl::CachedCollectionProperties collPropertiesCache; + + LogicalSessionIdMap<RetryableOpsList> sessionTracker; + + for (auto&& op : *batch) { + uassert(5012000, + "Resharding oplog application does not support prepared transactions.", + !op.shouldPrepare()); + uassert(5012001, + "Resharding oplog application does not support prepared transactions.", + !op.isPreparedCommit()); + + if (op.getOpType() == repl::OpTypeEnum::kNoop) + continue; + + repl::OplogApplierUtils::addToWriterVector( + opCtx, &op, &writerVectors, &collPropertiesCache); + + if (auto sessionId = op.getSessionId()) { + auto& retryableOpList = sessionTracker[*sessionId]; + auto txnNumber = *op.getTxnNumber(); + + if (retryableOpList.txnNum == txnNumber) { + retryableOpList.ops.push_back(&op); + } else if (retryableOpList.txnNum < txnNumber) { + retryableOpList.ops.clear(); + retryableOpList.ops.push_back(&op); + retryableOpList.txnNum = txnNumber; + } else { + uasserted(4990401, + str::stream() << "retryable oplog applier for " << _sourceId.toBSON() + << " encountered out of order txnNum, saw " + << redact(op.toBSONForLogging()) << " after " + << redact(retryableOpList.ops.front()->toBSONForLogging())); + } + } + } + + for (const auto& sessionsToUpdate : sessionTracker) { + for (const auto& op : sessionsToUpdate.second.ops) { + auto noOpWithPrePost = convertToNoOpWithReshardingTag(*op); + derivedOps->push_back(std::move(noOpWithPrePost)); + } + } + + addDerivedOpsToWriterVector(&writerVectors, _currentDerivedOps); + + return writerVectors; +} + Status ReshardingOplogApplier::_applyOplogBatchPerWorker( std::vector<const repl::OplogEntry*>* ops) { auto opCtx = makeInterruptibleOperationContext(); @@ -395,14 +509,57 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts( auto op = entryOrGroupedInserts.getOp(); - if (op.isForReshardingSessionApplication()) { + if (isRetryableNoOp(entryOrGroupedInserts)) { return insertOplogAndUpdateConfigForRetryable(opCtx, entryOrGroupedInserts.getOp()); } invariant(DocumentValidationSettings::get(opCtx).isSchemaValidationDisabled()); - invariant(op.isCrudOpType()); - return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts); + auto opType = op.getOpType(); + if (opType == repl::OpTypeEnum::kNoop) { + return Status::OK(); + } else if (repl::DurableOplogEntry::isCrudOpType(opType)) { + return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts); + } else if (opType == repl::OpTypeEnum::kCommand) { + return _applicationRules.applyCommand(opCtx, entryOrGroupedInserts); + } + + MONGO_UNREACHABLE; +} + +void ReshardingOplogApplier::_preProcessAndPushOpsToBuffer(repl::OplogEntry oplog) { + uassert(5012002, + str::stream() << "trying to apply oplog not belonging to ns " << _nsBeingResharded + << " during resharding: " << redact(oplog.toBSONForLogging()), + _nsBeingResharded == oplog.getNss()); + uassert(5012005, + str::stream() << "trying to apply oplog with a different UUID from " + << _uuidBeingResharded + << " during resharding: " << redact(oplog.toBSONForLogging()), + _uuidBeingResharded == oplog.getUuid()); + + repl::OplogEntry newOplog(repl::DurableOplogEntry(oplog.getOpTime(), + oplog.getHash(), + oplog.getOpType(), + _outputNs, + boost::none /* uuid */, + oplog.getFromMigrate(), + oplog.getVersion(), + oplog.getObject(), + oplog.getObject2(), + oplog.getOperationSessionInfo(), + oplog.getUpsert(), + oplog.getWallClockTime(), + oplog.getStatementId(), + oplog.getPrevWriteOpTimeInTransaction(), + oplog.getPreImageOpTime(), + oplog.getPostImageOpTime(), + oplog.getDestinedRecipient(), + oplog.get_id())); + newOplog.setPreImageOp(oplog.getPreImageOp()); + newOplog.setPostImageOp(oplog.getPostImageOp()); + + _currentBatchToApply.push_back(std::move(newOplog)); } Status ReshardingOplogApplier::_onError(Status status) { diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index 128ecb581e5..aff9b5da1c5 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -36,7 +36,6 @@ #include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h" #include "mongo/db/s/resharding/resharding_oplog_application.h" #include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" -#include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h" #include "mongo/executor/task_executor.h" #include "mongo/s/chunk_manager.h" #include "mongo/util/future.h" @@ -94,6 +93,12 @@ private: enum class Stage { kStarted, kErrorOccurred, kReachedCloningTS, kFinished }; + struct RetryableOpsList { + public: + TxnNumber txnNum{kUninitializedTxnNumber}; + std::vector<repl::OplogEntry*> ops; + }; + /** * Returns a future that becomes ready when the next batch of oplog entries have been collected * and applied. @@ -104,7 +109,14 @@ private: * Setup the worker threads to apply the ops in the current buffer in parallel. Waits for all * worker threads to finish (even when some of them finished early due to an error). */ - Future<void> _applyBatch(OperationContext* opCtx, bool isForSessionApplication); + Future<void> _applyBatch(OperationContext* opCtx); + + /** + * Partition the currently buffered oplog entries so they can be applied in parallel. + */ + std::vector<std::vector<const repl::OplogEntry*>> _fillWriterVectors(OperationContext* opCtx, + OplogBatch* batch, + OplogBatch* derivedOps); /** * Apply a slice of oplog entries from the current batch for a worker thread. @@ -118,6 +130,11 @@ private: OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& entryOrGroupedInserts); /** + * Perform necessary adjustments to the oplog entry so it will be ready to be applied. + */ + void _preProcessAndPushOpsToBuffer(repl::OplogEntry oplog); + + /** * Record results from a writer vector for the current batch being applied. */ void _onWriterVectorDone(Status status); @@ -157,8 +174,6 @@ private: // finished cloning from it. const Timestamp _reshardingCloneFinishedTs; - const ReshardingOplogBatchPreparer _batchPreparer; - // Actually applies the ops, using special rules that apply only to resharding. Only used when // the 'useReshardingOplogApplicationRules' server parameter is set to true. ReshardingOplogApplicationRules _applicationRules; @@ -185,7 +200,7 @@ private: OplogBatch _currentBatchToApply; // (R) Buffer for internally generated oplog entries that needs to be processed for this batch. - std::list<repl::OplogEntry> _currentDerivedOps; + OplogBatch _currentDerivedOps; // (R) A temporary scratch pad that contains pointers to oplog entries in _currentBatchToApply // that is used by the writer vector when applying oplog in parallel. diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp deleted file mode 100644 index f9903646d70..00000000000 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h" - -#include <third_party/murmurhash3/MurmurHash3.h> - -#include "mongo/bson/bsonelement_comparator.h" -#include "mongo/db/logical_session_id.h" -#include "mongo/db/query/collation/collator_interface.h" -#include "mongo/db/repl/apply_ops.h" -#include "mongo/db/s/resharding/resharding_server_parameters_gen.h" -#include "mongo/logv2/redaction.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/str.h" - -namespace mongo { - -using WriterVectors = ReshardingOplogBatchPreparer::WriterVectors; - -ReshardingOplogBatchPreparer::ReshardingOplogBatchPreparer( - std::unique_ptr<CollatorInterface> defaultCollator) - : _defaultCollator(std::move(defaultCollator)) {} - -void ReshardingOplogBatchPreparer::throwIfUnsupportedCommandOp(const OplogEntry& op) { - invariant(op.isCommand()); - switch (op.getCommandType()) { - case OplogEntry::CommandType::kApplyOps: - case OplogEntry::CommandType::kCommitTransaction: - case OplogEntry::CommandType::kAbortTransaction: - return; - - case OplogEntry::CommandType::kDrop: - uasserted(ErrorCodes::OplogOperationUnsupported, - str::stream() << "Received drop command for resharding's source collection: " - << redact(op.toBSONForLogging())); - - default: - uasserted(ErrorCodes::OplogOperationUnsupported, - str::stream() << "Command not supported during resharding: " - << redact(op.toBSONForLogging())); - } -} - -WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( - const OplogBatchToPrepare& batch, std::list<OplogEntry>& derivedOps) const { - invariant(derivedOps.empty()); - - auto writerVectors = _makeEmptyWriterVectors(); - - for (const auto& op : batch) { - invariant(!op.isForReshardingSessionApplication()); - if (op.isCrudOpType()) { - _appendCrudOpToWriterVector(&op, writerVectors); - } else if (op.isCommand()) { - throwIfUnsupportedCommandOp(op); - - if (op.getCommandType() != OplogEntry::CommandType::kApplyOps) { - continue; - } - - auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); - uassert( - ErrorCodes::OplogOperationUnsupported, - str::stream() << "Commands within applyOps are not supported during resharding: " - << redact(op.toBSONForLogging()), - applyOpsInfo.areOpsCrudOnly()); - - auto unrolledOp = - uassertStatusOK(repl::MutableOplogEntry::parse(op.getEntry().toBSON())); - - for (const auto& innerOp : applyOpsInfo.getOperations()) { - unrolledOp.setDurableReplOperation(repl::DurableReplOperation::parse( - {"ReshardingOplogBatchPreparer::makeCrudOpWriterVectors innerOp"}, innerOp)); - - // There isn't a direct way to convert from a MutableOplogEntry to a - // DurableOplogEntry or OplogEntry. We serialize the unrolledOp to have it get - // re-parsed into an OplogEntry. - auto& derivedOp = derivedOps.emplace_back(unrolledOp.toBSON()); - invariant(derivedOp.isCrudOpType()); - - // `&derivedOp` is guaranteed to remain stable while we append more derived oplog - // entries because `derivedOps` is a std::list. - _appendCrudOpToWriterVector(&derivedOp, writerVectors); - } - } else { - invariant(repl::OpTypeEnum::kNoop == op.getOpType()); - } - } - - return writerVectors; -} - -WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( - OplogBatchToPrepare& batch) const { - auto writerVectors = _makeEmptyWriterVectors(); - - struct SessionOpsList { - TxnNumber txnNum = kUninitializedTxnNumber; - std::vector<OplogEntry*> ops; - }; - - LogicalSessionIdMap<SessionOpsList> sessionTracker; - - for (auto& op : batch) { - if (op.isCrudOpType()) { - if (const auto& lsid = op.getSessionId()) { - uassert(4990700, - str::stream() << "Missing txnNumber for oplog entry with lsid: " - << redact(op.toBSONForLogging()), - op.getTxnNumber()); - - auto txnNumber = *op.getTxnNumber(); - - auto& retryableOpList = sessionTracker[*lsid]; - if (txnNumber == retryableOpList.txnNum) { - retryableOpList.ops.emplace_back(&op); - } else if (txnNumber > retryableOpList.txnNum) { - retryableOpList.ops = {&op}; - retryableOpList.txnNum = txnNumber; - } else { - uasserted(4990401, - str::stream() - << "Encountered out of order txnNumbers; batch had " - << redact(op.toBSONForLogging()) << " after " - << redact(retryableOpList.ops.back()->toBSONForLogging())); - } - } - } else if (op.isCommand()) { - throwIfUnsupportedCommandOp(op); - - // TODO SERVER-49905: Replace ops and update txnNum for the following cases: - // - a non-partialTxn:true, non-prepare:true applyOps entry = the final applyOps entry - // from a non-prepared transaction, - // - a commitTransaction oplog entry, or - // - an abortTransaction oplog entry. - } else { - invariant(repl::OpTypeEnum::kNoop == op.getOpType()); - } - } - - for (auto& [lsid, opList] : sessionTracker) { - for (auto& op : opList.ops) { - op->setIsForReshardingSessionApplication(); - _appendSessionOpToWriterVector(lsid, op, writerVectors); - } - } - - return writerVectors; -} - -WriterVectors ReshardingOplogBatchPreparer::_makeEmptyWriterVectors() const { - return WriterVectors(size_t(resharding::gReshardingWriterThreadCount)); -} - -void ReshardingOplogBatchPreparer::_appendCrudOpToWriterVector(const OplogEntry* op, - WriterVectors& writerVectors) const { - BSONElementComparator elementHasher{BSONElementComparator::FieldNamesMode::kIgnore, - _defaultCollator.get()}; - - const size_t idHash = elementHasher.hash(op->getIdElement()); - - uint32_t hash = 0; - MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); - - _appendOpToWriterVector(hash, op, writerVectors); -} - -void ReshardingOplogBatchPreparer::_appendSessionOpToWriterVector( - const LogicalSessionId& lsid, const OplogEntry* op, WriterVectors& writerVectors) const { - LogicalSessionIdHash lsidHasher; - _appendOpToWriterVector(lsidHasher(lsid), op, writerVectors); -} - -void ReshardingOplogBatchPreparer::_appendOpToWriterVector(std::uint32_t hash, - const OplogEntry* op, - WriterVectors& writerVectors) const { - auto& writer = writerVectors[hash % writerVectors.size()]; - if (writer.empty()) { - // Skip a few growth rounds in anticipation that we'll be appending more. - writer.reserve(8U); - } - writer.emplace_back(op); -} - -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h deleted file mode 100644 index 33faa290643..00000000000 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <cstdint> -#include <list> -#include <memory> -#include <vector> - -#include "mongo/db/repl/oplog_entry.h" - -namespace mongo { - -class CollatorInterface; -class LogicalSessionId; - -/** - * Converts a batch of oplog entries to be applied into multiple batches of oplog entries that may - * be applied concurrently by different threads. - * - * Instances of this class are thread-safe. - */ -class ReshardingOplogBatchPreparer { -private: - using OplogEntry = repl::OplogEntry; - -public: - ReshardingOplogBatchPreparer(std::unique_ptr<CollatorInterface> defaultCollator); - - using OplogBatchToPrepare = std::vector<OplogEntry>; - using OplogBatchToApply = std::vector<const OplogEntry*>; - using WriterVectors = std::vector<OplogBatchToApply>; - - /** - * Prepares a batch of oplog entries for CRUD application by multiple threads concurrently. - * - * The returned writer vectors guarantee that modifications to the same document (as identified - * by its _id) will be in the same writer vector and will appear in their corresponding `batch` - * order. - * - * The returned writer vectors refer to memory owned by `batch` and `derivedOps`. The caller - * must take care to ensure both `batch` and `derivedOps` outlive the writer vectors all being - * applied and must take care not to modify `batch` or `derivedOps` until after the writer - * vectors have all been applied. In particular, `makeSessionOpWriterVectors(batch)` must not be - * called until after the returned writer vectors have all been applied. - */ - WriterVectors makeCrudOpWriterVectors(const OplogBatchToPrepare& batch, - std::list<OplogEntry>& derivedOps) const; - - /** - * Prepares a batch of oplog entries for session application by multiple threads concurrently. - * - * The returned writer vectors guarantee that modifications to the same config.transactions - * records (as identified by its lsid) will be in the same writer vector. Additionally, updates - * to the config.transactions record for a higher txnNumber will cause any updates in `batch` - * for lower txnNumbers to be elided. - * - * The returned writer vectors refer to memory owned by `batch`. The caller must take care to - * ensure `batch` outlives the writer vectors all being applied and must take care not to modify - * `batch` until after the writer vectors have all been applied. - * - * As a performance optimization, to avoid creating a separate copy of `batch`, this function - * mutates the contained oplog entries. The caller should take care to apply the writer vectors - * from `makeCrudOpWriterVectors(batch)` first. - */ - WriterVectors makeSessionOpWriterVectors(OplogBatchToPrepare& batch) const; - - static void throwIfUnsupportedCommandOp(const OplogEntry& op); - -private: - WriterVectors _makeEmptyWriterVectors() const; - - void _appendCrudOpToWriterVector(const OplogEntry* op, WriterVectors& writerVectors) const; - - void _appendSessionOpToWriterVector(const LogicalSessionId& lsid, - const OplogEntry* op, - WriterVectors& writerVectors) const; - - void _appendOpToWriterVector(std::uint32_t hash, - const OplogEntry* op, - WriterVectors& writerVectors) const; - - const std::unique_ptr<CollatorInterface> _defaultCollator; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp deleted file mode 100644 index 07ad1acae57..00000000000 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp +++ /dev/null @@ -1,372 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include <boost/optional/optional_io.hpp> - -#include "mongo/db/query/collation/collator_interface.h" -#include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h" -#include "mongo/db/s/resharding/resharding_server_parameters_gen.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace { - -using OplogBatch = ReshardingOplogBatchPreparer::OplogBatchToPrepare; - -class ScopedServerParameterChange { -public: - ScopedServerParameterChange(int* param, int newValue) : _param(param), _originalValue(*_param) { - *param = newValue; - } - - ~ScopedServerParameterChange() { - *_param = _originalValue; - } - -private: - int* const _param; - const int _originalValue; -}; - -class ReshardingOplogBatchPreparerTest : public unittest::Test { -protected: - repl::OplogEntry makeUpdateOp(BSONObj document) { - return makeUpdateOp(std::move(document), boost::none, boost::none); - } - - repl::OplogEntry makeUpdateOp(BSONObj document, - boost::optional<LogicalSessionId> lsid, - boost::optional<TxnNumber> txnNumber) { - repl::MutableOplogEntry op; - op.setOpType(repl::OpTypeEnum::kUpdate); - op.setObject2(document["_id"].wrap().getOwned()); - op.setObject(std::move(document)); - op.setSessionId(std::move(lsid)); - op.setTxnNumber(std::move(txnNumber)); - - // These are unused by ReshardingOplogBatchPreparer but required by IDL parsing. - op.setNss({}); - op.setOpTimeAndWallTimeBase({{}, {}}); - - return {op.toBSON()}; - } - - repl::OplogEntry makeCommandOp(BSONObj commandObj) { - return makeCommandOp(std::move(commandObj), boost::none, boost::none); - } - - repl::OplogEntry makeCommandOp(BSONObj commandObj, - boost::optional<LogicalSessionId> lsid, - boost::optional<TxnNumber> txnNumber) { - repl::MutableOplogEntry op; - op.setOpType(repl::OpTypeEnum::kCommand); - op.setObject(std::move(commandObj)); - op.setSessionId(std::move(lsid)); - op.setTxnNumber(std::move(txnNumber)); - - // These are unused by ReshardingOplogBatchPreparer but required by IDL parsing. - op.setNss({}); - op.setOpTimeAndWallTimeBase({{}, {}}); - - return {op.toBSON()}; - } - - ReshardingOplogBatchPreparer::OplogBatchToApply& getNonEmptyWriterVector( - ReshardingOplogBatchPreparer::WriterVectors writerVectors) { - ReshardingOplogBatchPreparer::OplogBatchToApply* nonempty = nullptr; - - for (auto& writer : writerVectors) { - if (!writer.empty()) { - ASSERT_FALSE(nonempty) - << "Expected only one non-empty writer vector, but found multiple"; - nonempty = &writer; - } - } - - ASSERT_TRUE(nonempty) << "Expected to find a non-empty writer vector, but didn't"; - return *nonempty; - } - - ReshardingOplogBatchPreparer _batchPreparer{nullptr}; - - static constexpr size_t kNumWriterVectors = 2; - -private: - ScopedServerParameterChange _numWriterVectors{&resharding::gReshardingWriterThreadCount, - int(kNumWriterVectors)}; -}; - -TEST_F(ReshardingOplogBatchPreparerTest, AssignsCrudOpsToWriterVectorsById) { - OplogBatch batch; - - int numOps = 10; - for (int i = 0; i < numOps; ++i) { - batch.emplace_back(makeUpdateOp(BSON("_id" << 0 << "n" << i))); - } - - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOps.size(), 0U); - - auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), numOps); - for (int i = 0; i < numOps; ++i) { - ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(), BSON("_id" << 0 << "n" << i)); - } -} - -TEST_F(ReshardingOplogBatchPreparerTest, DistributesCrudOpsToWriterVectorsFairly) { - OplogBatch batch; - - int numOps = 100; - for (int i = 0; i < numOps; ++i) { - batch.emplace_back(makeUpdateOp(BSON("_id" << i))); - } - - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOps.size(), 0U); - - // Use `numOps / 5` as a generous definition for "fair". There's no guarantee for how the _id - // values will be hashed but can at least assert the writer vector sizes won't wildly differ - // from each other. - ASSERT_GTE(writerVectors[0].size(), numOps / 5); - ASSERT_GTE(writerVectors[1].size(), numOps / 5); - ASSERT_EQ(writerVectors[0].size() + writerVectors[1].size(), numOps); -} - -TEST_F(ReshardingOplogBatchPreparerTest, CreatesDerivedCrudOpsForApplyOps) { - OplogBatch batch; - - // We use the "fromApplyOps" field in the document to distinguish between the regular oplog - // entries from the derived ones later on. - int numOps = 20; - for (int i = 0; i < numOps; ++i) { - batch.emplace_back(makeUpdateOp(BSON("_id" << i << "fromApplyOps" << false))); - } - - BSONObjBuilder applyOpsBuilder; - { - BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps"); - for (int i = 0; i < numOps; ++i) { - // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to - // deal with setting the 'o2' field. - opsArrayBuilder.append( - repl::DurableReplOperation( - repl::OpTypeEnum::kInsert, {}, BSON("_id" << i << "fromApplyOps" << true)) - .toBSON()); - } - } - - batch.emplace_back(makeCommandOp(applyOpsBuilder.done())); - - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOps.size(), numOps); - - ASSERT_EQ(writerVectors[0].size() + writerVectors[1].size(), numOps * 2); - - for (const auto& writer : writerVectors) { - for (size_t i = 0; i < writer.size(); ++i) { - if (writer[i]->getObject()["fromApplyOps"].Bool()) { - continue; - } - - int docId = writer[i]->getObject()["_id"].Int(); - - bool found = false; - for (size_t j = i + 1; j < writer.size(); ++j) { - if (writer[j]->getObject()["fromApplyOps"].Bool() && - writer[j]->getObject()["_id"].Int() == docId) { - found = true; - } - } - ASSERT_TRUE(found) << "Expected to find normal op and unrolled applyOps for _id=" - << docId << " in same writer vector, but didn't"; - } - } -} - -TEST_F(ReshardingOplogBatchPreparerTest, InterleavesDerivedCrudOpsForApplyOps) { - OplogBatch batch; - - int numOps = 10; - for (int i = 0; i < numOps; ++i) { - // We use the "fromApplyOps" field in the document to distinguish between the regular oplog - // entries from the derived ones later on. - if (i % 2 == 0) { - batch.emplace_back( - makeUpdateOp(BSON("_id" << 0 << "n" << i << "fromApplyOps" << false))); - } else { - // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to - // deal with setting the 'o2' field. - batch.emplace_back(makeCommandOp(BSON( - "applyOps" << BSON_ARRAY(repl::DurableReplOperation( - repl::OpTypeEnum::kInsert, - {}, - BSON("_id" << 0 << "n" << i << "fromApplyOps" << true)) - .toBSON())))); - } - } - - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOps.size(), numOps / 2); - - auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), numOps); - for (int i = 0; i < numOps; ++i) { - if (i % 2 == 0) { - ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(), - BSON("_id" << 0 << "n" << i << "fromApplyOps" << false)); - } else { - ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(), - BSON("_id" << 0 << "n" << i << "fromApplyOps" << true)); - } - } -} - -TEST_F(ReshardingOplogBatchPreparerTest, AssignsSessionOpsToWriterVectorsByLsid) { - OplogBatch batch; - auto lsid = makeLogicalSessionIdForTest(); - - int numOps = 10; - for (int i = 0; i < numOps; ++i) { - batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{1})); - } - - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - - auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), numOps); - for (int i = 0; i < numOps; ++i) { - ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(), BSON("_id" << i)); - ASSERT_EQ(writer[i]->getSessionId(), lsid); - ASSERT_EQ(writer[i]->getTxnNumber(), TxnNumber{1}); - ASSERT_TRUE(writer[i]->isForReshardingSessionApplication()); - } -} - -TEST_F(ReshardingOplogBatchPreparerTest, DiscardsLowerTxnNumberSessionOps) { - OplogBatch batch; - auto lsid = makeLogicalSessionIdForTest(); - - int numOps = 5; - for (int i = 1; i <= numOps; ++i) { - batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{i})); - } - - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - - auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), 1U); - ASSERT_BSONOBJ_BINARY_EQ(writer[0]->getObject(), BSON("_id" << numOps)); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(writer[0]->getTxnNumber(), TxnNumber{numOps}); - ASSERT_TRUE(writer[0]->isForReshardingSessionApplication()); -} - -TEST_F(ReshardingOplogBatchPreparerTest, DistributesSessionOpsToWriterVectorsFairly) { - OplogBatch batch; - - int numOps = 100; - for (int i = 0; i < numOps; ++i) { - batch.emplace_back( - makeUpdateOp(BSON("_id" << i), makeLogicalSessionIdForTest(), TxnNumber{1})); - } - - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - - // Use `numOps / 5` as a generous definition for "fair". There's no guarantee for how the lsid - // values will be hashed but can at least assert the writer vector sizes won't wildly differ - // from each other. - ASSERT_GTE(writerVectors[0].size(), numOps / 5); - ASSERT_GTE(writerVectors[1].size(), numOps / 5); - ASSERT_EQ(writerVectors[0].size() + writerVectors[1].size(), numOps); -} - -TEST_F(ReshardingOplogBatchPreparerTest, ThrowsForUnsupportedCommandOps) { - { - OplogBatch batch; - batch.emplace_back(makeCommandOp(BSON("drop" << 1))); - - std::list<repl::OplogEntry> derivedOps; - ASSERT_THROWS_CODE(_batchPreparer.makeCrudOpWriterVectors(batch, derivedOps), - DBException, - ErrorCodes::OplogOperationUnsupported); - } - - { - OplogBatch batch; - batch.emplace_back(makeCommandOp(BSON("commitIndexBuild" << 1))); - - std::list<repl::OplogEntry> derivedOps; - ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch), - DBException, - ErrorCodes::OplogOperationUnsupported); - } -} - -TEST_F(ReshardingOplogBatchPreparerTest, DiscardsNoops) { - OplogBatch batch; - - int numOps = 5; - for (int i = 0; i < numOps; ++i) { - repl::MutableOplogEntry op; - op.setOpType(repl::OpTypeEnum::kNoop); - op.setObject({}); - op.setNss({}); - op.setOpTimeAndWallTimeBase({{}, {}}); - batch.emplace_back(op.toBSON()); - } - - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOps.size(), 0U); - ASSERT_EQ(writerVectors[0].size(), 0U); - ASSERT_EQ(writerVectors[1].size(), 0U); - - writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(writerVectors[0].size(), 0U); - ASSERT_EQ(writerVectors[1].size(), 0U); -} - -} // namespace -} // namespace mongo |