summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Max Hirschhorn <max.hirschhorn@mongodb.com> 2021-05-06 23:19:58 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-05-07 00:01:40 +0000
commit: 9c4a555660fd6d7c8ec36b1c8a1a6eb60fe9149a (patch)
tree: 152cc08a2c536e1a1260fe10fd361280f8cb091e /src
parent: 35997dbc6b0419c71f054894fe846f03d490eddd (diff)
download: mongo-9c4a555660fd6d7c8ec36b1c8a1a6eb60fe9149a.tar.gz
SERVER-56338 Make resharding CRUD and session application concurrent.
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/repl/oplog_entry.cpp 12
-rw-r--r-- src/mongo/db/repl/oplog_entry.h 5
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_applier.cpp 42
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_applier.h 3
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp 49
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_applier.h 1
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp 9
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp 8
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h 6
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp 5
10 files changed, 66 insertions, 74 deletions
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 1a06226cb38..0b86a8708ec 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -516,10 +516,6 @@ BSONObj OplogEntry::toBSONForLogging() const {
builder.append("isForCappedCollection", _isForCappedCollection);
}
- if (_isForReshardingSessionApplication) {
- builder.append("isForReshardingSessionApplication", _isForReshardingSessionApplication);
- }
-
if (_preImageOp) {
auto op = _preImageOp->toBSON();
if (estimatedTotalSize + op.objsize() > sizeTooBig) {
@@ -577,14 +573,6 @@ void OplogEntry::setPostImageOp(const BSONObj& postImageOp) {
uassertStatusOK(DurableOplogEntry::parse(postImageOp))));
}
-bool OplogEntry::isForReshardingSessionApplication() const {
- return _isForReshardingSessionApplication;
-}
-
-void OplogEntry::setIsForReshardingSessionApplication(bool isForReshardingSessionApplication) {
- _isForReshardingSessionApplication = isForReshardingSessionApplication;
-}
-
const boost::optional<mongo::Value>& OplogEntry::get_id() const& {
return _entry.get_id();
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index b7e24dc16a1..ead93fea939 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -511,9 +511,6 @@ public:
void setPostImageOp(std::shared_ptr<DurableOplogEntry> postImageOp);
void setPostImageOp(const BSONObj& postImageOp);
- bool isForReshardingSessionApplication() const;
- void setIsForReshardingSessionApplication(bool isForReshardingSessionApplication = true);
-
std::string toStringForLogging() const;
/**
@@ -574,8 +571,6 @@ private:
std::shared_ptr<DurableOplogEntry> _postImageOp;
bool _isForCappedCollection = false;
-
- bool _isForReshardingSessionApplication = false;
};
std::ostream& operator<<(std::ostream& s, const DurableOplogEntry& o);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 73f590c7d7d..48e77505265 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -68,25 +68,36 @@ ReshardingOplogApplier::ReshardingOplogApplier(
SemiFuture<void> ReshardingOplogApplier::_applyBatch(
std::shared_ptr<executor::TaskExecutor> executor,
CancellationToken cancelToken,
- CancelableOperationContextFactory factory,
- bool isForSessionApplication) {
- auto currentWriterVectors = [&] {
- if (isForSessionApplication) {
- return _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply);
- } else {
- return _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps);
- }
- }();
+ CancelableOperationContextFactory factory) {
+ auto crudWriterVectors =
+ _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps);
CancellationSource errorSource(cancelToken);
std::vector<SharedSemiFuture<void>> batchApplierFutures;
- batchApplierFutures.reserve(currentWriterVectors.size());
+ // Use `2 * crudWriterVectors.size()` because sessionWriterVectors.size() is very likely equal
+ // to crudWriterVectors.size(). Calling ReshardingOplogBatchApplier::applyBatch<false>() first
+ // though allows CRUD application to be concurrent with preparing the writer vectors for session
+ // application in addition to being concurrent with session application itself.
+ batchApplierFutures.reserve(2 * crudWriterVectors.size());
- for (auto&& writer : currentWriterVectors) {
+ for (auto&& writer : crudWriterVectors) {
if (!writer.empty()) {
batchApplierFutures.emplace_back(
- _batchApplier.applyBatch(std::move(writer), executor, errorSource.token(), factory)
+ _batchApplier
+ .applyBatch<false>(std::move(writer), executor, errorSource.token(), factory)
+ .share());
+ }
+ }
+
+ auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply);
+ batchApplierFutures.reserve(crudWriterVectors.size() + sessionWriterVectors.size());
+
+ for (auto&& writer : sessionWriterVectors) {
+ if (!writer.empty()) {
+ batchApplierFutures.emplace_back(
+ _batchApplier
+ .applyBatch<true>(std::move(writer), executor, errorSource.token(), factory)
.share());
}
}
@@ -110,12 +121,7 @@ SemiFuture<void> ReshardingOplogApplier::run(std::shared_ptr<executor::TaskExecu
LOGV2_DEBUG(5391002, 3, "Starting batch", "batchSize"_attr = batch.size());
_currentBatchToApply = std::move(batch);
- return _applyBatch(
- executor, cancelToken, factory, false /* isForSessionApplication */);
- })
- .then([this, executor, cancelToken, factory] {
- return _applyBatch(
- executor, cancelToken, factory, true /* isForSessionApplication */);
+ return _applyBatch(executor, cancelToken, factory);
})
.then([this, factory] {
if (_currentBatchToApply.empty()) {
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index 55f451eac97..933fb7cc317 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -116,8 +116,7 @@ private:
*/
SemiFuture<void> _applyBatch(std::shared_ptr<executor::TaskExecutor> executor,
CancellationToken cancelToken,
- CancelableOperationContextFactory factory,
- bool isForSessionApplication);
+ CancelableOperationContextFactory factory);
/**
* Records the progress made by this applier to storage.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
index 0346af8ec79..5f5e00e935c 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
@@ -47,6 +47,7 @@ ReshardingOplogBatchApplier::ReshardingOplogBatchApplier(
const ReshardingOplogSessionApplication& sessionApplication)
: _crudApplication(crudApplication), _sessionApplication(sessionApplication) {}
+template <bool IsForSessionApplication>
SemiFuture<void> ReshardingOplogBatchApplier::applyBatch(
OplogBatch batch,
std::shared_ptr<executor::TaskExecutor> executor,
@@ -60,27 +61,29 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch(
auto chainCtx = std::make_shared<ChainContext>();
chainCtx->batch = std::move(batch);
- return resharding::WithAutomaticRetry([this, chainCtx, cancelToken, factory] {
- // Writing `auto& i = chainCtx->nextToApply` takes care of incrementing
- // chainCtx->nextToApply on each loop iteration.
- for (auto& i = chainCtx->nextToApply; i < chainCtx->batch.size(); ++i) {
- const auto& oplogEntry = *chainCtx->batch[i];
- auto opCtx = factory.makeOperationContext(&cc());
+ return resharding::WithAutomaticRetry<unique_function<SemiFuture<void>()>>(
+ [this, chainCtx, cancelToken, factory] {
+ // Writing `auto& i = chainCtx->nextToApply` takes care of incrementing
+ // chainCtx->nextToApply on each loop iteration.
+ for (auto& i = chainCtx->nextToApply; i < chainCtx->batch.size(); ++i) {
+ const auto& oplogEntry = *chainCtx->batch[i];
+ auto opCtx = factory.makeOperationContext(&cc());
- if (oplogEntry.isForReshardingSessionApplication()) {
- auto hitPreparedTxn =
- _sessionApplication.tryApplyOperation(opCtx.get(), oplogEntry);
+ if constexpr (IsForSessionApplication) {
+ auto hitPreparedTxn =
+ _sessionApplication.tryApplyOperation(opCtx.get(), oplogEntry);
- if (hitPreparedTxn) {
- return future_util::withCancellation(std::move(*hitPreparedTxn),
- cancelToken);
+ if (hitPreparedTxn) {
+ return future_util::withCancellation(std::move(*hitPreparedTxn),
+ cancelToken);
+ }
+ } else {
+ uassertStatusOK(
+ _crudApplication.applyOperation(opCtx.get(), oplogEntry));
}
- } else {
- uassertStatusOK(_crudApplication.applyOperation(opCtx.get(), oplogEntry));
}
- }
- return makeReadyFutureWith([] {}).semi();
- })
+ return makeReadyFutureWith([] {}).semi();
+ })
.onTransientError([](const Status& status) {
LOGV2(5615800,
"Transient error while applying oplog entry from donor shard",
@@ -104,4 +107,16 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch(
.semi();
}
+template SemiFuture<void> ReshardingOplogBatchApplier::applyBatch<false>(
+ OplogBatch batch,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ CancellationToken cancelToken,
+ CancelableOperationContextFactory factory) const;
+
+template SemiFuture<void> ReshardingOplogBatchApplier::applyBatch<true>(
+ OplogBatch batch,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ CancellationToken cancelToken,
+ CancelableOperationContextFactory factory) const;
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.h b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.h
index d77e1bb74cb..c5c7681c249 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.h
@@ -52,6 +52,7 @@ public:
ReshardingOplogBatchApplier(const ReshardingOplogApplicationRules& crudApplication,
const ReshardingOplogSessionApplication& sessionApplication);
+ template <bool IsForSessionApplication>
SemiFuture<void> applyBatch(OplogBatch batch,
std::shared_ptr<executor::TaskExecutor> executor,
CancellationToken cancelToken,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
index 1b48da004fe..417139aada9 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
@@ -343,12 +343,11 @@ TEST_F(ReshardingOplogBatchApplierTest, WaitsOnPreparedTxnAndAutomaticallyRetrie
TxnNumber incomingTxnNumber = existingTxnNumber + 1;
auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber);
- oplogEntry.setIsForReshardingSessionApplication(true);
auto executor = makeTaskExecutorForApplier();
auto factory = makeCancelableOpCtxForApplier(CancellationToken::uncancelable());
- auto future =
- applier()->applyBatch({&oplogEntry}, executor, CancellationToken::uncancelable(), factory);
+ auto future = applier()->applyBatch<true>(
+ {&oplogEntry}, executor, CancellationToken::uncancelable(), factory);
ASSERT_FALSE(future.isReady());
// Wait a little bit to increase the likelihood that the applier has blocked on the prepared
@@ -395,12 +394,12 @@ TEST_F(ReshardingOplogBatchApplierTest, CancelableWhileWaitingOnPreparedTxn) {
TxnNumber incomingTxnNumber = existingTxnNumber + 1;
auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber);
- oplogEntry.setIsForReshardingSessionApplication(true);
CancellationSource cancelSource;
auto executor = makeTaskExecutorForApplier();
auto factory = makeCancelableOpCtxForApplier(CancellationToken::uncancelable());
- auto future = applier()->applyBatch({&oplogEntry}, executor, cancelSource.token(), factory);
+ auto future =
+ applier()->applyBatch<true>({&oplogEntry}, executor, cancelSource.token(), factory);
ASSERT_FALSE(future.isReady());
// Wait a little bit to increase the likelihood that the applier has blocked on the prepared
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
index 122adece62f..a6f73b01cec 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
@@ -106,7 +106,6 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors(
auto writerVectors = _makeEmptyWriterVectors();
for (const auto& op : batch) {
- invariant(!op.isForReshardingSessionApplication());
if (op.isCrudOpType()) {
_appendCrudOpToWriterVector(&op, writerVectors);
} else if (op.isCommand()) {
@@ -149,17 +148,17 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors(
}
WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors(
- OplogBatchToPrepare& batch) const {
+ const OplogBatchToPrepare& batch) const {
auto writerVectors = _makeEmptyWriterVectors();
struct SessionOpsList {
TxnNumber txnNum = kUninitializedTxnNumber;
- std::vector<OplogEntry*> ops;
+ std::vector<const OplogEntry*> ops;
};
LogicalSessionIdMap<SessionOpsList> sessionTracker;
- auto updateSessionTracker = [&](OplogEntry* op) {
+ auto updateSessionTracker = [&](const OplogEntry* op) {
if (const auto& lsid = op->getSessionId()) {
uassert(4990700,
str::stream() << "Missing txnNumber for oplog entry with lsid: "
@@ -199,7 +198,6 @@ WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors(
for (auto& [lsid, opList] : sessionTracker) {
for (auto& op : opList.ops) {
- op->setIsForReshardingSessionApplication();
_appendSessionOpToWriterVector(lsid, op, writerVectors);
}
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
index 33faa290643..f4f035ccdbd 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
@@ -85,12 +85,8 @@ public:
* The returned writer vectors refer to memory owned by `batch`. The caller must take care to
* ensure `batch` outlives the writer vectors all being applied and must take care not to modify
* `batch` until after the writer vectors have all been applied.
- *
- * As a performance optimization, to avoid creating a separate copy of `batch`, this function
- * mutates the contained oplog entries. The caller should take care to apply the writer vectors
- * from `makeCrudOpWriterVectors(batch)` first.
*/
- WriterVectors makeSessionOpWriterVectors(OplogBatchToPrepare& batch) const;
+ WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch) const;
static void throwIfUnsupportedCommandOp(const OplogEntry& op);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
index b7bccf2fb8a..91570c271a1 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
@@ -299,7 +299,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, AssignsSessionOpsToWriterVectorsByLsid)
ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(), BSON("_id" << i));
ASSERT_EQ(writer[i]->getSessionId(), lsid);
ASSERT_EQ(writer[i]->getTxnNumber(), TxnNumber{1});
- ASSERT_TRUE(writer[i]->isForReshardingSessionApplication());
}
}
@@ -320,7 +319,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, DiscardsLowerTxnNumberSessionOps) {
ASSERT_BSONOBJ_BINARY_EQ(writer[0]->getObject(), BSON("_id" << numOps));
ASSERT_EQ(writer[0]->getSessionId(), lsid);
ASSERT_EQ(writer[0]->getTxnNumber(), TxnNumber{numOps});
- ASSERT_TRUE(writer[0]->isForReshardingSessionApplication());
}
TEST_F(ReshardingOplogBatchPreparerTest, DistributesSessionOpsToWriterVectorsFairly) {
@@ -416,7 +414,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTx
ASSERT_EQ(writer.size(), 1U);
ASSERT_EQ(writer[0]->getSessionId(), lsid);
ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
- ASSERT_TRUE(writer[0]->isForReshardingSessionApplication());
}
TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForCommittedTxn) {
@@ -433,7 +430,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForCommittedTxn) {
ASSERT_EQ(writer.size(), 1U);
ASSERT_EQ(writer[0]->getSessionId(), lsid);
ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
- ASSERT_TRUE(writer[0]->isForReshardingSessionApplication());
}
TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTxn) {
@@ -449,7 +445,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTx
ASSERT_EQ(writer.size(), 1U);
ASSERT_EQ(writer[0]->getSessionId(), lsid);
ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
- ASSERT_TRUE(writer[0]->isForReshardingSessionApplication());
}
TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForPartialUnpreparedTxn) {