-rw-r--r--  jstests/replsets/noop_writes_wait_for_write_concern.js  53
-rw-r--r--  src/mongo/db/commands/bulk_write.cpp                     20
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp                      18
-rw-r--r--  src/mongo/db/ops/write_ops_exec_util.cpp                 20
-rw-r--r--  src/mongo/db/ops/write_ops_exec_util.h                    9
-rw-r--r--  src/mongo/db/timeseries/timeseries_write_util.cpp         4
6 files changed, 93 insertions, 31 deletions
diff --git a/jstests/replsets/noop_writes_wait_for_write_concern.js b/jstests/replsets/noop_writes_wait_for_write_concern.js
index 828ace51871..b49467ef1cd 100644
--- a/jstests/replsets/noop_writes_wait_for_write_concern.js
+++ b/jstests/replsets/noop_writes_wait_for_write_concern.js
@@ -258,6 +258,56 @@ commands.push({
}
});
+// Skip these tests if the BulkWriteCommand feature flag is not enabled
+// TODO SERVER-67711: Remove feature flag check.
+if (FeatureFlagUtil.isPresentAndEnabled(db, "BulkWriteCommand")) {
+ // 'bulkWrite' where the document with the same _id has already been inserted.
+ commands.push({
+ req: {
+ bulkWrite: 1,
+ ops: [{insert: 0, document: {_id: 1}}],
+ nsInfo: [{ns: `${dbName}.${collName}`}]
+ },
+ setupFunc: function() {
+ assert.commandWorked(coll.insert({_id: 1}));
+ },
+ confirmFunc: function(res) {
+ assert.commandWorkedIgnoringWriteErrorsAndWriteConcernErrors(res);
+ assert.eq(res.cursor.firstBatch[0].code, ErrorCodes.DuplicateKey);
+ assert.eq(coll.count({_id: 1}), 1);
+ }
+ });
+
+ // 'bulkWrite' where we are doing a mix of local and non-local writes
+ // and the last op is an insert of a non-local doc with the _id of an
+ // existing doc.
+ var localDBName = "local";
+ var localDB = primary.getDB("local");
+ var localColl = localDB[collName];
+ localColl.drop();
+
+ commands.push({
+ req: {
+ bulkWrite: 1,
+ ops: [{insert: 0, document: {_id: 1}}, {insert: 1, document: {_id: 1}}],
+ nsInfo: [{ns: `${localDBName}.${collName}`}, {ns: `${dbName}.${collName}`}]
+ },
+ setupFunc: function() {
+ assert.commandWorked(coll.insert({_id: 1}));
+ },
+ confirmFunc: function(res) {
+ assert.commandWorkedIgnoringWriteErrorsAndWriteConcernErrors(res);
+ // the local insert happened
+ assert.eq(res.cursor.firstBatch[0].ok, 1);
+ assert.eq(res.cursor.firstBatch[0].n, 1);
+ assert.eq(localColl.count({_id: 1}), 1);
+ // the non-local insert failed
+ assert.eq(res.cursor.firstBatch[1].code, ErrorCodes.DuplicateKey);
+ assert.eq(coll.count({_id: 1}), 1);
+ }
+ });
+}
+
function testCommandWithWriteConcern(cmd) {
// Provide a small wtimeout that we expect to time out.
cmd.req.writeConcern = {w: 3, wtimeout: 1000};
@@ -287,7 +337,8 @@ function testCommandWithWriteConcern(cmd) {
var shell2 = new Mongo(primary.host);
// We check the error code of 'res' in the 'confirmFunc'.
- var res = shell2.getDB(dbName).runCommand(cmd.req);
+ var res = "bulkWrite" in cmd.req ? shell2.adminCommand(cmd.req)
+ : shell2.getDB(dbName).runCommand(cmd.req);
try {
// Tests that the command receives a write concern error. If we don't wait for write
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index a91a7c9698f..ea8fbb11017 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -91,8 +91,16 @@ public:
std::function<void(OperationContext*, size_t, write_ops_exec::WriteResult&)>;
InsertBatch() = delete;
- InsertBatch(const BulkWriteCommandRequest& request, int capacity, ReplyHandler replyCallback)
- : _req(request), _replyFn(replyCallback), _currentNs(), _batch(), _firstOpIdx() {
+ InsertBatch(const BulkWriteCommandRequest& request,
+ int capacity,
+ ReplyHandler replyCallback,
+ write_ops_exec::LastOpFixer& lastOpFixer)
+ : _req(request),
+ _replyFn(replyCallback),
+ _lastOpFixer(lastOpFixer),
+ _currentNs(),
+ _batch(),
+ _firstOpIdx() {
_batch.reserve(capacity);
}
@@ -113,14 +121,12 @@ public:
auto size = _batch.size();
out.results.reserve(size);
- write_ops_exec::LastOpFixer lastOpFixer(opCtx, _currentNs.getNs());
-
out.canContinue = write_ops_exec::insertBatchAndHandleErrors(opCtx,
_currentNs.getNs(),
_currentNs.getCollectionUUID(),
_req.getOrdered(),
_batch,
- &lastOpFixer,
+ &_lastOpFixer,
&out,
OperationSource::kStandard);
_batch.clear();
@@ -162,6 +168,7 @@ public:
private:
const BulkWriteCommandRequest& _req;
ReplyHandler _replyFn;
+ write_ops_exec::LastOpFixer& _lastOpFixer;
NamespaceInfoEntry _currentNs;
std::vector<InsertStatement> _batch;
boost::optional<int> _firstOpIdx;
@@ -823,7 +830,8 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
// Create a current insert batch.
const size_t maxBatchSize = internalInsertMaxBatchSize.load();
- auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB);
+ write_ops_exec::LastOpFixer lastOpFixer(opCtx);
+ auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB, lastOpFixer);
size_t idx = 0;
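
The bulk_write.cpp hunks above move LastOpFixer construction out of InsertBatch::flush() and up into performWrites(), so a single fixer spans the whole bulkWrite command and is only borrowed by the batch. The following minimal standalone sketch models that borrowing pattern; the Fixer and Batch types are hypothetical stand-ins, not the real MongoDB classes:

    #include <iostream>
    #include <string>

    struct Fixer {                          // stand-in for write_ops_exec::LastOpFixer
        void startingOp(const std::string& ns) { std::cout << "startingOp(" << ns << ")\n"; }
    };

    class Batch {                           // stand-in for bulk_write.cpp's InsertBatch
    public:
        explicit Batch(Fixer& fixer) : _fixer(fixer) {}
        void flush(const std::string& ns) {
            _fixer.startingOp(ns);          // per-flush namespace, shared fixer
            // ... insert the buffered documents here ...
        }
    private:
        Fixer& _fixer;                      // borrowed; outlives every flush
    };

    int main() {
        Fixer fixer;                        // one fixer per bulkWrite command
        Batch batch(fixer);
        batch.flush("local.coll");          // local and replicated namespaces
        batch.flush("test.coll");           //   now share the same fixer
        return 0;
    }
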
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 7b28fdca491..8fae667184d 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -589,7 +589,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
if (!collection->getCollection()->isCapped() && !inTxn && batch.size() > 1) {
// First try doing it all together. If all goes well, this is all we need to do.
// See Collection::_insertDocuments for why we do all capped inserts one-at-a-time.
- lastOpFixer->startingOp();
+ lastOpFixer->startingOp(nss);
insertDocuments(opCtx,
collection->getCollection(),
batch.begin(),
@@ -629,7 +629,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
// Transactions are not allowed to operate on capped collections.
uassertStatusOK(
checkIfTransactionOnCappedColl(opCtx, collection->getCollection()));
- lastOpFixer->startingOp();
+ lastOpFixer->startingOp(nss);
insertDocuments(opCtx,
collection->getCollection(),
it,
@@ -968,7 +968,7 @@ WriteResult performInserts(OperationContext* opCtx,
DisableSafeContentValidationIfTrue safeContentValidationDisabler(
opCtx, disableDocumentValidation, fleCrudProcessed);
- LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());
+ LastOpFixer lastOpFixer(opCtx);
WriteResult out;
out.results.reserve(wholeOp.getDocuments().size());
@@ -1347,7 +1347,7 @@ WriteResult performUpdates(OperationContext* opCtx,
DisableSafeContentValidationIfTrue safeContentValidationDisabler(
opCtx, disableDocumentValidation, fleCrudProcessed);
- LastOpFixer lastOpFixer(opCtx, ns);
+ LastOpFixer lastOpFixer(opCtx);
bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });
@@ -1401,7 +1401,7 @@ WriteResult performUpdates(OperationContext* opCtx,
}
try {
- lastOpFixer.startingOp();
+ lastOpFixer.startingOp(ns);
// A time-series insert can combine multiple writes into a single operation, and thus
// can have multiple statement ids associated with it if it is retryable.
@@ -1615,7 +1615,7 @@ WriteResult performDeletes(OperationContext* opCtx,
DisableSafeContentValidationIfTrue safeContentValidationDisabler(
opCtx, disableDocumentValidation, fleCrudProcessed);
- LastOpFixer lastOpFixer(opCtx, ns);
+ LastOpFixer lastOpFixer(opCtx);
bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });
@@ -1675,7 +1675,7 @@ WriteResult performDeletes(OperationContext* opCtx,
}
try {
- lastOpFixer.startingOp();
+ lastOpFixer.startingOp(ns);
boost::optional<Timer> timer;
if (singleOp.getMulti()) {
@@ -1731,8 +1731,8 @@ Status performAtomicTimeseriesWrites(
DisableDocumentValidation disableDocumentValidation{opCtx};
- LastOpFixer lastOpFixer{opCtx, ns};
- lastOpFixer.startingOp();
+ LastOpFixer lastOpFixer(opCtx);
+ lastOpFixer.startingOp(ns);
AutoGetCollection coll{opCtx, ns, MODE_IX};
if (!coll) {
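
In write_ops_exec.cpp the per-operation call sites now supply the namespace when the statement starts rather than when the fixer is constructed. A condensed standalone sketch of that per-statement pattern follows; Fixer is a stand-in type, and the write-error bookkeeping and retry logic of the real performUpdates/performDeletes loops are elided:

    #include <exception>
    #include <iostream>
    #include <string>
    #include <vector>

    struct Fixer {                                   // stand-in for LastOpFixer
        void startingOp(const std::string& ns) { std::cout << "startingOp(" << ns << ")\n"; }
        void finishedOpSuccessfully() { std::cout << "finishedOpSuccessfully()\n"; }
    };

    int main() {
        Fixer lastOpFixer;                           // constructed once, no namespace needed
        const std::vector<std::string> statements = {"test.coll", "test.coll"};
        for (const auto& ns : statements) {
            try {
                lastOpFixer.startingOp(ns);          // namespace supplied per statement
                // ... execute the single update/delete here ...
                lastOpFixer.finishedOpSuccessfully();
            } catch (const std::exception& ex) {
                std::cerr << ex.what() << "\n";      // real code records a write error instead
                break;
            }
        }
        return 0;
    }
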
diff --git a/src/mongo/db/ops/write_ops_exec_util.cpp b/src/mongo/db/ops/write_ops_exec_util.cpp
index 5e7f3bb1660..c5d121ef454 100644
--- a/src/mongo/db/ops/write_ops_exec_util.cpp
+++ b/src/mongo/db/ops/write_ops_exec_util.cpp
@@ -35,18 +35,16 @@
namespace mongo::write_ops_exec {
-LastOpFixer::LastOpFixer(OperationContext* opCtx, const NamespaceString& ns)
- : _opCtx(opCtx), _isOnLocalDb(ns.isLocal()) {}
+LastOpFixer::LastOpFixer(OperationContext* opCtx) : _opCtx(opCtx) {}
LastOpFixer::~LastOpFixer() {
// We don't need to do this if we are in a multi-document transaction as read-only/noop
// transactions will always write another noop entry at transaction commit time which we can
// use to wait for writeConcern.
- if (!_opCtx->inMultiDocumentTransaction() && _needToFixLastOp && !_isOnLocalDb) {
+ if (!_opCtx->inMultiDocumentTransaction() && _needToFixLastOp) {
// If this operation has already generated a new lastOp, don't bother setting it
// here. No-op updates will not generate a new lastOp, so we still need the
- // guard to fire in that case. Operations on the local DB aren't replicated, so they
- // don't need to bump the lastOp.
+ // guard to fire in that case.
replClientInfo().setLastOpToSystemLastOpTimeIgnoringCtxInterrupted(_opCtx);
LOGV2_DEBUG(20888,
5,
@@ -56,15 +54,17 @@ LastOpFixer::~LastOpFixer() {
}
}
-void LastOpFixer::startingOp() {
- _needToFixLastOp = true;
+void LastOpFixer::startingOp(const NamespaceString& ns) {
+ // Operations on the local DB aren't replicated, so they don't need to bump the lastOp.
+ _needToFixLastOp = !ns.isLocal();
_opTimeAtLastOpStart = replClientInfo().getLastOp();
}
void LastOpFixer::finishedOpSuccessfully() {
- // If the op was successful and bumped LastOp, we don't need to do it again. However, we
- // still need to for no-ops and all failing ops.
- _needToFixLastOp = (replClientInfo().getLastOp() == _opTimeAtLastOpStart);
+ // If we intended to fix lastOp for this operation when it started, keep the guard
+ // armed only if the op turned out to be a no-op write (so the destructor sets lastOp).
+ // If the op already bumped lastOp itself, we don't need to do it again.
+ _needToFixLastOp = _needToFixLastOp && (replClientInfo().getLastOp() == _opTimeAtLastOpStart);
}
void assertCanWrite_inlock(OperationContext* opCtx, const NamespaceString& nss) {
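
The write_ops_exec_util.cpp hunks above carry the semantic change: the local-DB check moves from the constructor into startingOp(), and finishedOpSuccessfully() only keeps the guard armed if it was armed to begin with. The self-contained model below compiles on its own and mirrors that behaviour; OpTime, ReplClient, and NamespaceString are stand-in types, and the real destructor additionally skips the fix inside multi-document transactions:

    #include <cassert>
    #include <iostream>
    #include <string>

    struct OpTime {                                       // stand-in for repl::OpTime
        long long t = 0;
        bool operator==(const OpTime& other) const { return t == other.t; }
    };

    struct ReplClient {                                   // stand-in for ReplClientInfo
        OpTime lastOp;
        OpTime systemLastOp;
        void setLastOpToSystemLastOpTime() { lastOp = systemLastOp; }
    };

    struct NamespaceString {                              // stand-in for mongo::NamespaceString
        std::string db, coll;
        bool isLocal() const { return db == "local"; }
    };

    class LastOpFixer {
    public:
        explicit LastOpFixer(ReplClient* client) : _client(client) {}

        ~LastOpFixer() {
            // Only set lastOp if the last started op was replicated and did not bump
            // lastOp itself (a no-op or failed write).
            if (_needToFixLastOp) {
                _client->setLastOpToSystemLastOpTime();
            }
        }

        void startingOp(const NamespaceString& ns) {
            // Writes to the local DB are not replicated, so they never arm the fixer.
            _needToFixLastOp = !ns.isLocal();
            _opTimeAtLastOpStart = _client->lastOp;
        }

        void finishedOpSuccessfully() {
            // Stay armed only if we were armed for this op and it turned out to be a
            // no-op write (lastOp unchanged).
            _needToFixLastOp = _needToFixLastOp && (_client->lastOp == _opTimeAtLastOpStart);
        }

    private:
        ReplClient* const _client;
        bool _needToFixLastOp = true;
        OpTime _opTimeAtLastOpStart;
    };

    int main() {
        ReplClient client;
        client.systemLastOp = OpTime{42};
        {
            LastOpFixer fixer(&client);
            fixer.startingOp(NamespaceString{"local", "notes"});   // local write: disarmed
            fixer.finishedOpSuccessfully();
            fixer.startingOp(NamespaceString{"test", "coll"});     // replicated no-op: armed
            fixer.finishedOpSuccessfully();
        }   // destructor advances lastOp so waitForWriteConcern has something to wait on
        assert(client.lastOp == OpTime{42});
        std::cout << "lastOp advanced to " << client.lastOp.t << std::endl;
        return 0;
    }
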
diff --git a/src/mongo/db/ops/write_ops_exec_util.h b/src/mongo/db/ops/write_ops_exec_util.h
index d7bd993aae5..cf9d80a2fd6 100644
--- a/src/mongo/db/ops/write_ops_exec_util.h
+++ b/src/mongo/db/ops/write_ops_exec_util.h
@@ -43,12 +43,16 @@ namespace mongo::write_ops_exec {
*/
class LastOpFixer {
public:
- LastOpFixer(OperationContext* opCtx, const NamespaceString& ns);
+ LastOpFixer(OperationContext* opCtx);
~LastOpFixer();
- void startingOp();
+ // Called when we are starting an operation on the given namespace. The namespace is
+ // needed so we can check if it is local, since we do not need to fix lastOp for local
+ // writes.
+ void startingOp(const NamespaceString& ns);
+ // Called when we finish the operation that we last called startingOp() for.
void finishedOpSuccessfully();
private:
@@ -58,7 +62,6 @@ private:
OperationContext* const _opCtx;
bool _needToFixLastOp = true;
- const bool _isOnLocalDb;
repl::OpTime _opTimeAtLastOpStart;
};
diff --git a/src/mongo/db/timeseries/timeseries_write_util.cpp b/src/mongo/db/timeseries/timeseries_write_util.cpp
index 7240919ffd9..bfa6883076c 100644
--- a/src/mongo/db/timeseries/timeseries_write_util.cpp
+++ b/src/mongo/db/timeseries/timeseries_write_util.cpp
@@ -154,8 +154,8 @@ Status performAtomicWrites(OperationContext* opCtx,
DisableDocumentValidation disableDocumentValidation{opCtx};
- write_ops_exec::LastOpFixer lastOpFixer{opCtx, ns};
- lastOpFixer.startingOp();
+ write_ops_exec::LastOpFixer lastOpFixer{opCtx};
+ lastOpFixer.startingOp(ns);
auto curOp = CurOp::get(opCtx);
curOp->raiseDbProfileLevel(CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(ns.dbName()));