diff options
author | Randolph Tan <randolph@10gen.com> | 2017-07-17 09:54:31 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-07-21 12:53:45 -0400 |
commit | f7924be532372d4754ac3a496881e9334de21a7b (patch) | |
tree | 51baa20662d8e83d4bb8ea9e48e8d662638eafea /src | |
parent | 769ce1808686e408bf41844c106ab5ce289339ee (diff) | |
download | mongo-f7924be532372d4754ac3a496881e9334de21a7b.tar.gz |
SERVER-28912 Thread stmtId from update and delete requests to oplog
Diffstat (limited to 'src')
25 files changed, 117 insertions, 38 deletions
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index a07039e74c9..39d61c71477 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -238,6 +238,7 @@ public: OperationContext* opCtx) const = 0; virtual void deleteDocument(OperationContext* opCtx, + StmtId stmtId, const RecordId& loc, OpDebug* opDebug, bool fromMigrate, @@ -453,11 +454,12 @@ public: * will not be logged. */ inline void deleteDocument(OperationContext* const opCtx, + StmtId stmtId, const RecordId& loc, OpDebug* const opDebug, const bool fromMigrate = false, const bool noWarn = false) { - return this->_impl().deleteDocument(opCtx, loc, opDebug, fromMigrate, noWarn); + return this->_impl().deleteDocument(opCtx, stmtId, loc, opDebug, fromMigrate, noWarn); } /* diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index e1300d2ce96..7398a18adfd 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -541,8 +541,12 @@ Status CollectionImpl::aboutToDeleteCapped(OperationContext* opCtx, return Status::OK(); } -void CollectionImpl::deleteDocument( - OperationContext* opCtx, const RecordId& loc, OpDebug* opDebug, bool fromMigrate, bool noWarn) { +void CollectionImpl::deleteDocument(OperationContext* opCtx, + StmtId stmtId, + const RecordId& loc, + OpDebug* opDebug, + bool fromMigrate, + bool noWarn) { if (isCapped()) { log() << "failing remove on a capped ns " << _ns; uasserted(10089, "cannot remove from a capped collection"); @@ -566,7 +570,7 @@ void CollectionImpl::deleteDocument( _recordStore->deleteRecord(opCtx, loc); getGlobalServiceContext()->getOpObserver()->onDelete( - opCtx, ns(), uuid(), std::move(deleteState), fromMigrate); + opCtx, ns(), uuid(), stmtId, std::move(deleteState), fromMigrate); } Counter64 moveCounter; diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index 6686eda48b8..5505b1c5c4c 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -127,6 +127,8 @@ public: /** * Deletes the document with the given RecordId from the collection. * + * 'stmtId' the statement id for this delete operation. Pass in kUninitializedStmtId if not + * applicable. * 'fromMigrate' indicates whether the delete was induced by a chunk migration, and * so should be ignored by the user as an internal maintenance operation and not a * real delete. @@ -137,6 +139,7 @@ public: * will not be logged. */ void deleteDocument(OperationContext* opCtx, + StmtId stmtId, const RecordId& loc, OpDebug* opDebug, bool fromMigrate = false, diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index 21fa1d643ce..97c59bdb8e9 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -125,6 +125,7 @@ public: } void deleteDocument(OperationContext* opCtx, + StmtId stmtId, const RecordId& loc, OpDebug* opDebug, bool fromMigrate, diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index 3aa9e89dc93..059f66a5cdb 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -215,7 +215,8 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { if (!_params.isExplain) { try { WriteUnitOfWork wunit(getOpCtx()); - _collection->deleteDocument(getOpCtx(), recordId, _params.opDebug, _params.fromMigrate); + _collection->deleteDocument( + getOpCtx(), _params.stmtId, recordId, _params.opDebug, _params.fromMigrate); wunit.commit(); } catch (const WriteConflictException& wce) { memberFreer.Dismiss(); // Keep this member around so we can retry deleting it. diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h index dff707e89b3..b66a7e273ea 100644 --- a/src/mongo/db/exec/delete.h +++ b/src/mongo/db/exec/delete.h @@ -28,9 +28,9 @@ #pragma once - #include "mongo/db/exec/plan_stage.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_session_id.h" namespace mongo { @@ -62,6 +62,9 @@ struct DeleteStageParams { // Should we return the document we just deleted? bool returnDeleted; + // The stmtId for this particular delete. + StmtId stmtId = kUninitializedStmtId; + // The parsed query predicate for this delete. Not owned here. CanonicalQuery* canonicalQuery; diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 70b13c08b07..6b6af6f01ab 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -291,6 +291,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco BSONObj idQuery = driver->makeOplogEntryQuery(newObj, request->isMulti()); OplogUpdateEntryArgs args; args.nss = _collection->ns(); + args.stmtId = request->getStmtId(); args.update = logObj; args.criteria = idQuery; args.fromMigrate = request->isFromMigration(); @@ -321,6 +322,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco OplogUpdateEntryArgs args; args.nss = _collection->ns(); args.uuid = _collection->uuid(); + args.stmtId = request->getStmtId(); args.update = logObj; args.criteria = idQuery; args.fromMigrate = request->isFromMigration(); @@ -481,9 +483,8 @@ void UpdateStage::doInsert() { WriteUnitOfWork wunit(getOpCtx()); invariant(_collection); const bool enforceQuota = !request->isGod(); - // TODO: SERVER-28912 include StmtId uassertStatusOK(_collection->insertDocument(getOpCtx(), - InsertStatement(newObj), + InsertStatement(request->getStmtId(), newObj), _params.opDebug, enforceQuota, request->isFromMigration())); diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index c4d8983c811..f6f1408f585 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -55,6 +55,8 @@ struct OplogUpdateEntryArgs { OptionalCollectionUUID uuid; + StmtId stmtId = kUninitializedStmtId; + // Fully updated document with damages (update modifiers) applied. BSONObj updatedDoc; @@ -108,6 +110,7 @@ public: virtual void onDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, + StmtId stmtId, CollectionShardingState::DeleteState deleteState, bool fromMigrate) = 0; virtual void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) = 0; diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 66d4f50d2a8..31836bd69e1 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -71,9 +71,17 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, if (e.fieldNameStringData() != "ns"_sd) builder.append(e); } - repl::logOp(opCtx, "c", nss.getCommandNS(), uuid, builder.done(), nullptr, fromMigrate); + repl::logOp(opCtx, + "c", + nss.getCommandNS(), + uuid, + builder.done(), + nullptr, + fromMigrate, + kUninitializedStmtId); } else { - repl::logOp(opCtx, "i", systemIndexes, {}, indexDoc, nullptr, fromMigrate); + repl::logOp( + opCtx, "i", systemIndexes, {}, indexDoc, nullptr, fromMigrate, kUninitializedStmtId); } AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", systemIndexes, indexDoc, nullptr); @@ -122,7 +130,14 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg return; } - repl::logOp(opCtx, "u", args.nss, args.uuid, args.update, &args.criteria, args.fromMigrate); + repl::logOp(opCtx, + "u", + args.nss, + args.uuid, + args.update, + &args.criteria, + args.fromMigrate, + args.stmtId); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "u", args.nss, args.update, &args.criteria); @@ -162,12 +177,13 @@ CollectionShardingState::DeleteState OpObserverImpl::aboutToDelete(OperationCont void OpObserverImpl::onDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, + StmtId stmtId, CollectionShardingState::DeleteState deleteState, bool fromMigrate) { if (deleteState.idDoc.isEmpty()) return; - repl::logOp(opCtx, "d", nss, uuid, deleteState.idDoc, nullptr, fromMigrate); + repl::logOp(opCtx, "d", nss, uuid, deleteState.idDoc, nullptr, fromMigrate, stmtId); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "d", nss, deleteState.idDoc, nullptr); @@ -188,7 +204,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, } void OpObserverImpl::onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) { - repl::logOp(opCtx, "n", {}, {}, msgObj, nullptr, false); + repl::logOp(opCtx, "n", {}, {}, msgObj, nullptr, false, kUninitializedStmtId); } void OpObserverImpl::onCreateCollection(OperationContext* opCtx, @@ -220,7 +236,7 @@ void OpObserverImpl::onCreateCollection(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, "c", dbName, options.uuid, cmdObj, nullptr, false); + repl::logOp(opCtx, "c", dbName, options.uuid, cmdObj, nullptr, false, kUninitializedStmtId); } getGlobalAuthorizationManager()->logOp(opCtx, "c", dbName, cmdObj, nullptr); @@ -290,7 +306,7 @@ void OpObserverImpl::onCollMod(OperationContext* opCtx, if (!nss.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, &o2Obj, false); + repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, &o2Obj, false, kUninitializedStmtId); } getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, cmdObj, nullptr); @@ -300,7 +316,7 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& BSONObj cmdObj = BSON("dropDatabase" << 1); const NamespaceString cmdNss{dbName, "$cmd"}; - repl::logOp(opCtx, "c", cmdNss, {}, cmdObj, nullptr, false); + repl::logOp(opCtx, "c", cmdNss, {}, cmdObj, nullptr, false, kUninitializedStmtId); if (dbName == FeatureCompatibilityVersion::kDatabase) { FeatureCompatibilityVersion::onDropCollection(); @@ -320,7 +336,8 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, repl::OpTime dropOpTime; if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - dropOpTime = repl::logOp(opCtx, "c", dbName, uuid, cmdObj, nullptr, false); + dropOpTime = + repl::logOp(opCtx, "c", dbName, uuid, cmdObj, nullptr, false, kUninitializedStmtId); } if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) { @@ -355,7 +372,7 @@ void OpObserverImpl::onDropIndex(OperationContext* opCtx, const BSONObj& indexInfo) { BSONObj cmdObj = BSON("dropIndexes" << nss.coll() << "index" << indexName); auto commandNS = nss.getCommandNS(); - repl::logOp(opCtx, "c", commandNS, uuid, cmdObj, &indexInfo, false); + repl::logOp(opCtx, "c", commandNS, uuid, cmdObj, &indexInfo, false, kUninitializedStmtId); getGlobalAuthorizationManager()->logOp(opCtx, "c", commandNS, cmdObj, &indexInfo); } @@ -383,7 +400,7 @@ void OpObserverImpl::onRenameCollection(OperationContext* opCtx, } BSONObj cmdObj = builder.done(); - repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false); + repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId); if (fromCollection.coll() == DurableViewCatalog::viewsCollectionName() || toCollection.coll() == DurableViewCatalog::viewsCollectionName()) { DurableViewCatalog::onExternalChange( @@ -414,7 +431,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd) { const NamespaceString cmdNss{dbName, "$cmd"}; - repl::logOp(opCtx, "c", cmdNss, {}, applyOpCmd, nullptr, false); + repl::logOp(opCtx, "c", cmdNss, {}, applyOpCmd, nullptr, false, kUninitializedStmtId); getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr); } @@ -427,7 +444,7 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, if (!collectionName.isSystemDotProfile()) { // do not replicate system.profile modifications - repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false); + repl::logOp(opCtx, "c", cmdNss, uuid, cmdObj, nullptr, false, kUninitializedStmtId); } getGlobalAuthorizationManager()->logOp(opCtx, "c", cmdNss, cmdObj, nullptr); diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index a05efbb2861..bc701e1688b 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -57,6 +57,7 @@ public: void onDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, + StmtId stmtId, CollectionShardingState::DeleteState deleteState, bool fromMigrate) override; void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) override; diff --git a/src/mongo/db/op_observer_noop.cpp b/src/mongo/db/op_observer_noop.cpp index 4cbb3df65a9..fe863c705f5 100644 --- a/src/mongo/db/op_observer_noop.cpp +++ b/src/mongo/db/op_observer_noop.cpp @@ -58,6 +58,7 @@ CollectionShardingState::DeleteState OpObserverNoop::aboutToDelete(OperationCont void OpObserverNoop::onDelete(OperationContext*, const NamespaceString&, OptionalCollectionUUID, + StmtId stmtId, CollectionShardingState::DeleteState, bool) {} diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index 8f3c949720d..9abcebd9d4e 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -57,6 +57,7 @@ public: void onDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, + StmtId stmtId, CollectionShardingState::DeleteState deleteState, bool fromMigrate) override; void onOpMessage(OperationContext* opCtx, const BSONObj& msgObj) override; diff --git a/src/mongo/db/ops/delete_request.h b/src/mongo/db/ops/delete_request.h index 54c55984894..b7c9ce46701 100644 --- a/src/mongo/db/ops/delete_request.h +++ b/src/mongo/db/ops/delete_request.h @@ -115,12 +115,22 @@ public: return _yieldPolicy; } + void setStmtId(StmtId stmtId) { + _stmtId = std::move(stmtId); + } + + StmtId getStmtId() const { + return _stmtId; + } + private: const NamespaceString& _nsString; BSONObj _query; BSONObj _proj; BSONObj _sort; BSONObj _collation; + // The statement id of this request. + StmtId _stmtId = kUninitializedStmtId; bool _multi; bool _god; bool _fromMigrate; diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h index 4b821db9996..0e2e7e92210 100644 --- a/src/mongo/db/ops/update_request.h +++ b/src/mongo/db/ops/update_request.h @@ -30,6 +30,7 @@ #include "mongo/db/curop.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/explain.h" #include "mongo/util/mongoutils/str.h" @@ -193,6 +194,14 @@ public: return _yieldPolicy; } + inline void setStmtId(StmtId stmtId) { + _stmtId = std::move(stmtId); + } + + inline StmtId getStmtId() const { + return _stmtId; + } + const std::string toString() const { StringBuilder builder; builder << " query: " << _query; @@ -200,6 +209,7 @@ public: builder << " sort: " << _sort; builder << " collation: " << _collation; builder << " updates: " << _updates; + builder << " stmtId: " << _stmtId; builder << " arrayFilters: ["; bool first = true; @@ -241,6 +251,9 @@ private: // Filters to specify which array elements should be updated. std::vector<BSONObj> _arrayFilters; + // The statement id of this request. + StmtId _stmtId = kUninitializedStmtId; + // Flags controlling the update. // God bypasses _id checking and index generation. It is only used on behalf of system diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 34d10394657..f8548e2a18d 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -397,6 +397,12 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, return true; } +template <typename T> +StmtId getStmtIdForWriteOp(OperationContext* opCtx, const T& wholeOp, size_t opIndex) { + return opCtx->getTxnNumber() ? write_ops::getStmtIdForWriteAt(wholeOp, opIndex) + : kUninitializedStmtId; +} + } // namespace WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) { @@ -452,11 +458,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who // correct order. In an ordered insert, if one of the docs ahead of us fails, we should // behave as-if we never got to this document. } else { - const StmtId stmtId = opCtx->getTxnNumber() - ? write_ops::getStmtIdForWriteAt(wholeOp, stmtIdIndex++) - : kUninitializedStmtId; BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()); - batch.emplace_back(stmtId, toInsert); + batch.emplace_back(getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), toInsert); bytesInBatch += batch.back().doc.objsize(); if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes) continue; // Add more to batch before inserting. @@ -485,6 +488,7 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx, const NamespaceString& ns, + StmtId stmtId, const write_ops::UpdateOpEntry& op) { globalOpCounters.gotUpdate(); auto& curOp = *CurOp::get(opCtx); @@ -503,6 +507,7 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx, request.setQuery(op.getQ()); request.setUpdates(op.getU()); request.setCollation(write_ops::collationOf(op)); + request.setStmtId(stmtId); request.setArrayFilters(write_ops::arrayFiltersOf(op)); request.setMulti(op.getMulti()); request.setUpsert(op.getUpsert()); @@ -582,6 +587,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation()); LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace()); + size_t stmtIdIndex = 0; WriteResult out; out.results.reserve(wholeOp.getUpdates().size()); for (auto&& singleOp : wholeOp.getUpdates()) { @@ -598,7 +604,10 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who try { lastOpFixer.startingOp(); out.results.emplace_back( - performSingleUpdateOp(opCtx, wholeOp.getNamespace(), singleOp)); + performSingleUpdateOp(opCtx, + wholeOp.getNamespace(), + getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), + singleOp)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = @@ -613,6 +622,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, const NamespaceString& ns, + StmtId stmtId, const write_ops::DeleteOpEntry& op) { globalOpCounters.gotDelete(); auto& curOp = *CurOp::get(opCtx); @@ -632,6 +642,7 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, request.setCollation(write_ops::collationOf(op)); request.setMulti(op.getMulti()); request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedDelete overrides this for $isolated. + request.setStmtId(stmtId); ParsedDelete parsedDelete(opCtx, &request); uassertStatusOK(parsedDelete.parseRequest()); @@ -692,6 +703,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation()); LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace()); + size_t stmtIdIndex = 0; WriteResult out; out.results.reserve(wholeOp.getDeletes().size()); for (auto&& singleOp : wholeOp.getDeletes()) { @@ -708,7 +720,10 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who try { lastOpFixer.startingOp(); out.results.emplace_back( - performSingleDeleteOp(opCtx, wholeOp.getNamespace(), singleOp)); + performSingleDeleteOp(opCtx, + wholeOp.getNamespace(), + getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), + singleOp)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index bb2d6a6b68e..72ddfc90ab5 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -725,6 +725,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDelete( deleteStageParams.returnDeleted = request->shouldReturnDeleted(); deleteStageParams.sort = request->getSort(); deleteStageParams.opDebug = opDebug; + deleteStageParams.stmtId = request->getStmtId(); unique_ptr<WorkingSet> ws = make_unique<WorkingSet>(); const PlanExecutor::YieldPolicy policy = parsedDelete->yieldPolicy(); diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index 82a769c6c17..a8b432e7a7f 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -230,6 +230,7 @@ Status CollectionBulkLoaderImpl::commit() { _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &it] { WriteUnitOfWork wunit(_opCtx.get()); _autoColl->getCollection()->deleteDocument(_opCtx.get(), + kUninitializedStmtId, it, nullptr /** OpDebug **/, false /* fromMigrate */, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 68ee85a6364..e086d0fc9e6 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -414,7 +414,8 @@ OpTime logOp(OperationContext* opCtx, OptionalCollectionUUID uuid, const BSONObj& obj, const BSONObj* o2, - bool fromMigrate) { + bool fromMigrate, + StmtId statementId) { auto replCoord = ReplicationCoordinator::get(opCtx); if (replCoord->isOplogDisabledFor(opCtx, nss)) { return {}; @@ -427,7 +428,6 @@ OpTime logOp(OperationContext* opCtx, OplogSlot slot; getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot); - // TODO: SERVER-28912 Include statementId for other ops auto writer = _logOpWriter(opCtx, opstr, nss, @@ -438,7 +438,7 @@ OpTime logOp(OperationContext* opCtx, slot.opTime, slot.hash, Date_t::now(), - kUninitializedStmtId); + statementId); const DocWriter* basePtr = &writer; _logOpsInner(opCtx, nss, &basePtr, 1, oplog, replMode, slot.opTime); return slot.opTime; diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 1e4f6a443b9..6999e611d00 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -102,7 +102,8 @@ OpTime logOp(OperationContext* opCtx, OptionalCollectionUUID uuid, const BSONObj& obj, const BSONObj* o2, - bool fromMigrate); + bool fromMigrate, + StmtId stmtId); // Flush out the cached pointers to the local database and oplog. // Used by the closeDatabase command to ensure we don't cache closed things. diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index 5635a888b1d..db12df283e8 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -307,7 +307,7 @@ StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, if (saver) { saver->goingToDelete(obj).transitional_ignore(); } - collection->deleteDocument(opCtx, rloc, nullptr, true); + collection->deleteDocument(opCtx, kUninitializedStmtId, rloc, nullptr, true); wuow.commit(); }); } while (++numDeleted < maxToDelete); diff --git a/src/mongo/db/views/durable_view_catalog.cpp b/src/mongo/db/views/durable_view_catalog.cpp index 8e436995952..9426e941cf7 100644 --- a/src/mongo/db/views/durable_view_catalog.cpp +++ b/src/mongo/db/views/durable_view_catalog.cpp @@ -177,6 +177,6 @@ void DurableViewCatalogImpl::remove(OperationContext* opCtx, const NamespaceStri return; LOG(2) << "remove view " << name << " from " << _db->getSystemViewsName(); - systemViews->deleteDocument(opCtx, id, &CurOp::get(opCtx)->debug()); + systemViews->deleteDocument(opCtx, kUninitializedStmtId, id, &CurOp::get(opCtx)->debug()); } } // namespace mongo diff --git a/src/mongo/dbtests/multikey_paths_test.cpp b/src/mongo/dbtests/multikey_paths_test.cpp index 6eede918112..6caa614c495 100644 --- a/src/mongo/dbtests/multikey_paths_test.cpp +++ b/src/mongo/dbtests/multikey_paths_test.cpp @@ -349,7 +349,7 @@ TEST_F(MultikeyPathsTest, PathsNotUpdatedOnDocumentDelete) { { WriteUnitOfWork wuow(_opCtx.get()); OpDebug* const nullOpDebug = nullptr; - collection->deleteDocument(_opCtx.get(), record->id, nullOpDebug); + collection->deleteDocument(_opCtx.get(), kUninitializedStmtId, record->id, nullOpDebug); wuow.commit(); } } diff --git a/src/mongo/dbtests/query_stage_count.cpp b/src/mongo/dbtests/query_stage_count.cpp index 705fe19f3fc..82009b6d898 100644 --- a/src/mongo/dbtests/query_stage_count.cpp +++ b/src/mongo/dbtests/query_stage_count.cpp @@ -116,7 +116,7 @@ public: void remove(const RecordId& recordId) { WriteUnitOfWork wunit(&_opCtx); OpDebug* const nullOpDebug = nullptr; - _coll->deleteDocument(&_opCtx, recordId, nullOpDebug); + _coll->deleteDocument(&_opCtx, kUninitializedStmtId, recordId, nullOpDebug); wunit.commit(); } diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index 9ab3fb4fd32..bb9082c81fb 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -461,7 +461,7 @@ public: set<RecordId>::iterator it = recordIds.begin(); { WriteUnitOfWork wuow(&_opCtx); - coll->deleteDocument(&_opCtx, *it++, nullOpDebug); + coll->deleteDocument(&_opCtx, kUninitializedStmtId, *it++, nullOpDebug); wuow.commit(); } exec->restoreState(); @@ -477,7 +477,7 @@ public: while (it != recordIds.end()) { { WriteUnitOfWork wuow(&_opCtx); - coll->deleteDocument(&_opCtx, *it++, nullOpDebug); + coll->deleteDocument(&_opCtx, kUninitializedStmtId, *it++, nullOpDebug); wuow.commit(); } } diff --git a/src/mongo/s/catalog/sharding_catalog_config_initialization_test.cpp b/src/mongo/s/catalog/sharding_catalog_config_initialization_test.cpp index 3a35357e38e..40d0c0861e3 100644 --- a/src/mongo/s/catalog/sharding_catalog_config_initialization_test.cpp +++ b/src/mongo/s/catalog/sharding_catalog_config_initialization_test.cpp @@ -241,7 +241,7 @@ TEST_F(ConfigInitializationTest, ReRunsIfDocRolledBackThenReElected) { } mongo::WriteUnitOfWork wuow(opCtx); for (auto recordId : recordIds) { - coll->deleteDocument(opCtx, recordId, nullptr); + coll->deleteDocument(opCtx, kUninitializedStmtId, recordId, nullptr); } wuow.commit(); ASSERT_EQUALS(0UL, coll->numRecords(opCtx)); |