summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2021-01-09 11:47:35 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-09 12:53:37 +0000
commit54e87285f802a49456c02c80cb0ddb0fbb54c88a (patch)
tree868f5632832b7859f9842fb4f551a6b545fa271c /src
parent8b6ac29a0a5133fde5dbff8d39347ca35d187eae (diff)
downloadmongo-54e87285f802a49456c02c80cb0ddb0fbb54c88a.tar.gz
Revert "SERVER-49907 Unroll applyOps entry and do resharding CRUD application."
This reverts commit 4363473d75cab2a487c6a6066b601d52230c7e1a.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp16
-rw-r--r--src/mongo/db/repl/oplog_entry.h8
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.cpp43
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.h7
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp199
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.h25
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp214
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h113
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp372
10 files changed, 251 insertions, 748 deletions
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index cfc2e635030..508dd5addd9 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -511,11 +511,7 @@ BSONObj OplogEntry::toBSONForLogging() const {
builder.append("oplogEntry", entry);
if (_isForCappedCollection) {
- builder.append("isForCappedCollection", _isForCappedCollection);
- }
-
- if (_isForReshardingSessionApplication) {
- builder.append("isForReshardingSessionApplication", _isForReshardingSessionApplication);
+ builder.append("isForCappedCollection", *_isForCappedCollection);
}
if (_preImageOp) {
@@ -542,7 +538,7 @@ BSONObj OplogEntry::toBSONForLogging() const {
}
bool OplogEntry::isForCappedCollection() const {
- return _isForCappedCollection;
+ return _isForCappedCollection.get_value_or(false);
}
void OplogEntry::setIsForCappedCollection(bool isForCappedCollection) {
@@ -575,14 +571,6 @@ void OplogEntry::setPostImageOp(const BSONObj& postImageOp) {
uassertStatusOK(DurableOplogEntry::parse(postImageOp))));
}
-bool OplogEntry::isForReshardingSessionApplication() const {
- return _isForReshardingSessionApplication;
-}
-
-void OplogEntry::setIsForReshardingSessionApplication(bool isForReshardingSessionApplication) {
- _isForReshardingSessionApplication = isForReshardingSessionApplication;
-}
-
const boost::optional<mongo::Value>& OplogEntry::get_id() const& {
return _entry.get_id();
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index e093b526a3c..b901c4ae291 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -491,9 +491,6 @@ public:
void setPostImageOp(std::shared_ptr<DurableOplogEntry> postImageOp);
void setPostImageOp(const BSONObj& postImageOp);
- bool isForReshardingSessionApplication() const;
- void setIsForReshardingSessionApplication(bool isForReshardingSessionApplication = true);
-
std::string toStringForLogging() const;
/**
@@ -546,15 +543,12 @@ public:
private:
DurableOplogEntry _entry;
+ boost::optional<bool> _isForCappedCollection;
// We use std::shared_ptr<DurableOplogEntry> rather than boost::optional<DurableOplogEntry> here
// so that OplogEntries are cheaper to copy.
std::shared_ptr<DurableOplogEntry> _preImageOp;
std::shared_ptr<DurableOplogEntry> _postImageOp;
-
- bool _isForCappedCollection = false;
-
- bool _isForReshardingSessionApplication = false;
};
std::ostream& operator<<(std::ostream& s, const DurableOplogEntry& o);
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 18df1b1aac1..706007d79d9 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -81,7 +81,6 @@ env.Library(
'resharding/resharding_oplog_applier.cpp',
'resharding/resharding_oplog_applier_progress.idl',
'resharding/resharding_oplog_application.cpp',
- 'resharding/resharding_oplog_batch_preparer.cpp',
'resharding/resharding_oplog_fetcher.cpp',
'resharding/resharding_recipient_service.cpp',
'resharding/resharding_server_parameters.idl',
@@ -470,7 +469,6 @@ env.CppUnitTest(
'resharding/resharding_donor_recipient_common_test.cpp',
'resharding/resharding_metrics_test.cpp',
'resharding/resharding_oplog_applier_test.cpp',
- 'resharding/resharding_oplog_batch_preparer_test.cpp',
'resharding/resharding_oplog_fetcher_test.cpp',
'resharding/resharding_recipient_service_test.cpp',
'resharding/resharding_donor_service_test.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index e97965d5dad..d3065b25dd5 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -184,6 +184,49 @@ Status ReshardingOplogApplicationRules::applyOperation(
});
}
+Status ReshardingOplogApplicationRules::ReshardingOplogApplicationRules::applyCommand(
+ OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) {
+ LOGV2_DEBUG(49909,
+ 3,
+ "Applying command op for resharding",
+ "opl"_attr = redact(opOrGroupedInserts.toBSON()));
+
+ auto op = opOrGroupedInserts.getOp();
+
+ invariant(op.getOpType() == repl::OpTypeEnum::kCommand);
+ invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+ invariant(opCtx->writesAreReplicated());
+
+ return writeConflictRetry(opCtx, "applyOplogEntryCommandOpResharding", op.getNss().ns(), [&] {
+ OpCounters* opCounters = &globalOpCounters;
+ opCounters->gotCommand();
+
+ invariant(op.getNss() == _outputNss);
+ BSONObj oField = op.getObject();
+
+ // Only applyOps, commitTransaction, and abortTransaction are allowed.
+ std::vector<std::string> supportedCmds{"applyOps", "commitTransaction", "abortTransaction"};
+ if (std::find(supportedCmds.begin(), supportedCmds.end(), oField.firstElementFieldName()) ==
+ supportedCmds.end()) {
+ if (oField.firstElementFieldName() == "drop"_sd) {
+ return Status(ErrorCodes::OplogOperationUnsupported,
+ str::stream()
+ << "Received drop command for resharding source collection "
+ << redact(op.toBSONForLogging()));
+ }
+
+ return Status(ErrorCodes::OplogOperationUnsupported,
+ str::stream() << "Command not supported during resharding: "
+ << redact(op.toBSONForLogging()));
+ }
+
+ // TODO SERVER-49907 implement applyOps write rule
+ // TODO SERVER-49905 handle commit and abort transaction rules
+ return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
+ opCtx, opOrGroupedInserts, repl::OplogApplication::Mode::kInitialSync, [] {}, nullptr);
+ });
+}
+
void ReshardingOplogApplicationRules::_applyInsert_inlock(
OperationContext* opCtx,
Database* db,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h
index d6f4d0d6169..a9e63c78201 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.h
@@ -68,6 +68,13 @@ public:
Status applyOperation(OperationContext* opCtx,
const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts);
+ /**
+ * Wraps the command application in a writeConflictRetry loop. Will return an error on any
+ * command other than "applyOps", "commitTransaction", or "abortTransaction".
+ */
+ Status applyCommand(OperationContext* opCtx,
+ const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts);
+
private:
// Applies an insert operation
void _applyInsert_inlock(OperationContext* opCtx,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 20ad9be0a97..61438c523f7 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/persistent_task_store.h"
-#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h"
@@ -62,6 +61,11 @@ using namespace fmt::literals;
namespace {
+// Used for marking intermediate oplog entries created by the resharding applier that will require
+// special handling in the repl writer thread. These intermediate oplog entries serve as a message
+// container and will never be written to an actual collection.
+const BSONObj kReshardingOplogTag(BSON("$resharding" << 1));
+
/**
* Insert a no-op oplog entry that contains the pre/post image document from a retryable write.
*/
@@ -209,6 +213,23 @@ Status insertOplogAndUpdateConfigForRetryable(OperationContext* opCtx,
return Status::OK();
}
+/**
+ * Returns true if the given oplog is a special no-op oplog entry that contains the information for
+ * retryable writes.
+ */
+bool isRetryableNoOp(const repl::OplogEntryOrGroupedInserts& oplogOrGroupedInserts) {
+ if (oplogOrGroupedInserts.isGroupedInserts()) {
+ return false;
+ }
+
+ const auto& op = oplogOrGroupedInserts.getOp();
+ if (op.getOpType() != repl::OpTypeEnum::kNoop) {
+ return false;
+ }
+
+ return op.getObject().woCompare(kReshardingOplogTag) == 0;
+}
+
ServiceContext::UniqueClient makeKillableClient(ServiceContext* serviceContext, StringData name) {
auto client = serviceContext->makeClient(name.toString());
stdx::lock_guard<Client> lk(*client);
@@ -244,7 +265,6 @@ ReshardingOplogApplier::ReshardingOplogApplier(
_outputNs(_nsBeingResharded.db(),
"system.resharding.{}"_format(_uuidBeingResharded.toString())),
_reshardingCloneFinishedTs(std::move(reshardingCloneFinishedTs)),
- _batchPreparer{CollatorInterface::cloneCollator(sourceChunkMgr.getDefaultCollator())},
_applicationRules(ReshardingOplogApplicationRules(
_outputNs, std::move(allStashNss), myStashIdx, _sourceId.getShardId(), sourceChunkMgr)),
_service(service),
@@ -283,20 +303,16 @@ ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch() {
return _oplogIter->getNextBatch(_executor);
})
.then([this](OplogBatch batch) {
- _currentBatchToApply = std::move(batch);
-
- auto applyBatchClient = makeKillableClient(_service, kClientName);
- AlternativeClientRegion acr(applyBatchClient);
- auto applyBatchOpCtx = makeInterruptibleOperationContext();
-
- return _applyBatch(applyBatchOpCtx.get(), false /* isForSessionApplication */);
+ for (auto&& entry : batch) {
+ _preProcessAndPushOpsToBuffer(entry);
+ }
})
.then([this] {
auto applyBatchClient = makeKillableClient(_service, kClientName);
AlternativeClientRegion acr(applyBatchClient);
auto applyBatchOpCtx = makeInterruptibleOperationContext();
- return _applyBatch(applyBatchOpCtx.get(), true /* isForSessionApplication */);
+ return _applyBatch(applyBatchOpCtx.get());
})
.then([this] {
if (_currentBatchToApply.empty()) {
@@ -335,14 +351,11 @@ ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch() {
});
}
-Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx,
- bool isForSessionApplication) {
- if (isForSessionApplication) {
- _currentWriterVectors = _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply);
- } else {
- _currentWriterVectors =
- _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps);
- }
+Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx) {
+ // TODO: handle config.transaction updates with derivedOps
+
+ auto writerVectors = _fillWriterVectors(opCtx, &_currentBatchToApply, &_currentDerivedOps);
+ _currentWriterVectors.swap(writerVectors);
auto pf = makePromiseFuture<void>();
@@ -371,6 +384,107 @@ Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx,
return std::move(pf.future);
}
+/**
+ * Convert the given oplog entry into a pseudo oplog that is going to be used for updating the
+ * config.transactions and inserting the necessary oplog entries during resharing oplog application
+ * for this oplog.
+ */
+repl::OplogEntry convertToNoOpWithReshardingTag(const repl::OplogEntry& oplog) {
+ repl::OplogEntry newOplog(repl::DurableOplogEntry(oplog.getOpTime(),
+ oplog.getHash(),
+ repl::OpTypeEnum::kNoop,
+ oplog.getNss(),
+ boost::none /* uuid */,
+ oplog.getFromMigrate(),
+ oplog.getVersion(),
+ kReshardingOplogTag,
+ // Set the o2 field with the original oplog.
+ oplog.getEntry().toBSON(),
+ oplog.getOperationSessionInfo(),
+ oplog.getUpsert(),
+ oplog.getWallClockTime(),
+ oplog.getStatementId(),
+ oplog.getPrevWriteOpTimeInTransaction(),
+ oplog.getPreImageOpTime(),
+ oplog.getPostImageOpTime(),
+ oplog.getDestinedRecipient(),
+ oplog.get_id()));
+
+ newOplog.setPreImageOp(oplog.getPreImageOp());
+ newOplog.setPostImageOp(oplog.getPostImageOp());
+
+ return newOplog;
+}
+
+void addDerivedOpsToWriterVector(std::vector<std::vector<const repl::OplogEntry*>>* writerVectors,
+ const std::vector<repl::OplogEntry>& derivedOps) {
+ for (auto&& op : derivedOps) {
+ invariant(op.getObject().woCompare(kReshardingOplogTag) == 0);
+ uassert(4990403,
+ "expected resharding derived oplog to have session id: {}"_format(
+ redact(op.toBSONForLogging()).toString()),
+ op.getSessionId());
+
+ LogicalSessionIdHash hasher;
+ auto writerId = hasher(*op.getSessionId()) % writerVectors->size();
+ (*writerVectors)[writerId].push_back(&op);
+ }
+}
+
+std::vector<std::vector<const repl::OplogEntry*>> ReshardingOplogApplier::_fillWriterVectors(
+ OperationContext* opCtx, OplogBatch* batch, OplogBatch* derivedOps) {
+ std::vector<std::vector<const repl::OplogEntry*>> writerVectors(
+ _writerPool->getStats().numThreads);
+ repl::CachedCollectionProperties collPropertiesCache;
+
+ LogicalSessionIdMap<RetryableOpsList> sessionTracker;
+
+ for (auto&& op : *batch) {
+ uassert(5012000,
+ "Resharding oplog application does not support prepared transactions.",
+ !op.shouldPrepare());
+ uassert(5012001,
+ "Resharding oplog application does not support prepared transactions.",
+ !op.isPreparedCommit());
+
+ if (op.getOpType() == repl::OpTypeEnum::kNoop)
+ continue;
+
+ repl::OplogApplierUtils::addToWriterVector(
+ opCtx, &op, &writerVectors, &collPropertiesCache);
+
+ if (auto sessionId = op.getSessionId()) {
+ auto& retryableOpList = sessionTracker[*sessionId];
+ auto txnNumber = *op.getTxnNumber();
+
+ if (retryableOpList.txnNum == txnNumber) {
+ retryableOpList.ops.push_back(&op);
+ } else if (retryableOpList.txnNum < txnNumber) {
+ retryableOpList.ops.clear();
+ retryableOpList.ops.push_back(&op);
+ retryableOpList.txnNum = txnNumber;
+ } else {
+ uasserted(4990401,
+ str::stream() << "retryable oplog applier for " << _sourceId.toBSON()
+ << " encountered out of order txnNum, saw "
+ << redact(op.toBSONForLogging()) << " after "
+ << redact(retryableOpList.ops.front()->toBSONForLogging()));
+ }
+ }
+ }
+
+ for (const auto& sessionsToUpdate : sessionTracker) {
+ for (const auto& op : sessionsToUpdate.second.ops) {
+ auto noOpWithPrePost = convertToNoOpWithReshardingTag(*op);
+ derivedOps->push_back(std::move(noOpWithPrePost));
+ }
+ }
+
+ addDerivedOpsToWriterVector(&writerVectors, _currentDerivedOps);
+
+ return writerVectors;
+}
+
Status ReshardingOplogApplier::_applyOplogBatchPerWorker(
std::vector<const repl::OplogEntry*>* ops) {
auto opCtx = makeInterruptibleOperationContext();
@@ -395,14 +509,57 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts(
auto op = entryOrGroupedInserts.getOp();
- if (op.isForReshardingSessionApplication()) {
+ if (isRetryableNoOp(entryOrGroupedInserts)) {
return insertOplogAndUpdateConfigForRetryable(opCtx, entryOrGroupedInserts.getOp());
}
invariant(DocumentValidationSettings::get(opCtx).isSchemaValidationDisabled());
- invariant(op.isCrudOpType());
- return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts);
+ auto opType = op.getOpType();
+ if (opType == repl::OpTypeEnum::kNoop) {
+ return Status::OK();
+ } else if (repl::DurableOplogEntry::isCrudOpType(opType)) {
+ return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts);
+ } else if (opType == repl::OpTypeEnum::kCommand) {
+ return _applicationRules.applyCommand(opCtx, entryOrGroupedInserts);
+ }
+
+ MONGO_UNREACHABLE;
+}
+
+void ReshardingOplogApplier::_preProcessAndPushOpsToBuffer(repl::OplogEntry oplog) {
+ uassert(5012002,
+ str::stream() << "trying to apply oplog not belonging to ns " << _nsBeingResharded
+ << " during resharding: " << redact(oplog.toBSONForLogging()),
+ _nsBeingResharded == oplog.getNss());
+ uassert(5012005,
+ str::stream() << "trying to apply oplog with a different UUID from "
+ << _uuidBeingResharded
+ << " during resharding: " << redact(oplog.toBSONForLogging()),
+ _uuidBeingResharded == oplog.getUuid());
+
+ repl::OplogEntry newOplog(repl::DurableOplogEntry(oplog.getOpTime(),
+ oplog.getHash(),
+ oplog.getOpType(),
+ _outputNs,
+ boost::none /* uuid */,
+ oplog.getFromMigrate(),
+ oplog.getVersion(),
+ oplog.getObject(),
+ oplog.getObject2(),
+ oplog.getOperationSessionInfo(),
+ oplog.getUpsert(),
+ oplog.getWallClockTime(),
+ oplog.getStatementId(),
+ oplog.getPrevWriteOpTimeInTransaction(),
+ oplog.getPreImageOpTime(),
+ oplog.getPostImageOpTime(),
+ oplog.getDestinedRecipient(),
+ oplog.get_id()));
+ newOplog.setPreImageOp(oplog.getPreImageOp());
+ newOplog.setPostImageOp(oplog.getPostImageOp());
+
+ _currentBatchToApply.push_back(std::move(newOplog));
}
Status ReshardingOplogApplier::_onError(Status status) {
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index 128ecb581e5..aff9b5da1c5 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -36,7 +36,6 @@
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h"
#include "mongo/db/s/resharding/resharding_oplog_application.h"
#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
-#include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/util/future.h"
@@ -94,6 +93,12 @@ private:
enum class Stage { kStarted, kErrorOccurred, kReachedCloningTS, kFinished };
+ struct RetryableOpsList {
+ public:
+ TxnNumber txnNum{kUninitializedTxnNumber};
+ std::vector<repl::OplogEntry*> ops;
+ };
+
/**
* Returns a future that becomes ready when the next batch of oplog entries have been collected
* and applied.
@@ -104,7 +109,14 @@ private:
* Setup the worker threads to apply the ops in the current buffer in parallel. Waits for all
* worker threads to finish (even when some of them finished early due to an error).
*/
- Future<void> _applyBatch(OperationContext* opCtx, bool isForSessionApplication);
+ Future<void> _applyBatch(OperationContext* opCtx);
+
+ /**
+ * Partition the currently buffered oplog entries so they can be applied in parallel.
+ */
+ std::vector<std::vector<const repl::OplogEntry*>> _fillWriterVectors(OperationContext* opCtx,
+ OplogBatch* batch,
+ OplogBatch* derivedOps);
/**
* Apply a slice of oplog entries from the current batch for a worker thread.
@@ -118,6 +130,11 @@ private:
OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& entryOrGroupedInserts);
/**
+ * Perform necessary adjustments to the oplog entry so it will be ready to be applied.
+ */
+ void _preProcessAndPushOpsToBuffer(repl::OplogEntry oplog);
+
+ /**
* Record results from a writer vector for the current batch being applied.
*/
void _onWriterVectorDone(Status status);
@@ -157,8 +174,6 @@ private:
// finished cloning from it.
const Timestamp _reshardingCloneFinishedTs;
- const ReshardingOplogBatchPreparer _batchPreparer;
-
// Actually applies the ops, using special rules that apply only to resharding. Only used when
// the 'useReshardingOplogApplicationRules' server parameter is set to true.
ReshardingOplogApplicationRules _applicationRules;
@@ -185,7 +200,7 @@ private:
OplogBatch _currentBatchToApply;
// (R) Buffer for internally generated oplog entries that needs to be processed for this batch.
- std::list<repl::OplogEntry> _currentDerivedOps;
+ OplogBatch _currentDerivedOps;
// (R) A temporary scratch pad that contains pointers to oplog entries in _currentBatchToApply
// that is used by the writer vector when applying oplog in parallel.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
deleted file mode 100644
index f9903646d70..00000000000
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h"
-
-#include <third_party/murmurhash3/MurmurHash3.h>
-
-#include "mongo/bson/bsonelement_comparator.h"
-#include "mongo/db/logical_session_id.h"
-#include "mongo/db/query/collation/collator_interface.h"
-#include "mongo/db/repl/apply_ops.h"
-#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
-#include "mongo/logv2/redaction.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/str.h"
-
-namespace mongo {
-
-using WriterVectors = ReshardingOplogBatchPreparer::WriterVectors;
-
-ReshardingOplogBatchPreparer::ReshardingOplogBatchPreparer(
- std::unique_ptr<CollatorInterface> defaultCollator)
- : _defaultCollator(std::move(defaultCollator)) {}
-
-void ReshardingOplogBatchPreparer::throwIfUnsupportedCommandOp(const OplogEntry& op) {
- invariant(op.isCommand());
- switch (op.getCommandType()) {
- case OplogEntry::CommandType::kApplyOps:
- case OplogEntry::CommandType::kCommitTransaction:
- case OplogEntry::CommandType::kAbortTransaction:
- return;
-
- case OplogEntry::CommandType::kDrop:
- uasserted(ErrorCodes::OplogOperationUnsupported,
- str::stream() << "Received drop command for resharding's source collection: "
- << redact(op.toBSONForLogging()));
-
- default:
- uasserted(ErrorCodes::OplogOperationUnsupported,
- str::stream() << "Command not supported during resharding: "
- << redact(op.toBSONForLogging()));
- }
-}
-
-WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors(
- const OplogBatchToPrepare& batch, std::list<OplogEntry>& derivedOps) const {
- invariant(derivedOps.empty());
-
- auto writerVectors = _makeEmptyWriterVectors();
-
- for (const auto& op : batch) {
- invariant(!op.isForReshardingSessionApplication());
- if (op.isCrudOpType()) {
- _appendCrudOpToWriterVector(&op, writerVectors);
- } else if (op.isCommand()) {
- throwIfUnsupportedCommandOp(op);
-
- if (op.getCommandType() != OplogEntry::CommandType::kApplyOps) {
- continue;
- }
-
- auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject());
- uassert(
- ErrorCodes::OplogOperationUnsupported,
- str::stream() << "Commands within applyOps are not supported during resharding: "
- << redact(op.toBSONForLogging()),
- applyOpsInfo.areOpsCrudOnly());
-
- auto unrolledOp =
- uassertStatusOK(repl::MutableOplogEntry::parse(op.getEntry().toBSON()));
-
- for (const auto& innerOp : applyOpsInfo.getOperations()) {
- unrolledOp.setDurableReplOperation(repl::DurableReplOperation::parse(
- {"ReshardingOplogBatchPreparer::makeCrudOpWriterVectors innerOp"}, innerOp));
-
- // There isn't a direct way to convert from a MutableOplogEntry to a
- // DurableOplogEntry or OplogEntry. We serialize the unrolledOp to have it get
- // re-parsed into an OplogEntry.
- auto& derivedOp = derivedOps.emplace_back(unrolledOp.toBSON());
- invariant(derivedOp.isCrudOpType());
-
- // `&derivedOp` is guaranteed to remain stable while we append more derived oplog
- // entries because `derivedOps` is a std::list.
- _appendCrudOpToWriterVector(&derivedOp, writerVectors);
- }
- } else {
- invariant(repl::OpTypeEnum::kNoop == op.getOpType());
- }
- }
-
- return writerVectors;
-}
-
-WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors(
- OplogBatchToPrepare& batch) const {
- auto writerVectors = _makeEmptyWriterVectors();
-
- struct SessionOpsList {
- TxnNumber txnNum = kUninitializedTxnNumber;
- std::vector<OplogEntry*> ops;
- };
-
- LogicalSessionIdMap<SessionOpsList> sessionTracker;
-
- for (auto& op : batch) {
- if (op.isCrudOpType()) {
- if (const auto& lsid = op.getSessionId()) {
- uassert(4990700,
- str::stream() << "Missing txnNumber for oplog entry with lsid: "
- << redact(op.toBSONForLogging()),
- op.getTxnNumber());
-
- auto txnNumber = *op.getTxnNumber();
-
- auto& retryableOpList = sessionTracker[*lsid];
- if (txnNumber == retryableOpList.txnNum) {
- retryableOpList.ops.emplace_back(&op);
- } else if (txnNumber > retryableOpList.txnNum) {
- retryableOpList.ops = {&op};
- retryableOpList.txnNum = txnNumber;
- } else {
- uasserted(4990401,
- str::stream()
- << "Encountered out of order txnNumbers; batch had "
- << redact(op.toBSONForLogging()) << " after "
- << redact(retryableOpList.ops.back()->toBSONForLogging()));
- }
- }
- } else if (op.isCommand()) {
- throwIfUnsupportedCommandOp(op);
-
- // TODO SERVER-49905: Replace ops and update txnNum for the following cases:
- // - a non-partialTxn:true, non-prepare:true applyOps entry = the final applyOps entry
- // from a non-prepared transaction,
- // - a commitTransaction oplog entry, or
- // - an abortTransaction oplog entry.
- } else {
- invariant(repl::OpTypeEnum::kNoop == op.getOpType());
- }
- }
-
- for (auto& [lsid, opList] : sessionTracker) {
- for (auto& op : opList.ops) {
- op->setIsForReshardingSessionApplication();
- _appendSessionOpToWriterVector(lsid, op, writerVectors);
- }
- }
-
- return writerVectors;
-}
-
-WriterVectors ReshardingOplogBatchPreparer::_makeEmptyWriterVectors() const {
- return WriterVectors(size_t(resharding::gReshardingWriterThreadCount));
-}
-
-void ReshardingOplogBatchPreparer::_appendCrudOpToWriterVector(const OplogEntry* op,
- WriterVectors& writerVectors) const {
- BSONElementComparator elementHasher{BSONElementComparator::FieldNamesMode::kIgnore,
- _defaultCollator.get()};
-
- const size_t idHash = elementHasher.hash(op->getIdElement());
-
- uint32_t hash = 0;
- MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
-
- _appendOpToWriterVector(hash, op, writerVectors);
-}
-
-void ReshardingOplogBatchPreparer::_appendSessionOpToWriterVector(
- const LogicalSessionId& lsid, const OplogEntry* op, WriterVectors& writerVectors) const {
- LogicalSessionIdHash lsidHasher;
- _appendOpToWriterVector(lsidHasher(lsid), op, writerVectors);
-}
-
-void ReshardingOplogBatchPreparer::_appendOpToWriterVector(std::uint32_t hash,
- const OplogEntry* op,
- WriterVectors& writerVectors) const {
- auto& writer = writerVectors[hash % writerVectors.size()];
- if (writer.empty()) {
- // Skip a few growth rounds in anticipation that we'll be appending more.
- writer.reserve(8U);
- }
- writer.emplace_back(op);
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
deleted file mode 100644
index 33faa290643..00000000000
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <cstdint>
-#include <list>
-#include <memory>
-#include <vector>
-
-#include "mongo/db/repl/oplog_entry.h"
-
-namespace mongo {
-
-class CollatorInterface;
-class LogicalSessionId;
-
-/**
- * Converts a batch of oplog entries to be applied into multiple batches of oplog entries that may
- * be applied concurrently by different threads.
- *
- * Instances of this class are thread-safe.
- */
-class ReshardingOplogBatchPreparer {
-private:
- using OplogEntry = repl::OplogEntry;
-
-public:
- ReshardingOplogBatchPreparer(std::unique_ptr<CollatorInterface> defaultCollator);
-
- using OplogBatchToPrepare = std::vector<OplogEntry>;
- using OplogBatchToApply = std::vector<const OplogEntry*>;
- using WriterVectors = std::vector<OplogBatchToApply>;
-
- /**
- * Prepares a batch of oplog entries for CRUD application by multiple threads concurrently.
- *
- * The returned writer vectors guarantee that modifications to the same document (as identified
- * by its _id) will be in the same writer vector and will appear in their corresponding `batch`
- * order.
- *
- * The returned writer vectors refer to memory owned by `batch` and `derivedOps`. The caller
- * must take care to ensure both `batch` and `derivedOps` outlive the writer vectors all being
- * applied and must take care not to modify `batch` or `derivedOps` until after the writer
- * vectors have all been applied. In particular, `makeSessionOpWriterVectors(batch)` must not be
- * called until after the returned writer vectors have all been applied.
- */
- WriterVectors makeCrudOpWriterVectors(const OplogBatchToPrepare& batch,
- std::list<OplogEntry>& derivedOps) const;
-
- /**
- * Prepares a batch of oplog entries for session application by multiple threads concurrently.
- *
- * The returned writer vectors guarantee that modifications to the same config.transactions
- * records (as identified by its lsid) will be in the same writer vector. Additionally, updates
- * to the config.transactions record for a higher txnNumber will cause any updates in `batch`
- * for lower txnNumbers to be elided.
- *
- * The returned writer vectors refer to memory owned by `batch`. The caller must take care to
- * ensure `batch` outlives the writer vectors all being applied and must take care not to modify
- * `batch` until after the writer vectors have all been applied.
- *
- * As a performance optimization, to avoid creating a separate copy of `batch`, this function
- * mutates the contained oplog entries. The caller should take care to apply the writer vectors
- * from `makeCrudOpWriterVectors(batch)` first.
- */
- WriterVectors makeSessionOpWriterVectors(OplogBatchToPrepare& batch) const;
-
- static void throwIfUnsupportedCommandOp(const OplogEntry& op);
-
-private:
- WriterVectors _makeEmptyWriterVectors() const;
-
- void _appendCrudOpToWriterVector(const OplogEntry* op, WriterVectors& writerVectors) const;
-
- void _appendSessionOpToWriterVector(const LogicalSessionId& lsid,
- const OplogEntry* op,
- WriterVectors& writerVectors) const;
-
- void _appendOpToWriterVector(std::uint32_t hash,
- const OplogEntry* op,
- WriterVectors& writerVectors) const;
-
- const std::unique_ptr<CollatorInterface> _defaultCollator;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
deleted file mode 100644
index 07ad1acae57..00000000000
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <boost/optional/optional_io.hpp>
-
-#include "mongo/db/query/collation/collator_interface.h"
-#include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h"
-#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-
-using OplogBatch = ReshardingOplogBatchPreparer::OplogBatchToPrepare;
-
-class ScopedServerParameterChange {
-public:
- ScopedServerParameterChange(int* param, int newValue) : _param(param), _originalValue(*_param) {
- *param = newValue;
- }
-
- ~ScopedServerParameterChange() {
- *_param = _originalValue;
- }
-
-private:
- int* const _param;
- const int _originalValue;
-};
-
-class ReshardingOplogBatchPreparerTest : public unittest::Test {
-protected:
- repl::OplogEntry makeUpdateOp(BSONObj document) {
- return makeUpdateOp(std::move(document), boost::none, boost::none);
- }
-
- repl::OplogEntry makeUpdateOp(BSONObj document,
- boost::optional<LogicalSessionId> lsid,
- boost::optional<TxnNumber> txnNumber) {
- repl::MutableOplogEntry op;
- op.setOpType(repl::OpTypeEnum::kUpdate);
- op.setObject2(document["_id"].wrap().getOwned());
- op.setObject(std::move(document));
- op.setSessionId(std::move(lsid));
- op.setTxnNumber(std::move(txnNumber));
-
- // These are unused by ReshardingOplogBatchPreparer but required by IDL parsing.
- op.setNss({});
- op.setOpTimeAndWallTimeBase({{}, {}});
-
- return {op.toBSON()};
- }
-
- repl::OplogEntry makeCommandOp(BSONObj commandObj) {
- return makeCommandOp(std::move(commandObj), boost::none, boost::none);
- }
-
- repl::OplogEntry makeCommandOp(BSONObj commandObj,
- boost::optional<LogicalSessionId> lsid,
- boost::optional<TxnNumber> txnNumber) {
- repl::MutableOplogEntry op;
- op.setOpType(repl::OpTypeEnum::kCommand);
- op.setObject(std::move(commandObj));
- op.setSessionId(std::move(lsid));
- op.setTxnNumber(std::move(txnNumber));
-
- // These are unused by ReshardingOplogBatchPreparer but required by IDL parsing.
- op.setNss({});
- op.setOpTimeAndWallTimeBase({{}, {}});
-
- return {op.toBSON()};
- }
-
- ReshardingOplogBatchPreparer::OplogBatchToApply& getNonEmptyWriterVector(
- ReshardingOplogBatchPreparer::WriterVectors writerVectors) {
- ReshardingOplogBatchPreparer::OplogBatchToApply* nonempty = nullptr;
-
- for (auto& writer : writerVectors) {
- if (!writer.empty()) {
- ASSERT_FALSE(nonempty)
- << "Expected only one non-empty writer vector, but found multiple";
- nonempty = &writer;
- }
- }
-
- ASSERT_TRUE(nonempty) << "Expected to find a non-empty writer vector, but didn't";
- return *nonempty;
- }
-
- ReshardingOplogBatchPreparer _batchPreparer{nullptr};
-
- static constexpr size_t kNumWriterVectors = 2;
-
-private:
- ScopedServerParameterChange _numWriterVectors{&resharding::gReshardingWriterThreadCount,
- int(kNumWriterVectors)};
-};
-
-TEST_F(ReshardingOplogBatchPreparerTest, AssignsCrudOpsToWriterVectorsById) {
- OplogBatch batch;
-
- int numOps = 10;
- for (int i = 0; i < numOps; ++i) {
- batch.emplace_back(makeUpdateOp(BSON("_id" << 0 << "n" << i)));
- }
-
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOps.size(), 0U);
-
- auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), numOps);
- for (int i = 0; i < numOps; ++i) {
- ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(), BSON("_id" << 0 << "n" << i));
- }
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, DistributesCrudOpsToWriterVectorsFairly) {
- OplogBatch batch;
-
- int numOps = 100;
- for (int i = 0; i < numOps; ++i) {
- batch.emplace_back(makeUpdateOp(BSON("_id" << i)));
- }
-
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOps.size(), 0U);
-
- // Use `numOps / 5` as a generous definition for "fair". There's no guarantee for how the _id
- // values will be hashed but can at least assert the writer vector sizes won't wildly differ
- // from each other.
- ASSERT_GTE(writerVectors[0].size(), numOps / 5);
- ASSERT_GTE(writerVectors[1].size(), numOps / 5);
- ASSERT_EQ(writerVectors[0].size() + writerVectors[1].size(), numOps);
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, CreatesDerivedCrudOpsForApplyOps) {
- OplogBatch batch;
-
- // We use the "fromApplyOps" field in the document to distinguish between the regular oplog
- // entries from the derived ones later on.
- int numOps = 20;
- for (int i = 0; i < numOps; ++i) {
- batch.emplace_back(makeUpdateOp(BSON("_id" << i << "fromApplyOps" << false)));
- }
-
- BSONObjBuilder applyOpsBuilder;
- {
- BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps");
- for (int i = 0; i < numOps; ++i) {
- // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to
- // deal with setting the 'o2' field.
- opsArrayBuilder.append(
- repl::DurableReplOperation(
- repl::OpTypeEnum::kInsert, {}, BSON("_id" << i << "fromApplyOps" << true))
- .toBSON());
- }
- }
-
- batch.emplace_back(makeCommandOp(applyOpsBuilder.done()));
-
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOps.size(), numOps);
-
- ASSERT_EQ(writerVectors[0].size() + writerVectors[1].size(), numOps * 2);
-
- for (const auto& writer : writerVectors) {
- for (size_t i = 0; i < writer.size(); ++i) {
- if (writer[i]->getObject()["fromApplyOps"].Bool()) {
- continue;
- }
-
- int docId = writer[i]->getObject()["_id"].Int();
-
- bool found = false;
- for (size_t j = i + 1; j < writer.size(); ++j) {
- if (writer[j]->getObject()["fromApplyOps"].Bool() &&
- writer[j]->getObject()["_id"].Int() == docId) {
- found = true;
- }
- }
- ASSERT_TRUE(found) << "Expected to find normal op and unrolled applyOps for _id="
- << docId << " in same writer vector, but didn't";
- }
- }
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, InterleavesDerivedCrudOpsForApplyOps) {
- OplogBatch batch;
-
- int numOps = 10;
- for (int i = 0; i < numOps; ++i) {
- // We use the "fromApplyOps" field in the document to distinguish between the regular oplog
- // entries from the derived ones later on.
- if (i % 2 == 0) {
- batch.emplace_back(
- makeUpdateOp(BSON("_id" << 0 << "n" << i << "fromApplyOps" << false)));
- } else {
- // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to
- // deal with setting the 'o2' field.
- batch.emplace_back(makeCommandOp(BSON(
- "applyOps" << BSON_ARRAY(repl::DurableReplOperation(
- repl::OpTypeEnum::kInsert,
- {},
- BSON("_id" << 0 << "n" << i << "fromApplyOps" << true))
- .toBSON()))));
- }
- }
-
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOps.size(), numOps / 2);
-
- auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), numOps);
- for (int i = 0; i < numOps; ++i) {
- if (i % 2 == 0) {
- ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(),
- BSON("_id" << 0 << "n" << i << "fromApplyOps" << false));
- } else {
- ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(),
- BSON("_id" << 0 << "n" << i << "fromApplyOps" << true));
- }
- }
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, AssignsSessionOpsToWriterVectorsByLsid) {
- OplogBatch batch;
- auto lsid = makeLogicalSessionIdForTest();
-
- int numOps = 10;
- for (int i = 0; i < numOps; ++i) {
- batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{1}));
- }
-
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
-
- auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), numOps);
- for (int i = 0; i < numOps; ++i) {
- ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(), BSON("_id" << i));
- ASSERT_EQ(writer[i]->getSessionId(), lsid);
- ASSERT_EQ(writer[i]->getTxnNumber(), TxnNumber{1});
- ASSERT_TRUE(writer[i]->isForReshardingSessionApplication());
- }
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, DiscardsLowerTxnNumberSessionOps) {
- OplogBatch batch;
- auto lsid = makeLogicalSessionIdForTest();
-
- int numOps = 5;
- for (int i = 1; i <= numOps; ++i) {
- batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{i}));
- }
-
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
-
- auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_BSONOBJ_BINARY_EQ(writer[0]->getObject(), BSON("_id" << numOps));
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(writer[0]->getTxnNumber(), TxnNumber{numOps});
- ASSERT_TRUE(writer[0]->isForReshardingSessionApplication());
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, DistributesSessionOpsToWriterVectorsFairly) {
- OplogBatch batch;
-
- int numOps = 100;
- for (int i = 0; i < numOps; ++i) {
- batch.emplace_back(
- makeUpdateOp(BSON("_id" << i), makeLogicalSessionIdForTest(), TxnNumber{1}));
- }
-
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
-
- // Use `numOps / 5` as a generous definition for "fair". There's no guarantee for how the lsid
- // values will be hashed but can at least assert the writer vector sizes won't wildly differ
- // from each other.
- ASSERT_GTE(writerVectors[0].size(), numOps / 5);
- ASSERT_GTE(writerVectors[1].size(), numOps / 5);
- ASSERT_EQ(writerVectors[0].size() + writerVectors[1].size(), numOps);
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, ThrowsForUnsupportedCommandOps) {
- {
- OplogBatch batch;
- batch.emplace_back(makeCommandOp(BSON("drop" << 1)));
-
- std::list<repl::OplogEntry> derivedOps;
- ASSERT_THROWS_CODE(_batchPreparer.makeCrudOpWriterVectors(batch, derivedOps),
- DBException,
- ErrorCodes::OplogOperationUnsupported);
- }
-
- {
- OplogBatch batch;
- batch.emplace_back(makeCommandOp(BSON("commitIndexBuild" << 1)));
-
- std::list<repl::OplogEntry> derivedOps;
- ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch),
- DBException,
- ErrorCodes::OplogOperationUnsupported);
- }
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, DiscardsNoops) {
- OplogBatch batch;
-
- int numOps = 5;
- for (int i = 0; i < numOps; ++i) {
- repl::MutableOplogEntry op;
- op.setOpType(repl::OpTypeEnum::kNoop);
- op.setObject({});
- op.setNss({});
- op.setOpTimeAndWallTimeBase({{}, {}});
- batch.emplace_back(op.toBSON());
- }
-
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOps.size(), 0U);
- ASSERT_EQ(writerVectors[0].size(), 0U);
- ASSERT_EQ(writerVectors[1].size(), 0U);
-
- writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(writerVectors[0].size(), 0U);
- ASSERT_EQ(writerVectors[1].size(), 0U);
-}
-
-} // namespace
-} // namespace mongo