summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2018-04-12 16:00:36 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2018-04-13 16:06:48 -0400
commit9056c4d63c3beaae5a5444cb3ad37b61037ae561 (patch)
tree0b602409754fa54fb6be22f5390b617c74f4f2dc /src/mongo/db/op_observer_impl.cpp
parent63144b6bb427ea74f6faa14e8bbe5efba3687164 (diff)
downloadmongo-9056c4d63c3beaae5a5444cb3ad37b61037ae561.tar.gz
SERVER-33534 Re-enable non-replication opObserver functionality within transactions.
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp93
1 files changed, 54 insertions, 39 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 28b4548230b..13daba5a083 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -357,31 +357,47 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator first,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
- Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
- if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
+ Session* const session = OperationContextSession::get(opCtx);
+ const bool inMultiDocumentTransaction =
+ session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction();
+
+ Date_t lastWriteDate;
+
+ std::vector<repl::OpTime> opTimeList;
+ repl::OpTime lastOpTime;
+
+ if (inMultiDocumentTransaction) {
// Do not add writes to the profile collection to the list of transaction operations, since
// these are done outside the transaction.
if (!opCtx->getWriteUnitOfWork()) {
invariant(nss.isSystemDotProfile());
return;
}
-
for (auto iter = first; iter != last; iter++) {
auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc);
session->addTransactionOperation(opCtx, operation);
}
- return;
- }
+ } else {
+ lastWriteDate = getWallClockTimeForOpLog(opCtx);
- const auto lastWriteDate = getWallClockTimeForOpLog(opCtx);
+ opTimeList =
+ repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate);
+ if (!opTimeList.empty())
+ lastOpTime = opTimeList.back();
- const auto opTimeList =
- repl::logInsertOps(opCtx, nss, uuid, session, first, last, fromMigrate, lastWriteDate);
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ using std::begin;
+ using std::end;
+ times.insert(end(times), begin(opTimeList), end(opTimeList));
- auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
- using std::begin;
- using std::end;
- times.insert(end(times), begin(opTimeList), end(opTimeList));
+ std::vector<StmtId> stmtIdsWritten;
+ std::transform(first,
+ last,
+ std::back_inserter(stmtIdsWritten),
+ [](const InsertStatement& stmt) { return stmt.stmtId; });
+
+ onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
+ }
auto css = (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
? nullptr
@@ -397,7 +413,6 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
}
}
- const auto lastOpTime = opTimeList.empty() ? repl::OpTime() : opTimeList.back();
if (nss.coll() == "system.js") {
Scope::storedFuncMod(opCtx);
} else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
@@ -413,14 +428,6 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc);
}
}
-
- std::vector<StmtId> stmtIdsWritten;
- std::transform(first,
- last,
- std::back_inserter(stmtIdsWritten),
- [](const InsertStatement& stmt) { return stmt.stmtId; });
-
- onWriteOpCompleted(opCtx, nss, session, stmtIdsWritten, lastOpTime, lastWriteDate);
}
void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
@@ -443,14 +450,23 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
return;
}
- Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
- if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
+ Session* const session = OperationContextSession::get(opCtx);
+ const bool inMultiDocumentTransaction =
+ session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction();
+ OpTimeBundle opTime;
+ if (inMultiDocumentTransaction) {
auto operation =
OplogEntry::makeUpdateOperation(args.nss, args.uuid, args.update, args.criteria);
session->addTransactionOperation(opCtx, operation);
- return;
+ } else {
+ opTime = replLogUpdate(opCtx, session, args);
+ onWriteOpCompleted(opCtx,
+ args.nss,
+ session,
+ std::vector<StmtId>{args.stmtId},
+ opTime.writeOpTime,
+ opTime.wallClockTime);
}
- const auto opTime = replLogUpdate(opCtx, session, args);
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "u", args.nss, args.update, &args.criteria);
@@ -474,13 +490,6 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
!opTime.writeOpTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
-
- onWriteOpCompleted(opCtx,
- args.nss,
- session,
- std::vector<StmtId>{args.stmtId},
- opTime.writeOpTime,
- opTime.wallClockTime);
}
void OpObserverImpl::aboutToDelete(OperationContext* opCtx,
@@ -497,16 +506,25 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
StmtId stmtId,
bool fromMigrate,
const boost::optional<BSONObj>& deletedDoc) {
- Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
+ Session* const session = OperationContextSession::get(opCtx);
auto& deleteState = getDeleteState(opCtx);
invariant(!deleteState.documentKey.isEmpty());
- if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
+ const bool inMultiDocumentTransaction =
+ session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction();
+ OpTimeBundle opTime;
+ if (inMultiDocumentTransaction) {
auto operation = OplogEntry::makeDeleteOperation(
nss, uuid, deletedDoc ? deletedDoc.get() : deleteState.documentKey);
session->addTransactionOperation(opCtx, operation);
- return;
+ } else {
+ opTime = replLogDelete(opCtx, nss, uuid, session, stmtId, fromMigrate, deletedDoc);
+ onWriteOpCompleted(opCtx,
+ nss,
+ session,
+ std::vector<StmtId>{stmtId},
+ opTime.writeOpTime,
+ opTime.wallClockTime);
}
- const auto opTime = replLogDelete(opCtx, nss, uuid, session, stmtId, fromMigrate, deletedDoc);
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "d", nss, deleteState.documentKey, nullptr);
@@ -531,9 +549,6 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
!opTime.writeOpTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
-
- onWriteOpCompleted(
- opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime, opTime.wallClockTime);
}
void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,