summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-09-27 23:34:00 -0400
committerRandolph Tan <randolph@10gen.com>2017-09-27 23:34:00 -0400
commitaaa0c96532ba6a8ea9146e4298c6bf1cc6b27f9e (patch)
treefcdf67ad36b0b9392e1218a5defc31ea8bdd45e4 /src/mongo/db/op_observer_impl.cpp
parenteeee1e2b64f70e8487f017ba579f3ca861c81e4f (diff)
downloadmongo-aaa0c96532ba6a8ea9146e4298c6bf1cc6b27f9e.tar.gz
Revert "Revert "SERVER-30894 Implement command for transferring session information during migration""
This reverts commit 522f7f7d36a4a71059dd2d5219c2a0f074dfd0a1.
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp95
1 files changed, 63 insertions, 32 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 9a6bd8e5be7..b466b70c3bb 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -115,10 +115,15 @@ BSONObj makeCollModCmdObj(const BSONObj& collModCmd,
return cmdObjBuilder.obj();
}
+struct OpTimeBundle {
+ repl::OpTime writeOpTime;
+ repl::OpTime prePostImageOpTime;
+};
+
/**
* Write oplog entry(ies) for the update operation.
*/
-repl::OpTime replLogUpdate(OperationContext* opCtx,
+OpTimeBundle replLogUpdate(OperationContext* opCtx,
Session* session,
const OplogUpdateEntryArgs& args) {
BSONObj storeObj;
@@ -138,6 +143,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
+ OpTimeBundle opTimes;
+
if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
auto noteUpdateOpTime = repl::logOp(opCtx,
"n",
@@ -150,6 +157,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
args.stmtId,
{});
+ opTimes.prePostImageOpTime = noteUpdateOpTime;
+
if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
oplogLink.preImageTs = noteUpdateOpTime.getTimestamp();
} else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
@@ -157,22 +166,24 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
}
}
- return repl::logOp(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.update,
- &args.criteria,
- args.fromMigrate,
- sessionInfo,
- args.stmtId,
- oplogLink);
+ opTimes.writeOpTime = repl::logOp(opCtx,
+ "u",
+ args.nss,
+ args.uuid,
+ args.update,
+ &args.criteria,
+ args.fromMigrate,
+ sessionInfo,
+ args.stmtId,
+ oplogLink);
+
+ return opTimes;
}
/**
* Write oplog entry(ies) for the delete operation.
*/
-repl::OpTime replLogDelete(OperationContext* opCtx,
+OpTimeBundle replLogDelete(OperationContext* opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
Session* session,
@@ -189,22 +200,26 @@ repl::OpTime replLogDelete(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
+ OpTimeBundle opTimes;
+
if (deletedDoc && opCtx->getTxnNumber()) {
auto noteOplog = repl::logOp(
opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {});
+ opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageTs = noteOplog.getTimestamp();
}
- return repl::logOp(opCtx,
- "d",
- nss,
- uuid,
- deleteState.documentKey,
- nullptr,
- fromMigrate,
- sessionInfo,
- stmtId,
- oplogLink);
+ opTimes.writeOpTime = repl::logOp(opCtx,
+ "d",
+ nss,
+ uuid,
+ deleteState.documentKey,
+ nullptr,
+ fromMigrate,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+ return opTimes;
}
} // namespace
@@ -253,7 +268,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, systemIndexes);
if (!fromMigrate) {
- css->onInsertOp(opCtx, indexDoc);
+ css->onInsertOp(opCtx, indexDoc, {});
}
}
@@ -264,15 +279,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator end,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
- const auto lastOpTime = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate);
+
+ const size_t count = end - begin;
+ auto timestamps = stdx::make_unique<Timestamp[]>(count);
+ const auto lastOpTime =
+ repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate);
auto css = CollectionShardingState::get(opCtx, nss.ns());
- for (auto it = begin; it != end; it++) {
+ size_t index = 0;
+ for (auto it = begin; it != end; it++, index++) {
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (!fromMigrate) {
- css->onInsertOp(opCtx, it->doc);
+ css->onInsertOp(opCtx, it->doc, timestamps[index]);
}
}
@@ -312,7 +332,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
auto css = CollectionShardingState::get(opCtx, args.nss);
if (!args.fromMigrate) {
- css->onUpdateOp(opCtx, args.criteria, args.update, args.updatedDoc);
+ css->onUpdateOp(opCtx,
+ args.criteria,
+ args.update,
+ args.updatedDoc,
+ opTime.writeOpTime.getTimestamp(),
+ opTime.prePostImageOpTime.getTimestamp());
}
if (args.nss.coll() == "system.js") {
@@ -322,11 +347,13 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
} else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc);
} else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
- !opTime.isNull()) {
+ !opTime.writeOpTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
- onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime);
+
+ onWriteOpCompleted(
+ opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime);
}
auto OpObserverImpl::aboutToDelete(OperationContext* opCtx,
@@ -356,7 +383,10 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, nss.ns());
if (!fromMigrate) {
- css->onDeleteOp(opCtx, deleteState);
+ css->onDeleteOp(opCtx,
+ deleteState,
+ opTime.writeOpTime.getTimestamp(),
+ opTime.prePostImageOpTime.getTimestamp());
}
if (nss.coll() == "system.js") {
@@ -365,11 +395,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey);
- } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) {
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace &&
+ !opTime.writeOpTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
- onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime);
+ onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime);
}
void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,