1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
|
/**
* Tests that coordinateCommitTransaction returns the decision once the decision has been written
* with the client's writeConcern.
* @tags: [uses_transactions, uses_multi_shard_transaction]
*/
(function() {
'use strict';
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/write_concern_util.js");
load("jstests/sharding/libs/sharded_transactions_helpers.js");
const st = new ShardingTest({
mongos: 1,
shards: 2,
rs: {
// Set priority of secondaries to 0 so that the primary does not change during each
// testcase.
nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
// Disallow chaining to force both secondaries to sync from the primary. The testcase for
// writeConcern "majority" disables replication on one of the secondaries, with chaining
// that would effectively disable replication on both secondaries, causing the testcase to
// to fail since writeConcern is unsatsifiable.
settings: {chainingAllowed: false}
},
causallyConsistent: true
});
enableCoordinateCommitReturnImmediatelyAfterPersistingDecision(st);
const kDbName = jsTest.name();
const kCollName = "test";
const kNs = kDbName + "." + kCollName;
const lsid = {
id: UUID()
};
let txnNumber = 0;
assert.commandWorked(st.s.adminCommand({enableSharding: kDbName}));
// The default WC is majority and stopServerReplication will prevent satisfying any majority writes.
assert.commandWorked(st.s.adminCommand(
{setDefaultRWConcern: 1, defaultWriteConcern: {w: 1}, writeConcern: {w: "majority"}}));
st.ensurePrimaryShard(kDbName, st.shard0.shardName);
assert.commandWorked(st.s.adminCommand({shardCollection: kNs, key: {x: 1}}));
// Make both shards have chunks for the collection so that two-phase commit is required.
assert.commandWorked(st.s.adminCommand({split: kNs, middle: {x: 0}}));
assert.commandWorked(st.s.adminCommand({moveChunk: kNs, find: {x: 0}, to: st.shard1.shardName}));
// Do an insert to force a refresh so the transaction doesn't fail due to StaleConfig.
assert.commandWorked(st.s.getCollection(kNs).insert({x: 0}));
/*
* Runs commitTransaction on the mongos in a parallel shell, and asserts that it works.
*/
function runCommitThroughMongosInParallelShellExpectSuccess(writeConcern) {
const runCommitExpectSuccessCode = "assert.commandWorked(db.adminCommand({" +
"commitTransaction: 1," +
"lsid: " + tojson(lsid) + "," +
"txnNumber: NumberLong(" + txnNumber + ")," +
"stmtId: NumberInt(0)," +
"autocommit: false," +
"writeConcern: " + tojson(writeConcern) + "}));";
return startParallelShell(runCommitExpectSuccessCode, st.s.port);
}
/*
* Runs a transaction to inserts the given docs.
*/
function runInsertCmdInTxn(docs) {
assert.commandWorked(st.s.getDB(kDbName).runCommand({
insert: kCollName,
documents: docs,
lsid: lsid,
txnNumber: NumberLong(txnNumber),
stmtId: NumberInt(0),
startTransaction: true,
autocommit: false,
}));
}
/*
* Returns the 'decision' inside the coordinator doc with the given 'lsid' and 'txnNumber'
* on this connection. Returns null if the coordinator doc does not exist or does not have
* the 'decision' field.
*/
function getDecision(nodeConn, lsid, txnNumber) {
const coordDoc = nodeConn.getCollection("config.transaction_coordinators")
.findOne({"_id.lsid.id": lsid.id, "_id.txnNumber": txnNumber});
return coordDoc ? coordDoc.decision : null;
}
/*
* Returns true if the given 'decision' represents a commit decision.
*/
function isCommitDecision(decision) {
return decision.decision === "commit" && decision.commitTimestamp !== null;
}
/*
* Returns the number of coordinator replica set nodes that have written the commit decision
* to the config.transactions collection.
*/
function getNumNodesWithCommitDecision(coordinatorRs) {
const decision = getDecision(st.rs0.getPrimary(), lsid, txnNumber);
assert(isCommitDecision(decision));
let numNodes = 1;
for (const node of st.rs0.getSecondaries()) {
const secDecision = getDecision(node, lsid, txnNumber);
if (secDecision) {
assert.eq(0, bsonWoCompare(secDecision, decision));
numNodes++;
}
}
return numNodes;
}
/*
* Asserts that the coordinator doc has been replicated to the given number of nodes.
*/
function assertDecisionCommittedOnNodes(coordinatorRs, numNodes) {
assert.eq(getNumNodesWithCommitDecision(coordinatorRs), numNodes);
}
/*
* Asserts that the coordinator doc has been majority replicated.
*/
function assertDecisionMajorityCommitted(coordinatorRs, numNodes) {
assert.gte(getNumNodesWithCommitDecision(coordinatorRs), coordinatorRs.nodes.length / 2);
}
/*
* Returns an array of nodes that we can stop replication on and still allow writes on
* the replica set to satsify the given write concern.
*/
function getNodesToStopReplication(rs, writeConcern) {
if (writeConcern.w == "majority") {
return rs.getSecondaries().slice(0, rs.nodes.length / 2);
}
return rs.getSecondaries().slice(0, rs.nodes.length - writeConcern.w);
}
function testCommitDecisionWriteConcern(writeConcern) {
jsTest.log(`Testing commitTransaction with writeConcern ${tojson(writeConcern)}`);
// Start a transaction that inserts documents.
const x = txnNumber + 1;
const docs = [{x: -x}, {x: x}];
runInsertCmdInTxn(docs);
// Turn on the failpoint to pause coordinateCommit right before the coordinator persists
// the decision so we can disable replication on the nodes that are not needed for satifying
// the write concern.
let persistDecisionFailPoint = configureFailPoint(st.shard0, "hangBeforeWritingDecision");
const nodesToStopReplication = getNodesToStopReplication(st.rs0, writeConcern);
// Turn on the failpoint to pause coordinateCommit right before the coordinator deletes
// its coordinator doc so we can check the doc has been majority committed before it gets
// deleted.
let deleteCoordDocFailPoint = configureFailPoint(st.shard0, "hangBeforeDeletingCoordinatorDoc");
// Run commitTransaction with the given writeConcern. Disable replication on necessary nodes
// right before it persists the decision.
let awaitResult = runCommitThroughMongosInParallelShellExpectSuccess(writeConcern);
persistDecisionFailPoint.wait();
if (nodesToStopReplication.length > 0) {
stopServerReplication(nodesToStopReplication);
}
persistDecisionFailPoint.off();
jsTest.log(
`Verify that commitTransaction returns once the decision is written with client's writeConcern ${
tojson(writeConcern)}`);
awaitResult();
assertDecisionCommittedOnNodes(st.rs0, st.rs0.nodes.length - nodesToStopReplication.length);
jsTest.log(
"Verify that the coordinator doc is majority committed regardless of the client's writeConcern");
// Re-enable replication to allow the decision to be majority committed and two-phase
// commit to finish.
if (nodesToStopReplication.length > 0) {
restartServerReplication(nodesToStopReplication);
}
deleteCoordDocFailPoint.wait();
assertDecisionMajorityCommitted(st.rs0);
deleteCoordDocFailPoint.off();
jsTest.log("Verify the insert operation was committed successfully");
let res = assert.commandWorked(
st.s.getDB(kDbName).runCommand({find: kCollName, filter: {$or: docs}, lsid: lsid}));
assert.eq(2, res.cursor.firstBatch.length);
txnNumber++;
}
testCommitDecisionWriteConcern({w: 1});
testCommitDecisionWriteConcern({w: "majority"});
testCommitDecisionWriteConcern({w: 3});
st.stop();
})();
|