1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
/**
* Be sure that an exchange won't deadlock when one of the consumer's buffers is full. Iterates two
* consumers on an Exchange with a very small buffer. This test was designed to reproduce
* SERVER-37499.
* @tags: [
* requires_sharding,
* uses_transactions,
* ]
*/
(function() {
    // This test manually simulates a session, which is not compatible with implicit sessions.
    TestData.disableImplicitSessions = true;

    // Start a sharded cluster. For this test, we'll just need to talk to the shard directly.
    const st = new ShardingTest({shards: 1, mongos: 1});
    const session = st.shard0.getDB("test").getMongo().startSession();
    const shardDB = session.getDatabase("test");
    const coll = shardDB.exchange_in_session;

    // Payload stored in every document.
    const bigString = 's'.repeat(20);

    // Insert some documents.
    const nDocs = 50;
    for (let i = 0; i < nDocs; i++) {
        assert.commandWorked(coll.insert({_id: i, bigString: bigString}));
    }

    session.startTransaction();

    // Set up an Exchange with two cursors. 'batchSize: 0' is requested for the initial reply, so
    // all iteration happens through the getMore commands issued by the parallel shells below.
    // NOTE(review): the boundaries are spelled with field 'a' while the exchange key is '_id';
    // this is preserved as-is -- confirm whether boundary field names matter to the server.
    const res = assert.commandWorked(shardDB.runCommand({
        aggregate: coll.getName(),
        pipeline: [],
        exchange: {
            policy: 'keyRange',
            consumers: NumberInt(2),
            key: {_id: 1},
            boundaries: [{a: MinKey}, {a: nDocs / 2}, {a: MaxKey}],
            consumerIds: [NumberInt(0), NumberInt(1)],
            bufferSize: NumberInt(128)
        },
        cursor: {batchSize: 0},
    }));

    /**
     * Starts a parallel shell against the shard's primary that exhausts one consumer cursor with
     * getMore commands sent under this test's session id, with txnNumber 0 and autocommit false.
     *
     * The nested function below is never called in this shell. It is serialized with toString()
     * and appended to the generated script, so the names 'cursor', 'sessionId', 'collName' and
     * 'db' it uses are resolved in the parallel shell: the first three are the constants
     * prepended to the script here, not variables of this closure.
     *
     * The loop condition deliberately uses loose inequality ('!= 0'). Presumably the cursor id
     * comes back as a NumberLong wrapper object, for which a strict comparison against the
     * number 0 would never be equal -- confirm before tightening it.
     *
     * @param {Object} cursorId - The 'cursor' sub-document of one entry of the aggregate reply's
     *     'cursors' array; its 'id' field is what gets passed to getMore.
     * @returns {*} Whatever startParallelShell returns; it is invoked later to wait for the shell.
     */
    function spawnShellToIterateCursor(cursorId) {
        let code = `const cursor = ${tojson(cursorId)};`;
        code += `const sessionId = ${tojson(session.getSessionId())};`;
        code += `const collName = "${coll.getName()}";`;
        function iterateCursorWithNoDocs() {
            const getMoreCmd = {
                getMore: cursor.id,
                collection: collName,
                batchSize: 4,
                lsid: sessionId,
                txnNumber: NumberLong(0),
                autocommit: false
            };
            let resp = null;
            while (!resp || resp.cursor.id != 0) {
                resp = assert.commandWorked(db.runCommand(getMoreCmd));
            }
        }
        code += `(${iterateCursorWithNoDocs.toString()})();`;
        return startParallelShell(code, st.rs0.getPrimary().port);
    }

    // Launch one parallel shell per consumer cursor so both consumers are iterated concurrently.
    const parallelShells = res.cursors.map((curs) => spawnShellToIterateCursor(curs.cursor));

    // Wait for every parallel shell. This used to be wrapped in assert.soon(), but the callback
    // returned true unconditionally after a single pass, so the wrapper provided neither a retry
    // nor a timeout; calling the wait functions directly is equivalent and less misleading.
    for (const waitFn of parallelShells) {
        waitFn();
    }

    assert.commandWorked(session.abortTransaction_forTesting());

    st.stop();
})();
|