diff options
-rw-r--r-- | jstests/replsets/noop_writes_wait_for_write_concern.js | 53 | ||||
-rw-r--r-- | src/mongo/db/commands/bulk_write.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec_util.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec_util.h | 9 | ||||
-rw-r--r-- | src/mongo/db/timeseries/timeseries_write_util.cpp | 4 |
6 files changed, 93 insertions, 31 deletions
diff --git a/jstests/replsets/noop_writes_wait_for_write_concern.js b/jstests/replsets/noop_writes_wait_for_write_concern.js index 828ace51871..b49467ef1cd 100644 --- a/jstests/replsets/noop_writes_wait_for_write_concern.js +++ b/jstests/replsets/noop_writes_wait_for_write_concern.js @@ -258,6 +258,56 @@ commands.push({ } }); +// Skip these tests if the BulkWriteCommand feature flag is not enabled +// TODO SERVER-67711: Remove feature flag check. +if (FeatureFlagUtil.isPresentAndEnabled(db, "BulkWriteCommand")) { + // 'bulkWrite' where the document with the same _id has already been inserted. + commands.push({ + req: { + bulkWrite: 1, + ops: [{insert: 0, document: {_id: 1}}], + nsInfo: [{ns: `${dbName}.${collName}`}] + }, + setupFunc: function() { + assert.commandWorked(coll.insert({_id: 1})); + }, + confirmFunc: function(res) { + assert.commandWorkedIgnoringWriteErrorsAndWriteConcernErrors(res); + assert.eq(res.cursor.firstBatch[0].code, ErrorCodes.DuplicateKey); + assert.eq(coll.count({_id: 1}), 1); + } + }); + + // 'bulkWrite' where we are doing a mix of local and non-local writes + // and the last op is an insert of a non-local doc with the _id of an + // existing doc. + var localDBName = "local"; + var localDB = primary.getDB("local"); + var localColl = localDB[collName]; + localColl.drop(); + + commands.push({ + req: { + bulkWrite: 1, + ops: [{insert: 0, document: {_id: 1}}, {insert: 1, document: {_id: 1}}], + nsInfo: [{ns: `${localDBName}.${collName}`}, {ns: `${dbName}.${collName}`}] + }, + setupFunc: function() { + assert.commandWorked(coll.insert({_id: 1})); + }, + confirmFunc: function(res) { + assert.commandWorkedIgnoringWriteErrorsAndWriteConcernErrors(res); + // the local insert happened + assert.eq(res.cursor.firstBatch[0].ok, 1); + assert.eq(res.cursor.firstBatch[0].n, 1); + assert.eq(localColl.count({_id: 1}), 1); + // the non-local insert failed + assert.eq(res.cursor.firstBatch[1].code, ErrorCodes.DuplicateKey); + assert.eq(coll.count({_id: 1}), 1); + } + }); +} + function testCommandWithWriteConcern(cmd) { // Provide a small wtimeout that we expect to time out. cmd.req.writeConcern = {w: 3, wtimeout: 1000}; @@ -287,7 +337,8 @@ function testCommandWithWriteConcern(cmd) { var shell2 = new Mongo(primary.host); // We check the error code of 'res' in the 'confirmFunc'. - var res = shell2.getDB(dbName).runCommand(cmd.req); + var res = "bulkWrite" in cmd.req ? shell2.adminCommand(cmd.req) + : shell2.getDB(dbName).runCommand(cmd.req); try { // Tests that the command receives a write concern error. If we don't wait for write diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index a91a7c9698f..ea8fbb11017 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -91,8 +91,16 @@ public: std::function<void(OperationContext*, size_t, write_ops_exec::WriteResult&)>; InsertBatch() = delete; - InsertBatch(const BulkWriteCommandRequest& request, int capacity, ReplyHandler replyCallback) - : _req(request), _replyFn(replyCallback), _currentNs(), _batch(), _firstOpIdx() { + InsertBatch(const BulkWriteCommandRequest& request, + int capacity, + ReplyHandler replyCallback, + write_ops_exec::LastOpFixer& lastOpFixer) + : _req(request), + _replyFn(replyCallback), + _lastOpFixer(lastOpFixer), + _currentNs(), + _batch(), + _firstOpIdx() { _batch.reserve(capacity); } @@ -113,14 +121,12 @@ public: auto size = _batch.size(); out.results.reserve(size); - write_ops_exec::LastOpFixer lastOpFixer(opCtx, _currentNs.getNs()); - out.canContinue = write_ops_exec::insertBatchAndHandleErrors(opCtx, _currentNs.getNs(), _currentNs.getCollectionUUID(), _req.getOrdered(), _batch, - &lastOpFixer, + &_lastOpFixer, &out, OperationSource::kStandard); _batch.clear(); @@ -162,6 +168,7 @@ public: private: const BulkWriteCommandRequest& _req; ReplyHandler _replyFn; + write_ops_exec::LastOpFixer& _lastOpFixer; NamespaceInfoEntry _currentNs; std::vector<InsertStatement> _batch; boost::optional<int> _firstOpIdx; @@ -823,7 +830,8 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, // Create a current insert batch. const size_t maxBatchSize = internalInsertMaxBatchSize.load(); - auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB); + write_ops_exec::LastOpFixer lastOpFixer(opCtx); + auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB, lastOpFixer); size_t idx = 0; diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 7b28fdca491..8fae667184d 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -589,7 +589,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, if (!collection->getCollection()->isCapped() && !inTxn && batch.size() > 1) { // First try doing it all together. If all goes well, this is all we need to do. // See Collection::_insertDocuments for why we do all capped inserts one-at-a-time. - lastOpFixer->startingOp(); + lastOpFixer->startingOp(nss); insertDocuments(opCtx, collection->getCollection(), batch.begin(), @@ -629,7 +629,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, // Transactions are not allowed to operate on capped collections. uassertStatusOK( checkIfTransactionOnCappedColl(opCtx, collection->getCollection())); - lastOpFixer->startingOp(); + lastOpFixer->startingOp(nss); insertDocuments(opCtx, collection->getCollection(), it, @@ -968,7 +968,7 @@ WriteResult performInserts(OperationContext* opCtx, DisableSafeContentValidationIfTrue safeContentValidationDisabler( opCtx, disableDocumentValidation, fleCrudProcessed); - LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace()); + LastOpFixer lastOpFixer(opCtx); WriteResult out; out.results.reserve(wholeOp.getDocuments().size()); @@ -1347,7 +1347,7 @@ WriteResult performUpdates(OperationContext* opCtx, DisableSafeContentValidationIfTrue safeContentValidationDisabler( opCtx, disableDocumentValidation, fleCrudProcessed); - LastOpFixer lastOpFixer(opCtx, ns); + LastOpFixer lastOpFixer(opCtx); bool containsRetry = false; ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); }); @@ -1401,7 +1401,7 @@ WriteResult performUpdates(OperationContext* opCtx, } try { - lastOpFixer.startingOp(); + lastOpFixer.startingOp(ns); // A time-series insert can combine multiple writes into a single operation, and thus // can have multiple statement ids associated with it if it is retryable. @@ -1615,7 +1615,7 @@ WriteResult performDeletes(OperationContext* opCtx, DisableSafeContentValidationIfTrue safeContentValidationDisabler( opCtx, disableDocumentValidation, fleCrudProcessed); - LastOpFixer lastOpFixer(opCtx, ns); + LastOpFixer lastOpFixer(opCtx); bool containsRetry = false; ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); }); @@ -1675,7 +1675,7 @@ WriteResult performDeletes(OperationContext* opCtx, } try { - lastOpFixer.startingOp(); + lastOpFixer.startingOp(ns); boost::optional<Timer> timer; if (singleOp.getMulti()) { @@ -1731,8 +1731,8 @@ Status performAtomicTimeseriesWrites( DisableDocumentValidation disableDocumentValidation{opCtx}; - LastOpFixer lastOpFixer{opCtx, ns}; - lastOpFixer.startingOp(); + LastOpFixer lastOpFixer(opCtx); + lastOpFixer.startingOp(ns); AutoGetCollection coll{opCtx, ns, MODE_IX}; if (!coll) { diff --git a/src/mongo/db/ops/write_ops_exec_util.cpp b/src/mongo/db/ops/write_ops_exec_util.cpp index 5e7f3bb1660..c5d121ef454 100644 --- a/src/mongo/db/ops/write_ops_exec_util.cpp +++ b/src/mongo/db/ops/write_ops_exec_util.cpp @@ -35,18 +35,16 @@ namespace mongo::write_ops_exec { -LastOpFixer::LastOpFixer(OperationContext* opCtx, const NamespaceString& ns) - : _opCtx(opCtx), _isOnLocalDb(ns.isLocal()) {} +LastOpFixer::LastOpFixer(OperationContext* opCtx) : _opCtx(opCtx) {} LastOpFixer::~LastOpFixer() { // We don't need to do this if we are in a multi-document transaction as read-only/noop // transactions will always write another noop entry at transaction commit time which we can // use to wait for writeConcern. - if (!_opCtx->inMultiDocumentTransaction() && _needToFixLastOp && !_isOnLocalDb) { + if (!_opCtx->inMultiDocumentTransaction() && _needToFixLastOp) { // If this operation has already generated a new lastOp, don't bother setting it // here. No-op updates will not generate a new lastOp, so we still need the - // guard to fire in that case. Operations on the local DB aren't replicated, so they - // don't need to bump the lastOp. + // guard to fire in that case. replClientInfo().setLastOpToSystemLastOpTimeIgnoringCtxInterrupted(_opCtx); LOGV2_DEBUG(20888, 5, @@ -56,15 +54,17 @@ LastOpFixer::~LastOpFixer() { } } -void LastOpFixer::startingOp() { - _needToFixLastOp = true; +void LastOpFixer::startingOp(const NamespaceString& ns) { + // Operations on the local DB aren't replicated, so they don't need to bump the lastOp. + _needToFixLastOp = !ns.isLocal(); _opTimeAtLastOpStart = replClientInfo().getLastOp(); } void LastOpFixer::finishedOpSuccessfully() { - // If the op was successful and bumped LastOp, we don't need to do it again. However, we - // still need to for no-ops and all failing ops. - _needToFixLastOp = (replClientInfo().getLastOp() == _opTimeAtLastOpStart); + // If we intended to fix the LastOp for this operation when it started, fix it now + // if it was a no-op write. If the op was successful and already bumped LastOp itself, + // we don't need to do it again. + _needToFixLastOp = _needToFixLastOp && (replClientInfo().getLastOp() == _opTimeAtLastOpStart); } void assertCanWrite_inlock(OperationContext* opCtx, const NamespaceString& nss) { diff --git a/src/mongo/db/ops/write_ops_exec_util.h b/src/mongo/db/ops/write_ops_exec_util.h index d7bd993aae5..cf9d80a2fd6 100644 --- a/src/mongo/db/ops/write_ops_exec_util.h +++ b/src/mongo/db/ops/write_ops_exec_util.h @@ -43,12 +43,16 @@ namespace mongo::write_ops_exec { */ class LastOpFixer { public: - LastOpFixer(OperationContext* opCtx, const NamespaceString& ns); + LastOpFixer(OperationContext* opCtx); ~LastOpFixer(); - void startingOp(); + // Called when we are starting an operation on the given namespace. The namespace is + // needed so we can check if it is local, since we do not need to fix lastOp for local + // writes. + void startingOp(const NamespaceString& ns); + // Called when we finish the operation that we last called startingOp() for. void finishedOpSuccessfully(); private: @@ -58,7 +62,6 @@ private: OperationContext* const _opCtx; bool _needToFixLastOp = true; - const bool _isOnLocalDb; repl::OpTime _opTimeAtLastOpStart; }; diff --git a/src/mongo/db/timeseries/timeseries_write_util.cpp b/src/mongo/db/timeseries/timeseries_write_util.cpp index 7240919ffd9..bfa6883076c 100644 --- a/src/mongo/db/timeseries/timeseries_write_util.cpp +++ b/src/mongo/db/timeseries/timeseries_write_util.cpp @@ -154,8 +154,8 @@ Status performAtomicWrites(OperationContext* opCtx, DisableDocumentValidation disableDocumentValidation{opCtx}; - write_ops_exec::LastOpFixer lastOpFixer{opCtx, ns}; - lastOpFixer.startingOp(); + write_ops_exec::LastOpFixer lastOpFixer{opCtx}; + lastOpFixer.startingOp(ns); auto curOp = CurOp::get(opCtx); curOp->raiseDbProfileLevel(CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(ns.dbName())); |