diff options
author | Randolph Tan <randolph@10gen.com> | 2017-09-28 10:45:49 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-09-28 10:45:49 -0400 |
commit | f24fbb0011c6ded9101f08574e7cd07e63690a9b (patch) | |
tree | e1452828e142748f1f03be61a00c32dbb3ed6bc1 /src/mongo/db/op_observer_impl.cpp | |
parent | d293f6857bcb36b26ca8fa03d90299714fe060de (diff) | |
download | mongo-f24fbb0011c6ded9101f08574e7cd07e63690a9b.tar.gz |
Revert "Revert "Revert "SERVER-30894 Implement command for transferring session information during migration"""
This reverts commit aaa0c96532ba6a8ea9146e4298c6bf1cc6b27f9e.
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 95 |
1 files changed, 32 insertions, 63 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index b466b70c3bb..9a6bd8e5be7 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -115,15 +115,10 @@ BSONObj makeCollModCmdObj(const BSONObj& collModCmd, return cmdObjBuilder.obj(); } -struct OpTimeBundle { - repl::OpTime writeOpTime; - repl::OpTime prePostImageOpTime; -}; - /** * Write oplog entry(ies) for the update operation. */ -OpTimeBundle replLogUpdate(OperationContext* opCtx, +repl::OpTime replLogUpdate(OperationContext* opCtx, Session* session, const OplogUpdateEntryArgs& args) { BSONObj storeObj; @@ -143,8 +138,6 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } - OpTimeBundle opTimes; - if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { auto noteUpdateOpTime = repl::logOp(opCtx, "n", @@ -157,8 +150,6 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, args.stmtId, {}); - opTimes.prePostImageOpTime = noteUpdateOpTime; - if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) { oplogLink.preImageTs = noteUpdateOpTime.getTimestamp(); } else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) { @@ -166,24 +157,22 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, } } - opTimes.writeOpTime = repl::logOp(opCtx, - "u", - args.nss, - args.uuid, - args.update, - &args.criteria, - args.fromMigrate, - sessionInfo, - args.stmtId, - oplogLink); - - return opTimes; + return repl::logOp(opCtx, + "u", + args.nss, + args.uuid, + args.update, + &args.criteria, + args.fromMigrate, + sessionInfo, + args.stmtId, + oplogLink); } /** * Write oplog entry(ies) for the delete operation. */ -OpTimeBundle replLogDelete(OperationContext* opCtx, +repl::OpTime replLogDelete(OperationContext* opCtx, const NamespaceString& nss, OptionalCollectionUUID uuid, Session* session, @@ -200,26 +189,22 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber()); } - OpTimeBundle opTimes; - if (deletedDoc && opCtx->getTxnNumber()) { auto noteOplog = repl::logOp( opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {}); - opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageTs = noteOplog.getTimestamp(); } - opTimes.writeOpTime = repl::logOp(opCtx, - "d", - nss, - uuid, - deleteState.documentKey, - nullptr, - fromMigrate, - sessionInfo, - stmtId, - oplogLink); - return opTimes; + return repl::logOp(opCtx, + "d", + nss, + uuid, + deleteState.documentKey, + nullptr, + fromMigrate, + sessionInfo, + stmtId, + oplogLink); } } // namespace @@ -268,7 +253,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, systemIndexes); if (!fromMigrate) { - css->onInsertOp(opCtx, indexDoc, {}); + css->onInsertOp(opCtx, indexDoc); } } @@ -279,20 +264,15 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator end, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; - - const size_t count = end - begin; - auto timestamps = stdx::make_unique<Timestamp[]>(count); - const auto lastOpTime = - repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate); + const auto lastOpTime = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate); auto css = CollectionShardingState::get(opCtx, nss.ns()); - size_t index = 0; - for (auto it = begin; it != end; it++, index++) { + for (auto it = begin; it != end; it++) { AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "i", nss, it->doc, nullptr); if (!fromMigrate) { - css->onInsertOp(opCtx, it->doc, timestamps[index]); + css->onInsertOp(opCtx, it->doc); } } @@ -332,12 +312,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg auto css = CollectionShardingState::get(opCtx, args.nss); if (!args.fromMigrate) { - css->onUpdateOp(opCtx, - args.criteria, - args.update, - args.updatedDoc, - opTime.writeOpTime.getTimestamp(), - opTime.prePostImageOpTime.getTimestamp()); + css->onUpdateOp(opCtx, args.criteria, args.update, args.updatedDoc); } if (args.nss.coll() == "system.js") { @@ -347,13 +322,11 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc); } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace && - !opTime.writeOpTime.isNull()) { + !opTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc); } - - onWriteOpCompleted( - opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime); + onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime); } auto OpObserverImpl::aboutToDelete(OperationContext* opCtx, @@ -383,10 +356,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, auto css = CollectionShardingState::get(opCtx, nss.ns()); if (!fromMigrate) { - css->onDeleteOp(opCtx, - deleteState, - opTime.writeOpTime.getTimestamp(), - opTime.prePostImageOpTime.getTimestamp()); + css->onDeleteOp(opCtx, deleteState); } if (nss.coll() == "system.js") { @@ -395,12 +365,11 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, DurableViewCatalog::onExternalChange(opCtx, nss); } else if (nss.ns() == FeatureCompatibilityVersion::kCollection) { FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey); - } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && - !opTime.writeOpTime.isNull()) { + } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) { SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey); } - onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime); + onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime); } void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx, |