diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-05-06 23:19:58 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-07 00:01:40 +0000 |
commit | 9c4a555660fd6d7c8ec36b1c8a1a6eb60fe9149a (patch) | |
tree | 152cc08a2c536e1a1260fe10fd361280f8cb091e /src | |
parent | 35997dbc6b0419c71f054894fe846f03d490eddd (diff) | |
download | mongo-9c4a555660fd6d7c8ec36b1c8a1a6eb60fe9149a.tar.gz |
SERVER-56338 Make resharding CRUD and session application concurrent.
Diffstat (limited to 'src')
10 files changed, 66 insertions, 74 deletions
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 1a06226cb38..0b86a8708ec 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -516,10 +516,6 @@ BSONObj OplogEntry::toBSONForLogging() const { builder.append("isForCappedCollection", _isForCappedCollection); } - if (_isForReshardingSessionApplication) { - builder.append("isForReshardingSessionApplication", _isForReshardingSessionApplication); - } - if (_preImageOp) { auto op = _preImageOp->toBSON(); if (estimatedTotalSize + op.objsize() > sizeTooBig) { @@ -577,14 +573,6 @@ void OplogEntry::setPostImageOp(const BSONObj& postImageOp) { uassertStatusOK(DurableOplogEntry::parse(postImageOp)))); } -bool OplogEntry::isForReshardingSessionApplication() const { - return _isForReshardingSessionApplication; -} - -void OplogEntry::setIsForReshardingSessionApplication(bool isForReshardingSessionApplication) { - _isForReshardingSessionApplication = isForReshardingSessionApplication; -} - const boost::optional<mongo::Value>& OplogEntry::get_id() const& { return _entry.get_id(); } diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index b7e24dc16a1..ead93fea939 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -511,9 +511,6 @@ public: void setPostImageOp(std::shared_ptr<DurableOplogEntry> postImageOp); void setPostImageOp(const BSONObj& postImageOp); - bool isForReshardingSessionApplication() const; - void setIsForReshardingSessionApplication(bool isForReshardingSessionApplication = true); - std::string toStringForLogging() const; /** @@ -574,8 +571,6 @@ private: std::shared_ptr<DurableOplogEntry> _postImageOp; bool _isForCappedCollection = false; - - bool _isForReshardingSessionApplication = false; }; std::ostream& operator<<(std::ostream& s, const DurableOplogEntry& o); diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 73f590c7d7d..48e77505265 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -68,25 +68,36 @@ ReshardingOplogApplier::ReshardingOplogApplier( SemiFuture<void> ReshardingOplogApplier::_applyBatch( std::shared_ptr<executor::TaskExecutor> executor, CancellationToken cancelToken, - CancelableOperationContextFactory factory, - bool isForSessionApplication) { - auto currentWriterVectors = [&] { - if (isForSessionApplication) { - return _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply); - } else { - return _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps); - } - }(); + CancelableOperationContextFactory factory) { + auto crudWriterVectors = + _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps); CancellationSource errorSource(cancelToken); std::vector<SharedSemiFuture<void>> batchApplierFutures; - batchApplierFutures.reserve(currentWriterVectors.size()); + // Use `2 * crudWriterVectors.size()` because sessionWriterVectors.size() is very likely equal + // to crudWriterVectors.size(). Calling ReshardingOplogBatchApplier::applyBatch<false>() first + // though allows CRUD application to be concurrent with preparing the writer vectors for session + // application in addition to being concurrent with session application itself. + batchApplierFutures.reserve(2 * crudWriterVectors.size()); - for (auto&& writer : currentWriterVectors) { + for (auto&& writer : crudWriterVectors) { if (!writer.empty()) { batchApplierFutures.emplace_back( - _batchApplier.applyBatch(std::move(writer), executor, errorSource.token(), factory) + _batchApplier + .applyBatch<false>(std::move(writer), executor, errorSource.token(), factory) + .share()); + } + } + + auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply); + batchApplierFutures.reserve(crudWriterVectors.size() + sessionWriterVectors.size()); + + for (auto&& writer : sessionWriterVectors) { + if (!writer.empty()) { + batchApplierFutures.emplace_back( + _batchApplier + .applyBatch<true>(std::move(writer), executor, errorSource.token(), factory) .share()); } } @@ -110,12 +121,7 @@ SemiFuture<void> ReshardingOplogApplier::run(std::shared_ptr<executor::TaskExecu LOGV2_DEBUG(5391002, 3, "Starting batch", "batchSize"_attr = batch.size()); _currentBatchToApply = std::move(batch); - return _applyBatch( - executor, cancelToken, factory, false /* isForSessionApplication */); - }) - .then([this, executor, cancelToken, factory] { - return _applyBatch( - executor, cancelToken, factory, true /* isForSessionApplication */); + return _applyBatch(executor, cancelToken, factory); }) .then([this, factory] { if (_currentBatchToApply.empty()) { diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index 55f451eac97..933fb7cc317 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -116,8 +116,7 @@ private: */ SemiFuture<void> _applyBatch(std::shared_ptr<executor::TaskExecutor> executor, CancellationToken cancelToken, - CancelableOperationContextFactory factory, - bool isForSessionApplication); + CancelableOperationContextFactory factory); /** * Records the progress made by this applier to storage. diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp index 0346af8ec79..5f5e00e935c 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp @@ -47,6 +47,7 @@ ReshardingOplogBatchApplier::ReshardingOplogBatchApplier( const ReshardingOplogSessionApplication& sessionApplication) : _crudApplication(crudApplication), _sessionApplication(sessionApplication) {} +template <bool IsForSessionApplication> SemiFuture<void> ReshardingOplogBatchApplier::applyBatch( OplogBatch batch, std::shared_ptr<executor::TaskExecutor> executor, @@ -60,27 +61,29 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch( auto chainCtx = std::make_shared<ChainContext>(); chainCtx->batch = std::move(batch); - return resharding::WithAutomaticRetry([this, chainCtx, cancelToken, factory] { - // Writing `auto& i = chainCtx->nextToApply` takes care of incrementing - // chainCtx->nextToApply on each loop iteration. - for (auto& i = chainCtx->nextToApply; i < chainCtx->batch.size(); ++i) { - const auto& oplogEntry = *chainCtx->batch[i]; - auto opCtx = factory.makeOperationContext(&cc()); + return resharding::WithAutomaticRetry<unique_function<SemiFuture<void>()>>( + [this, chainCtx, cancelToken, factory] { + // Writing `auto& i = chainCtx->nextToApply` takes care of incrementing + // chainCtx->nextToApply on each loop iteration. + for (auto& i = chainCtx->nextToApply; i < chainCtx->batch.size(); ++i) { + const auto& oplogEntry = *chainCtx->batch[i]; + auto opCtx = factory.makeOperationContext(&cc()); - if (oplogEntry.isForReshardingSessionApplication()) { - auto hitPreparedTxn = - _sessionApplication.tryApplyOperation(opCtx.get(), oplogEntry); + if constexpr (IsForSessionApplication) { + auto hitPreparedTxn = + _sessionApplication.tryApplyOperation(opCtx.get(), oplogEntry); - if (hitPreparedTxn) { - return future_util::withCancellation(std::move(*hitPreparedTxn), - cancelToken); + if (hitPreparedTxn) { + return future_util::withCancellation(std::move(*hitPreparedTxn), + cancelToken); + } + } else { + uassertStatusOK( + _crudApplication.applyOperation(opCtx.get(), oplogEntry)); } - } else { - uassertStatusOK(_crudApplication.applyOperation(opCtx.get(), oplogEntry)); } - } - return makeReadyFutureWith([] {}).semi(); - }) + return makeReadyFutureWith([] {}).semi(); + }) .onTransientError([](const Status& status) { LOGV2(5615800, "Transient error while applying oplog entry from donor shard", @@ -104,4 +107,16 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch( .semi(); } +template SemiFuture<void> ReshardingOplogBatchApplier::applyBatch<false>( + OplogBatch batch, + std::shared_ptr<executor::TaskExecutor> executor, + CancellationToken cancelToken, + CancelableOperationContextFactory factory) const; + +template SemiFuture<void> ReshardingOplogBatchApplier::applyBatch<true>( + OplogBatch batch, + std::shared_ptr<executor::TaskExecutor> executor, + CancellationToken cancelToken, + CancelableOperationContextFactory factory) const; + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.h b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.h index d77e1bb74cb..c5c7681c249 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.h @@ -52,6 +52,7 @@ public: ReshardingOplogBatchApplier(const ReshardingOplogApplicationRules& crudApplication, const ReshardingOplogSessionApplication& sessionApplication); + template <bool IsForSessionApplication> SemiFuture<void> applyBatch(OplogBatch batch, std::shared_ptr<executor::TaskExecutor> executor, CancellationToken cancelToken, diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index 1b48da004fe..417139aada9 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ -343,12 +343,11 @@ TEST_F(ReshardingOplogBatchApplierTest, WaitsOnPreparedTxnAndAutomaticallyRetrie TxnNumber incomingTxnNumber = existingTxnNumber + 1; auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber); - oplogEntry.setIsForReshardingSessionApplication(true); auto executor = makeTaskExecutorForApplier(); auto factory = makeCancelableOpCtxForApplier(CancellationToken::uncancelable()); - auto future = - applier()->applyBatch({&oplogEntry}, executor, CancellationToken::uncancelable(), factory); + auto future = applier()->applyBatch<true>( + {&oplogEntry}, executor, CancellationToken::uncancelable(), factory); ASSERT_FALSE(future.isReady()); // Wait a little bit to increase the likelihood that the applier has blocked on the prepared @@ -395,12 +394,12 @@ TEST_F(ReshardingOplogBatchApplierTest, CancelableWhileWaitingOnPreparedTxn) { TxnNumber incomingTxnNumber = existingTxnNumber + 1; auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber); - oplogEntry.setIsForReshardingSessionApplication(true); CancellationSource cancelSource; auto executor = makeTaskExecutorForApplier(); auto factory = makeCancelableOpCtxForApplier(CancellationToken::uncancelable()); - auto future = applier()->applyBatch({&oplogEntry}, executor, cancelSource.token(), factory); + auto future = + applier()->applyBatch<true>({&oplogEntry}, executor, cancelSource.token(), factory); ASSERT_FALSE(future.isReady()); // Wait a little bit to increase the likelihood that the applier has blocked on the prepared diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp index 122adece62f..a6f73b01cec 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp @@ -106,7 +106,6 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( auto writerVectors = _makeEmptyWriterVectors(); for (const auto& op : batch) { - invariant(!op.isForReshardingSessionApplication()); if (op.isCrudOpType()) { _appendCrudOpToWriterVector(&op, writerVectors); } else if (op.isCommand()) { @@ -149,17 +148,17 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( } WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( - OplogBatchToPrepare& batch) const { + const OplogBatchToPrepare& batch) const { auto writerVectors = _makeEmptyWriterVectors(); struct SessionOpsList { TxnNumber txnNum = kUninitializedTxnNumber; - std::vector<OplogEntry*> ops; + std::vector<const OplogEntry*> ops; }; LogicalSessionIdMap<SessionOpsList> sessionTracker; - auto updateSessionTracker = [&](OplogEntry* op) { + auto updateSessionTracker = [&](const OplogEntry* op) { if (const auto& lsid = op->getSessionId()) { uassert(4990700, str::stream() << "Missing txnNumber for oplog entry with lsid: " @@ -199,7 +198,6 @@ WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( for (auto& [lsid, opList] : sessionTracker) { for (auto& op : opList.ops) { - op->setIsForReshardingSessionApplication(); _appendSessionOpToWriterVector(lsid, op, writerVectors); } } diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h index 33faa290643..f4f035ccdbd 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h @@ -85,12 +85,8 @@ public: * The returned writer vectors refer to memory owned by `batch`. The caller must take care to * ensure `batch` outlives the writer vectors all being applied and must take care not to modify * `batch` until after the writer vectors have all been applied. - * - * As a performance optimization, to avoid creating a separate copy of `batch`, this function - * mutates the contained oplog entries. The caller should take care to apply the writer vectors - * from `makeCrudOpWriterVectors(batch)` first. */ - WriterVectors makeSessionOpWriterVectors(OplogBatchToPrepare& batch) const; + WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch) const; static void throwIfUnsupportedCommandOp(const OplogEntry& op); diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp index b7bccf2fb8a..91570c271a1 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp @@ -299,7 +299,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, AssignsSessionOpsToWriterVectorsByLsid) ASSERT_BSONOBJ_BINARY_EQ(writer[i]->getObject(), BSON("_id" << i)); ASSERT_EQ(writer[i]->getSessionId(), lsid); ASSERT_EQ(writer[i]->getTxnNumber(), TxnNumber{1}); - ASSERT_TRUE(writer[i]->isForReshardingSessionApplication()); } } @@ -320,7 +319,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, DiscardsLowerTxnNumberSessionOps) { ASSERT_BSONOBJ_BINARY_EQ(writer[0]->getObject(), BSON("_id" << numOps)); ASSERT_EQ(writer[0]->getSessionId(), lsid); ASSERT_EQ(writer[0]->getTxnNumber(), TxnNumber{numOps}); - ASSERT_TRUE(writer[0]->isForReshardingSessionApplication()); } TEST_F(ReshardingOplogBatchPreparerTest, DistributesSessionOpsToWriterVectorsFairly) { @@ -416,7 +414,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTx ASSERT_EQ(writer.size(), 1U); ASSERT_EQ(writer[0]->getSessionId(), lsid); ASSERT_EQ(*writer[0]->getTxnNumber(), 2); - ASSERT_TRUE(writer[0]->isForReshardingSessionApplication()); } TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForCommittedTxn) { @@ -433,7 +430,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForCommittedTxn) { ASSERT_EQ(writer.size(), 1U); ASSERT_EQ(writer[0]->getSessionId(), lsid); ASSERT_EQ(*writer[0]->getTxnNumber(), 2); - ASSERT_TRUE(writer[0]->isForReshardingSessionApplication()); } TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTxn) { @@ -449,7 +445,6 @@ TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTx ASSERT_EQ(writer.size(), 1U); ASSERT_EQ(writer[0]->getSessionId(), lsid); ASSERT_EQ(*writer[0]->getTxnNumber(), 2); - ASSERT_TRUE(writer[0]->isForReshardingSessionApplication()); } TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForPartialUnpreparedTxn) { |