diff options
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r-- | src/mongo/db/ops/update_request.h | 44 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.h | 22 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability_test.cpp | 2 |
4 files changed, 71 insertions, 32 deletions
diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h index a77945e601a..43060673b82 100644 --- a/src/mongo/db/ops/update_request.h +++ b/src/mongo/db/ops/update_request.h @@ -43,6 +43,20 @@ namespace mongo { namespace { const std::vector<BSONObj> emptyArrayFilters{}; const BSONObj emptyCollation{}; + +template <typename T> +void appendArrayToString(const T& arr, StringBuilder* builder) { + bool first = true; + *builder << "["; + for (const auto& elem : arr) { + if (!first) { + *builder << ", "; + } + first = false; + *builder << elem; + } + *builder << "]"; +} } // namespace class FieldRef; @@ -235,12 +249,12 @@ public: return _yieldPolicy; } - void setStmtId(StmtId stmtId) { - _stmtId = std::move(stmtId); + void setStmtIds(std::vector<StmtId> stmtIds) { + _stmtIds = std::move(stmtIds); } - StmtId getStmtId() const { - return _stmtId; + const std::vector<StmtId>& getStmtIds() const { + return _stmtIds; } const std::string toString() const { @@ -250,18 +264,12 @@ public: builder << " sort: " << _sort; builder << " collation: " << getCollation(); builder << " updateModification: " << getUpdateModification().toString(); - builder << " stmtId: " << _stmtId; - - builder << " arrayFilters: ["; - bool first = true; - for (auto arrayFilter : getArrayFilters()) { - if (!first) { - builder << ", "; - } - first = false; - builder << arrayFilter; - } - builder << "]"; + + builder << " stmtIds: "; + appendArrayToString(getStmtIds(), &builder); + + builder << " arrayFilters: "; + appendArrayToString(getArrayFilters(), &builder); if (getUpdateConstants()) { builder << " updateConstants: " << *getUpdateConstants(); @@ -302,8 +310,8 @@ private: // by the user for each individual element of the 'updates' array in the 'update' command. boost::optional<BSONObj> _letParameters; - // The statement id of this request. - StmtId _stmtId = kUninitializedStmtId; + // The statement ids of this request. + std::vector<StmtId> _stmtIds = {kUninitializedStmtId}; // Flags controlling the update. diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 7da347238a0..e6738ae32ce 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -540,7 +540,7 @@ SingleWriteResult makeWriteResultForInsertOrDeleteRetry() { WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp, - bool fromMigrate) { + const InsertType& type) { // Insert performs its own retries, so we should only be within a WriteUnitOfWork when run in a // transaction. auto txnParticipant = TransactionParticipant::get(opCtx); @@ -609,15 +609,22 @@ WriteResult performInserts(OperationContext* opCtx, // current batch to preserve the error results order. } else { BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()); - batch.emplace_back(stmtId, toInsert); + + // A time-series insert can combine multiple writes into a single operation, and thus + // can have multiple statement ids associated with it if it is retryable. + batch.emplace_back(type == InsertType::kTimeseries && wholeOp.getStmtIds() + ? *wholeOp.getStmtIds() + : std::vector<StmtId>{stmtId}, + toInsert); + bytesInBatch += batch.back().doc.objsize(); if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < maxBatchBytes) continue; // Add more to batch before inserting. } - bool canContinue = - insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out, fromMigrate); + bool canContinue = insertBatchAndHandleErrors( + opCtx, wholeOp, batch, &lastOpFixer, &out, type == InsertType::kFromMigrate); batch.clear(); // We won't need the current batch any more. bytesInBatch = 0; @@ -660,7 +667,6 @@ WriteResult performInserts(OperationContext* opCtx, static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx, const NamespaceString& ns, - StmtId stmtId, const UpdateRequest& updateRequest) { const ExtensionsCallbackReal extensionsCallback(opCtx, &updateRequest.getNamespaceString()); ParsedUpdate parsedUpdate(opCtx, &updateRequest, extensionsCallback); @@ -760,7 +766,7 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx, static SingleWriteResult performSingleUpdateOpWithDupKeyRetry( OperationContext* opCtx, const NamespaceString& ns, - StmtId stmtId, + const std::vector<StmtId>& stmtIds, const write_ops::UpdateOpEntry& op, LegacyRuntimeConstants runtimeConstants, const boost::optional<BSONObj>& letParams) { @@ -786,7 +792,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry( if (letParams) { request.setLetParameters(std::move(letParams)); } - request.setStmtId(stmtId); + request.setStmtIds(stmtIds); request.setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY : PlanYieldPolicy::YieldPolicy::YIELD_AUTO); @@ -796,7 +802,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry( ++numAttempts; try { - return performSingleUpdateOp(opCtx, ns, stmtId, request); + return performSingleUpdateOp(opCtx, ns, request); } catch (ExceptionFor<ErrorCodes::DuplicateKey>& ex) { const ExtensionsCallbackReal extensionsCallback(opCtx, &request.getNamespaceString()); ParsedUpdate parsedUpdate(opCtx, &request, extensionsCallback); @@ -823,7 +829,9 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry( MONGO_UNREACHABLE; } -WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& wholeOp) { +WriteResult performUpdates(OperationContext* opCtx, + const write_ops::Update& wholeOp, + const UpdateType& type) { // Update performs its own retries, so we should not be in a WriteUnitOfWork unless run in a // transaction. auto txnParticipant = TransactionParticipant::get(opCtx); @@ -872,9 +880,16 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who ON_BLOCK_EXIT([&] { finishCurOp(opCtx, &curOp); }); try { lastOpFixer.startingOp(); + + // A time-series insert can combine multiple writes into a single operation, and thus + // can have multiple statement ids associated with it if it is retryable. + auto stmtIds = type == UpdateType::kTimeseries && wholeOp.getStmtIds() + ? *wholeOp.getStmtIds() + : std::vector<StmtId>{stmtId}; + out.results.emplace_back(performSingleUpdateOpWithDupKeyRetry(opCtx, wholeOp.getNamespace(), - stmtId, + stmtIds, singleOp, runtimeConstants, wholeOp.getLet())); diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h index b50c45dd168..0b2809251cf 100644 --- a/src/mongo/db/ops/write_ops_exec.h +++ b/src/mongo/db/ops/write_ops_exec.h @@ -56,6 +56,19 @@ struct WriteResult { std::vector<StatusWith<SingleWriteResult>> results; }; +/** + * Enums used to differentiate between types of insert/update operations based on how they were + * issued. + */ +enum class InsertType { + kStandard, + kFromMigrate, // From a chunk migration. + kTimeseries, +}; +enum class UpdateType { + kStandard, + kTimeseries, +}; /** * Performs a batch of inserts, updates, or deletes. @@ -68,15 +81,18 @@ struct WriteResult { * exception being thrown from these functions. Callers are responsible for managing LastError in * that case. This should generally be combined with LastError handling from parse failures. * - * 'fromMigrate' indicates whether the operation was induced by a chunk migration + * 'type' indicates whether the operation was induced by a standard write, a chunk migration, or a + * time-series insert. * * Note: performInserts() gets called for both user and internal (like tenant collection cloner, * and initial sync/tenant migration oplog buffer) inserts. */ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& op, - bool fromMigrate = false); -WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& op); + const InsertType& type = InsertType::kStandard); +WriteResult performUpdates(OperationContext* opCtx, + const write_ops::Update& op, + const UpdateType& type = UpdateType::kStandard); WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& op); /** diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index 0f0b2240272..4aa7f3a4748 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -79,7 +79,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, {}, // sessionInfo boost::none, // upsert Date_t(), // wall clock time - boost::none, // statement id + {}, // statement ids boost::none, // optime of previous write within same transaction preImageOpTime, // pre-image optime postImageOpTime, // post-image optime |