diff options
author | Randolph Tan <randolph@10gen.com> | 2017-09-27 23:34:00 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-09-27 23:34:00 -0400 |
commit | aaa0c96532ba6a8ea9146e4298c6bf1cc6b27f9e (patch) | |
tree | fcdf67ad36b0b9392e1218a5defc31ea8bdd45e4 /src/mongo/db/op_observer_impl.cpp | |
parent | eeee1e2b64f70e8487f017ba579f3ca861c81e4f (diff) | |
download | mongo-aaa0c96532ba6a8ea9146e4298c6bf1cc6b27f9e.tar.gz |
Revert "Revert "SERVER-30894 Implement command for transferring session information during migration""
This reverts commit 522f7f7d36a4a71059dd2d5219c2a0f074dfd0a1.
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 95 |
1 files changed, 63 insertions, 32 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 9a6bd8e5be7..b466b70c3bb 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -115,10 +115,15 @@ BSONObj makeCollModCmdObj(const BSONObj& collModCmd, return cmdObjBuilder.obj(); } +struct OpTimeBundle { + repl::OpTime writeOpTime; + repl::OpTime prePostImageOpTime; +}; + /** * Write oplog entry(ies) for the update operation. */ -repl::OpTime replLogUpdate(OperationContext* opCtx, +OpTimeBundle replLogUpdate(OperationContext* opCtx, Session* session, const OplogUpdateEntryArgs& args) { BSONObj storeObj; @@ -138,6 +143,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } + OpTimeBundle opTimes; + if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { auto noteUpdateOpTime = repl::logOp(opCtx, "n", @@ -150,6 +157,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx, args.stmtId, {}); + opTimes.prePostImageOpTime = noteUpdateOpTime; + if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { oplogLink.preImageTs = noteUpdateOpTime.getTimestamp(); } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) { @@ -157,22 +166,24 @@ repl::OpTime replLogUpdate(OperationContext* opCtx, } } - return repl::logOp(opCtx, - "u", - args.nss, - args.uuid, - args.update, - &args.criteria, - args.fromMigrate, - sessionInfo, - args.stmtId, - oplogLink); + opTimes.writeOpTime = repl::logOp(opCtx, + "u", + args.nss, + args.uuid, + args.update, + &args.criteria, + args.fromMigrate, + sessionInfo, + args.stmtId, + oplogLink); + + return opTimes; } /** * Write oplog entry(ies) for the delete operation. */ -repl::OpTime replLogDelete(OperationContext* opCtx, +OpTimeBundle replLogDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, Session* session, @@ -189,22 +200,26 @@ repl::OpTime replLogDelete(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } + OpTimeBundle opTimes; + if (deletedDoc && opCtx->getTxnNumber()) { auto noteOplog = repl::logOp( opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {}); + opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageTs = noteOplog.getTimestamp(); } - return repl::logOp(opCtx, - "d", - nss, - uuid, - deleteState.documentKey, - nullptr, - fromMigrate, - sessionInfo, - stmtId, - oplogLink); + opTimes.writeOpTime = repl::logOp(opCtx, + "d", + nss, + uuid, + deleteState.documentKey, + nullptr, + fromMigrate, + sessionInfo, + stmtId, + oplogLink); + return opTimes; } } // namespace @@ -253,7 +268,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, systemIndexes); if (!fromMigrate) { - css->onInsertOp(opCtx, indexDoc); + css->onInsertOp(opCtx, indexDoc, {}); } } @@ -264,15 +279,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator end, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - const auto lastOpTime = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate); + + const size_t count = end - begin; + auto timestamps = stdx::make_unique<Timestamp[]>(count); + const auto lastOpTime = + repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate); auto css = CollectionShardingState::get(opCtx, nss.ns()); - for (auto it = begin; it != end; it++) { + size_t index = 0; + for (auto it = begin; it != end; it++, index++) { AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", nss, it->doc, nullptr); if (!fromMigrate) { - css->onInsertOp(opCtx, it->doc); + css->onInsertOp(opCtx, it->doc, timestamps[index]); } } @@ -312,7 +332,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg auto css = CollectionShardingState::get(opCtx, args.nss); if (!args.fromMigrate) { - css->onUpdateOp(opCtx, args.criteria, args.update, args.updatedDoc); + css->onUpdateOp(opCtx, + args.criteria, + args.update, + args.updatedDoc, + opTime.writeOpTime.getTimestamp(), + opTime.prePostImageOpTime.getTimestamp()); } if (args.nss.coll() == "system.js") { @@ -322,11 +347,13 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc); } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace && - !opTime.isNull()) { + !opTime.writeOpTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } - onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime); + + onWriteOpCompleted( + opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime); } auto OpObserverImpl::aboutToDelete(OperationContext* opCtx, @@ -356,7 +383,10 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, nss.ns()); if (!fromMigrate) { - css->onDeleteOp(opCtx, deleteState); + css->onDeleteOp(opCtx, + deleteState, + opTime.writeOpTime.getTimestamp(), + opTime.prePostImageOpTime.getTimestamp()); } if (nss.coll() == "system.js") { @@ -365,11 +395,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, DurableViewCatalog::onExternalChange(opCtx, nss); } else if (nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey); - } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) { + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && + !opTime.writeOpTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey); } - onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime); + onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime); } void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, |