1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
// Confirms that change streams correctly handle prepared transactions with an empty applyOps entry.
// This test creats a multi-shard transaction in which one of the participating shards has only a
// no-op write, resulting in the empty applyOps scenario we wish to test. Exercises the fix for
// SERVER-50769.
// @tags: [
// requires_sharding,
// uses_change_streams,
// uses_multi_shard_transaction,
// uses_transactions,
// ]
(function() {
"use strict";
const dbName = "test";
const collName = "change_stream_empty_apply_ops";
const namespace = dbName + "." + collName;
const st = new ShardingTest({
shards: 2,
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
});
const mongosConn = st.s;
const db = mongosConn.getDB(dbName);
const coll = db.getCollection(collName);
assert.commandWorked(coll.createIndex({shard: 1}));
st.ensurePrimaryShard(dbName, st.shard0.shardName);
// Shard the test collection and split it into two chunks: one that contains all {shard: 1}
// documents and one that contains all {shard: 2} documents.
st.shardColl(collName,
{shard: 1} /* shard key */,
{shard: 2} /* split at */,
{shard: 2} /* move the chunk containing {shard: 2} to its own shard */,
dbName,
true);
// Seed each chunk with an initial document.
assert.commandWorked(coll.insert({shard: 1}, {writeConcern: {w: "majority"}}));
assert.commandWorked(coll.insert({shard: 2}, {writeConcern: {w: "majority"}}));
// Open up change streams.
const changeStreamCursorColl = coll.watch();
const changeStreamCursorDB = db.watch();
const changeStreamCursorCluster = mongosConn.watch();
// Start a transaction, which will include both shards.
const sesion = db.getMongo().startSession({causalConsistency: true});
const sessionDb = sesion.getDatabase(dbName);
const sessionColl = sessionDb[collName];
sesion.startTransaction({readConcern: {level: "majority"}});
// This no-op will make one of the shards a transaction participant without generating an actual
// write. The transaction will send an empty prepared transaction to the shard, in the form of an
// applyOps command with no operations.
sessionColl.findAndModify({query: {shard: 1}, update: {$setOnInsert: {a: 1}}});
// This write, which is not a no-op, occurs on the other shard.
sessionColl.findAndModify({query: {shard: 2}, update: {$set: {a: 1}}});
assert.commandWorked(sesion.commitTransaction_forTesting());
// Each change stream should see exactly one update, resulting from the valid write on shard 2.
[changeStreamCursorColl, changeStreamCursorDB, changeStreamCursorCluster].forEach(function(
changeStreamCursor) {
assert.soon(() => changeStreamCursor.hasNext());
const changeDoc = changeStreamCursor.next();
assert.eq(changeDoc.documentKey.shard, 2);
assert.eq(changeDoc.operationType, "update");
assert(!changeStreamCursor.hasNext());
});
st.stop();
})();
|