summary | refs | log | tree | commit | diff
path: root/src/mongo/db/ops
diff options
context:
space:
mode:
author: Gregory Noma <gregory.noma@gmail.com> 2021-03-16 15:26:22 -0400
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-03-16 20:02:29 +0000
commit: 133d07b3e9e8dfb2aacb52dc2174bee38d791aec (patch)
tree: b9534e2e7722ae853d1bac90ad82b2c11104e0c1 /src/mongo/db/ops
parent: f7a7901fc4b02e843cb23285ed28d08f14097ca7 (diff)
download: mongo-133d07b3e9e8dfb2aacb52dc2174bee38d791aec.tar.gz
SERVER-55194 Allow multiple statement ids per oplog entry
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r--  src/mongo/db/ops/update_request.h             | 44
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp           | 35
-rw-r--r--  src/mongo/db/ops/write_ops_exec.h             | 22
-rw-r--r--  src/mongo/db/ops/write_ops_retryability_test.cpp | 2
4 files changed, 71 insertions, 32 deletions
diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h
index a77945e601a..43060673b82 100644
--- a/src/mongo/db/ops/update_request.h
+++ b/src/mongo/db/ops/update_request.h
@@ -43,6 +43,20 @@ namespace mongo {
namespace {
const std::vector<BSONObj> emptyArrayFilters{};
const BSONObj emptyCollation{};
+
+template <typename T>
+void appendArrayToString(const T& arr, StringBuilder* builder) {
+ bool first = true;
+ *builder << "[";
+ for (const auto& elem : arr) {
+ if (!first) {
+ *builder << ", ";
+ }
+ first = false;
+ *builder << elem;
+ }
+ *builder << "]";
+}
} // namespace
class FieldRef;
@@ -235,12 +249,12 @@ public:
return _yieldPolicy;
}
- void setStmtId(StmtId stmtId) {
- _stmtId = std::move(stmtId);
+ void setStmtIds(std::vector<StmtId> stmtIds) {
+ _stmtIds = std::move(stmtIds);
}
- StmtId getStmtId() const {
- return _stmtId;
+ const std::vector<StmtId>& getStmtIds() const {
+ return _stmtIds;
}
const std::string toString() const {
@@ -250,18 +264,12 @@ public:
builder << " sort: " << _sort;
builder << " collation: " << getCollation();
builder << " updateModification: " << getUpdateModification().toString();
- builder << " stmtId: " << _stmtId;
-
- builder << " arrayFilters: [";
- bool first = true;
- for (auto arrayFilter : getArrayFilters()) {
- if (!first) {
- builder << ", ";
- }
- first = false;
- builder << arrayFilter;
- }
- builder << "]";
+
+ builder << " stmtIds: ";
+ appendArrayToString(getStmtIds(), &builder);
+
+ builder << " arrayFilters: ";
+ appendArrayToString(getArrayFilters(), &builder);
if (getUpdateConstants()) {
builder << " updateConstants: " << *getUpdateConstants();
@@ -302,8 +310,8 @@ private:
// by the user for each individual element of the 'updates' array in the 'update' command.
boost::optional<BSONObj> _letParameters;
- // The statement id of this request.
- StmtId _stmtId = kUninitializedStmtId;
+ // The statement ids of this request.
+ std::vector<StmtId> _stmtIds = {kUninitializedStmtId};
// Flags controlling the update.
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 7da347238a0..e6738ae32ce 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -540,7 +540,7 @@ SingleWriteResult makeWriteResultForInsertOrDeleteRetry() {
WriteResult performInserts(OperationContext* opCtx,
const write_ops::Insert& wholeOp,
- bool fromMigrate) {
+ const InsertType& type) {
// Insert performs its own retries, so we should only be within a WriteUnitOfWork when run in a
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -609,15 +609,22 @@ WriteResult performInserts(OperationContext* opCtx,
// current batch to preserve the error results order.
} else {
BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());
- batch.emplace_back(stmtId, toInsert);
+
+ // A time-series insert can combine multiple writes into a single operation, and thus
+ // can have multiple statement ids associated with it if it is retryable.
+ batch.emplace_back(type == InsertType::kTimeseries && wholeOp.getStmtIds()
+ ? *wholeOp.getStmtIds()
+ : std::vector<StmtId>{stmtId},
+ toInsert);
+
bytesInBatch += batch.back().doc.objsize();
if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < maxBatchBytes)
continue; // Add more to batch before inserting.
}
- bool canContinue =
- insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out, fromMigrate);
+ bool canContinue = insertBatchAndHandleErrors(
+ opCtx, wholeOp, batch, &lastOpFixer, &out, type == InsertType::kFromMigrate);
batch.clear(); // We won't need the current batch any more.
bytesInBatch = 0;
@@ -660,7 +667,6 @@ WriteResult performInserts(OperationContext* opCtx,
static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
const NamespaceString& ns,
- StmtId stmtId,
const UpdateRequest& updateRequest) {
const ExtensionsCallbackReal extensionsCallback(opCtx, &updateRequest.getNamespaceString());
ParsedUpdate parsedUpdate(opCtx, &updateRequest, extensionsCallback);
@@ -760,7 +766,7 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
OperationContext* opCtx,
const NamespaceString& ns,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
const write_ops::UpdateOpEntry& op,
LegacyRuntimeConstants runtimeConstants,
const boost::optional<BSONObj>& letParams) {
@@ -786,7 +792,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
if (letParams) {
request.setLetParameters(std::move(letParams));
}
- request.setStmtId(stmtId);
+ request.setStmtIds(stmtIds);
request.setYieldPolicy(opCtx->inMultiDocumentTransaction()
? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY
: PlanYieldPolicy::YieldPolicy::YIELD_AUTO);
@@ -796,7 +802,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
++numAttempts;
try {
- return performSingleUpdateOp(opCtx, ns, stmtId, request);
+ return performSingleUpdateOp(opCtx, ns, request);
} catch (ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
const ExtensionsCallbackReal extensionsCallback(opCtx, &request.getNamespaceString());
ParsedUpdate parsedUpdate(opCtx, &request, extensionsCallback);
@@ -823,7 +829,9 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
MONGO_UNREACHABLE;
}
-WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& wholeOp) {
+WriteResult performUpdates(OperationContext* opCtx,
+ const write_ops::Update& wholeOp,
+ const UpdateType& type) {
// Update performs its own retries, so we should not be in a WriteUnitOfWork unless run in a
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -872,9 +880,16 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
ON_BLOCK_EXIT([&] { finishCurOp(opCtx, &curOp); });
try {
lastOpFixer.startingOp();
+
+ // A time-series insert can combine multiple writes into a single operation, and thus
+ // can have multiple statement ids associated with it if it is retryable.
+ auto stmtIds = type == UpdateType::kTimeseries && wholeOp.getStmtIds()
+ ? *wholeOp.getStmtIds()
+ : std::vector<StmtId>{stmtId};
+
out.results.emplace_back(performSingleUpdateOpWithDupKeyRetry(opCtx,
wholeOp.getNamespace(),
- stmtId,
+ stmtIds,
singleOp,
runtimeConstants,
wholeOp.getLet()));
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
index b50c45dd168..0b2809251cf 100644
--- a/src/mongo/db/ops/write_ops_exec.h
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -56,6 +56,19 @@ struct WriteResult {
std::vector<StatusWith<SingleWriteResult>> results;
};
+/**
+ * Enums used to differentiate between types of insert/update operations based on how they were
+ * issued.
+ */
+enum class InsertType {
+ kStandard,
+ kFromMigrate, // From a chunk migration.
+ kTimeseries,
+};
+enum class UpdateType {
+ kStandard,
+ kTimeseries,
+};
/**
* Performs a batch of inserts, updates, or deletes.
@@ -68,15 +81,18 @@ struct WriteResult {
* exception being thrown from these functions. Callers are responsible for managing LastError in
* that case. This should generally be combined with LastError handling from parse failures.
*
- * 'fromMigrate' indicates whether the operation was induced by a chunk migration
+ * 'type' indicates whether the operation was induced by a standard write, a chunk migration, or a
+ * time-series insert.
*
* Note: performInserts() gets called for both user and internal (like tenant collection cloner,
* and initial sync/tenant migration oplog buffer) inserts.
*/
WriteResult performInserts(OperationContext* opCtx,
const write_ops::Insert& op,
- bool fromMigrate = false);
-WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& op);
+ const InsertType& type = InsertType::kStandard);
+WriteResult performUpdates(OperationContext* opCtx,
+ const write_ops::Update& op,
+ const UpdateType& type = UpdateType::kStandard);
WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& op);
/**
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index 0f0b2240272..4aa7f3a4748 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -79,7 +79,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
preImageOpTime, // pre-image optime
postImageOpTime, // post-image optime