diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 93 |
1 files changed, 54 insertions, 39 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 28b4548230b..13daba5a083 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -357,31 +357,47 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator first, std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { - Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) { + Session* const session = OperationContextSession::get(opCtx); + const bool inMultiDocumentTransaction = + session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction(); + + Date_t lastWriteDate; + + std::vector<repl::OpTime> opTimeList; + repl::OpTime lastOpTime; + + if (inMultiDocumentTransaction) { // Do not add writes to the profile collection to the list of transaction operations, since // these are done outside the transaction. if (!opCtx->getWriteUnitOfWork()) { invariant(nss.isSystemDotProfile()); return; } - for (auto iter = first; iter != last; iter++) { auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc); session->addTransactionOperation(opCtx, operation); } - return; - } + } else { + lastWriteDate = getWallClockTimeForOpLog(opCtx); - const auto lastWriteDate = getWallClockTimeForOpLog(opCtx); + opTimeList = + repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate); + if (!opTimeList.empty()) + lastOpTime = opTimeList.back(); - const auto opTimeList = - repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate); + auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; + using std::begin; + using std::end; + times.insert(end(times), begin(opTimeList), end(opTimeList)); - auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; - using std::begin; - using std::end; - times.insert(end(times), begin(opTimeList), end(opTimeList)); + std::vector<StmtId> stmtIdsWritten; + std::transform(first, + last, + std::back_inserter(stmtIdsWritten), + [](const InsertStatement& stmt) { return stmt.stmtId; }); + + onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate); + } auto css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate) ? nullptr @@ -397,7 +413,6 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, } } - const auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back(); if (nss.coll() == "system.js") { Scope::storedFuncMod(opCtx); } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) { @@ -413,14 +428,6 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc); } } - - std::vector<StmtId> stmtIdsWritten; - std::transform(first, - last, - std::back_inserter(stmtIdsWritten), - [](const InsertStatement& stmt) { return stmt.stmtId; }); - - onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate); } void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { @@ -443,14 +450,23 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg return; } - Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) { + Session* const session = OperationContextSession::get(opCtx); + const bool inMultiDocumentTransaction = + session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction(); + OpTimeBundle opTime; + if (inMultiDocumentTransaction) { auto operation = OplogEntry::makeUpdateOperation(args.nss, args.uuid, args.update, args.criteria); session->addTransactionOperation(opCtx, operation); - return; + } else { + opTime = replLogUpdate(opCtx, session, args); + onWriteOpCompleted(opCtx, + args.nss, + session, + std::vector<StmtId>{args.stmtId}, + opTime.writeOpTime, + opTime.wallClockTime); } - const auto opTime = replLogUpdate(opCtx, session, args); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "u", args.nss, args.update, &args.criteria); @@ -474,13 +490,6 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg !opTime.writeOpTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } - - onWriteOpCompleted(opCtx, - args.nss, - session, - std::vector<StmtId>{args.stmtId}, - opTime.writeOpTime, - opTime.wallClockTime); } void OpObserverImpl::aboutToDelete(OperationContext* opCtx, @@ -497,16 +506,25 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, StmtId stmtId, bool fromMigrate, const boost::optional<BSONObj>& deletedDoc) { - Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; + Session* const session = OperationContextSession::get(opCtx); auto& deleteState = getDeleteState(opCtx); invariant(!deleteState.documentKey.isEmpty()); - if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) { + const bool inMultiDocumentTransaction = + session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction(); + OpTimeBundle opTime; + if (inMultiDocumentTransaction) { auto operation = OplogEntry::makeDeleteOperation( nss, uuid, deletedDoc ? deletedDoc.get() : deleteState.documentKey); session->addTransactionOperation(opCtx, operation); - return; + } else { + opTime = replLogDelete(opCtx, nss, uuid, session, stmtId, fromMigrate, deletedDoc); + onWriteOpCompleted(opCtx, + nss, + session, + std::vector<StmtId>{stmtId}, + opTime.writeOpTime, + opTime.wallClockTime); } - const auto opTime = replLogDelete(opCtx, nss, uuid, session, stmtId, fromMigrate, deletedDoc); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "d", nss, deleteState.documentKey, nullptr); @@ -531,9 +549,6 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, !opTime.writeOpTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey); } - - onWriteOpCompleted( - opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime, opTime.wallClockTime); } void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, |