summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-07-11 16:36:13 -0400
committerRandolph Tan <randolph@10gen.com>2017-07-21 12:53:45 -0400
commit769ce1808686e408bf41844c106ab5ce289339ee (patch)
tree160ddbc4d179109221d1c74ee6c7739be02f54e3 /src/mongo/db
parent056e90b4fb8d6d50c011cbeb655bed3fdc0d9b0f (diff)
downloadmongo-769ce1808686e408bf41844c106ab5ce289339ee.tar.gz
SERVER-28912 Include session id, txnNumber and stmtId to oplog for insert
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands.h2
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp10
-rw-r--r--src/mongo/db/repl/oplog.cpp33
3 files changed, 39 insertions, 6 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index 5eb48f794f9..3f2346fe926 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -477,6 +477,8 @@ public:
arg == "shardVersion" || //
arg == "tracking_info" || //
arg == "writeConcern" || //
+ arg == "lsid" || //
+ arg == "txnNumber" || //
false; // These comments tell clang-format to keep this line-oriented.
}
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 7f7ed45c080..34d10394657 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_knobs.h"
@@ -437,6 +438,7 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
WriteResult out;
out.results.reserve(wholeOp.getDocuments().size());
+ size_t stmtIdIndex = 0;
size_t bytesInBatch = 0;
std::vector<InsertStatement> batch;
const size_t maxBatchSize = internalInsertMaxBatchSize.load();
@@ -450,9 +452,11 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
// correct order. In an ordered insert, if one of the docs ahead of us fails, we should
// behave as-if we never got to this document.
} else {
- // TODO: SERVER-28912 get StmtId from request
- batch.emplace_back(fixedDoc.getValue().isEmpty() ? doc
- : std::move(fixedDoc.getValue()));
+ const StmtId stmtId = opCtx->getTxnNumber()
+ ? write_ops::getStmtIdForWriteAt(wholeOp, stmtIdIndex++)
+ : kUninitializedStmtId;
+ BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());
+ batch.emplace_back(stmtId, toInsert);
bytesInBatch += batch.back().doc.objsize();
if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes)
continue; // Add more to batch before inserting.
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index c8f2389f4cd..68ee85a6364 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -245,6 +245,35 @@ Collection* getLocalOplogCollection(OperationContext* opCtx,
return _localOplogCollection;
}
+/**
+ * Attaches the session information of a write to an oplog entry if it exists.
+ */
+void appendSessionInfo(OperationContext* opCtx, BSONObjBuilder* builder, StmtId statementId) {
+ auto txnNum = opCtx->getTxnNumber();
+
+ if (!txnNum) {
+ return;
+ }
+
+ auto logicalSessionId = opCtx->getLogicalSessionId();
+ invariant(logicalSessionId);
+
+ // Note: certain operations, like implicit collection creation will not have a stmtId.
+ if (statementId == kUninitializedStmtId) {
+ return;
+ }
+
+ Logical_session_id lsid;
+ lsid.setId(logicalSessionId->getId());
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(lsid);
+ sessionInfo.setTxnNumber(txnNum);
+ sessionInfo.serialize(builder);
+
+ builder->append(OplogEntryBase::kStatementIdFieldName, statementId);
+}
+
OplogDocWriter _logOpWriter(OperationContext* opCtx,
const char* opstr,
const NamespaceString& nss,
@@ -280,9 +309,7 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
b.appendDate("wall", wallTime);
}
- if (statementId != kUninitializedStmtId) {
- // TODO: SERVER-28912 append stmtId to oplog entry
- }
+ appendSessionInfo(opCtx, &b, statementId);
return OplogDocWriter(OplogDocWriter(b.obj(), obj));
}