diff options
author | Randolph Tan <randolph@10gen.com> | 2017-07-11 16:36:13 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-07-21 12:53:45 -0400 |
commit | 769ce1808686e408bf41844c106ab5ce289339ee (patch) | |
tree | 160ddbc4d179109221d1c74ee6c7739be02f54e3 /src/mongo/db | |
parent | 056e90b4fb8d6d50c011cbeb655bed3fdc0d9b0f (diff) | |
download | mongo-769ce1808686e408bf41844c106ab5ce289339ee.tar.gz |
SERVER-28912 Include session id, txnNumber and stmtId to oplog for insert
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands.h | 2 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 33 |
3 files changed, 39 insertions, 6 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 5eb48f794f9..3f2346fe926 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -477,6 +477,8 @@ public: arg == "shardVersion" || // arg == "tracking_info" || // arg == "writeConcern" || // + arg == "lsid" || // + arg == "txnNumber" || // false; // These comments tell clang-format to keep this line-oriented. } diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 7f7ed45c080..34d10394657 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -53,6 +53,7 @@ #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/update_request.h" #include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_knobs.h" @@ -437,6 +438,7 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who WriteResult out; out.results.reserve(wholeOp.getDocuments().size()); + size_t stmtIdIndex = 0; size_t bytesInBatch = 0; std::vector<InsertStatement> batch; const size_t maxBatchSize = internalInsertMaxBatchSize.load(); @@ -450,9 +452,11 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who // correct order. In an ordered insert, if one of the docs ahead of us fails, we should // behave as-if we never got to this document. } else { - // TODO: SERVER-28912 get StmtId from request - batch.emplace_back(fixedDoc.getValue().isEmpty() ? doc - : std::move(fixedDoc.getValue())); + const StmtId stmtId = opCtx->getTxnNumber() + ? write_ops::getStmtIdForWriteAt(wholeOp, stmtIdIndex++) + : kUninitializedStmtId; + BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()); + batch.emplace_back(stmtId, toInsert); bytesInBatch += batch.back().doc.objsize(); if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes) continue; // Add more to batch before inserting. diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index c8f2389f4cd..68ee85a6364 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -245,6 +245,35 @@ Collection* getLocalOplogCollection(OperationContext* opCtx, return _localOplogCollection; } +/** + * Attaches the session information of a write to an oplog entry if it exists. + */ +void appendSessionInfo(OperationContext* opCtx, BSONObjBuilder* builder, StmtId statementId) { + auto txnNum = opCtx->getTxnNumber(); + + if (!txnNum) { + return; + } + + auto logicalSessionId = opCtx->getLogicalSessionId(); + invariant(logicalSessionId); + + // Note: certain operations, like implicit collection creation will not have a stmtId. + if (statementId == kUninitializedStmtId) { + return; + } + + Logical_session_id lsid; + lsid.setId(logicalSessionId->getId()); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(lsid); + sessionInfo.setTxnNumber(txnNum); + sessionInfo.serialize(builder); + + builder->append(OplogEntryBase::kStatementIdFieldName, statementId); +} + OplogDocWriter _logOpWriter(OperationContext* opCtx, const char* opstr, const NamespaceString& nss, @@ -280,9 +309,7 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx, b.appendDate("wall", wallTime); } - if (statementId != kUninitializedStmtId) { - // TODO: SERVER-28912 append stmtId to oplog entry - } + appendSessionInfo(opCtx, &b, statementId); return OplogDocWriter(OplogDocWriter(b.obj(), obj)); } |