1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
|
/**
 * Wrapper around a mongobridge process. Construction of a MongoBridge instance will start a new
 * mongobridge process that listens on 'options.port' and forwards messages to 'options.dest'.
 *
 * Any key of 'options' other than 'dest', 'hostName' and 'port' is forwarded to the mongobridge
 * process as a '--key value' command line argument (or as a bare '--key' when the value is '').
 *
 * @param {Object} options
 * @param {string} options.dest - The host:port to forward messages to.
 * @param {string} [options.hostName=localhost] - The hostname to specify when connecting to the
 * mongobridge process.
 * @param {number} [options.port=allocatePort()] - The port number the mongobridge should listen on.
 *
 * @returns {Proxy} Acts as a typical connection object to options.hostName:options.port that has
 * additional functions exposed to shape network traffic from other processes.
 *
 * @throws {Error} If 'options.dest' is missing, or if an extra option has a null or undefined
 * value.
 */
function MongoBridge(options) {
    'use strict';

    // Allow MongoBridge(...) to be called without 'new'.
    if (!(this instanceof MongoBridge)) {
        return new MongoBridge(options);
    }

    options = options || {};

    // Go through Object.prototype so that option objects without a prototype chain (or with a
    // shadowed 'hasOwnProperty' key) are still handled.
    if (!Object.prototype.hasOwnProperty.call(options, 'dest')) {
        throw new Error('Missing required field "dest"');
    }

    var hostName = options.hostName || 'localhost';

    this.dest = options.dest;
    this.port = options.port || allocatePort();

    // The connection used by a test for running commands against the mongod or mongos process.
    // Remains undefined until connectToBridge() succeeds.
    var userConn;

    // copy the enableTestCommands field from TestData since this can be
    // changed in the middle of a test. This is the same value that
    // will ultimately be used by MongoRunner, and determines if
    // *From commands can be used during the lifetime of the MongoBridge
    // instance.
    this._testCommandsEnabledAtInit = jsTest.options().enableTestCommands;

    // A separate (hidden) connection for configuring the mongobridge process.
    // Remains undefined until connectToBridge() succeeds.
    var controlConn;

    // Start the mongobridge on port 'this.port' routing network traffic to 'this.dest'.
    var args = ['mongobridge', '--port', this.port, '--dest', this.dest];

    // Options consumed by this wrapper itself rather than forwarded on the command line.
    var keysToSkip = [
        'dest',
        'hostName',
        'port',
    ];

    // Append any command line arguments that are optional for mongobridge.
    Object.keys(options).forEach(function(key) {
        if (Array.contains(keysToSkip, key)) {
            return;
        }

        var value = options[key];
        if (value === null || value === undefined) {
            throw new Error("Value '" + value + "' for '" + key + "' option is ambiguous; specify" +
                            " {flag: ''} to add --flag command line options");
        }

        args.push('--' + key);
        // An empty string denotes a flag that takes no argument.
        if (value !== '') {
            args.push(value.toString());
        }
    });

    var pid = _startMongoProgram.apply(null, args);

    /**
     * Initializes the mongo shell's connections to the mongobridge process. Throws an error if the
     * mongobridge process stopped running or if a connection cannot be made.
     *
     * The mongod or mongos process corresponding to this mongobridge process may need to connect to
     * itself through the mongobridge process, e.g. when running the _isSelf command. This means
     * the mongobridge process needs to be running prior to the other process. However, to avoid
     * spurious failures during situations where the mongod or mongos process is not ready to accept
     * connections, connections to the mongobridge process should only be made after the other
     * process is known to be reachable:
     *
     *     var bridge = new MongoBridge(...);
     *     var conn = MongoRunner.runMongoXX(...);
     *     assert.neq(null, conn);
     *     bridge.connectToBridge();
     */
    this.connectToBridge = function connectToBridge() {
        var failedToStart = false;

        assert.soon(() => {
            // Stop retrying as soon as the process is known to be dead; the failure is reported
            // by the assertion that follows.
            if (!checkProgram(pid).alive) {
                failedToStart = true;
                return true;
            }

            try {
                userConn = new Mongo(hostName + ':' + this.port);
            } catch (e) {
                // Deliberately ignored: returning false makes assert.soon() try again.
                return false;
            }
            return true;
        }, 'failed to connect to the mongobridge on port ' + this.port);
        assert(!failedToStart, 'mongobridge failed to start on port ' + this.port);

        // The MongoRunner.runMongoXX() functions define a 'name' property on the returned
        // connection object that is equivalent to its 'host' property. Certain functions in
        // ReplSetTest and ShardingTest use the 'name' property instead of the 'host' property, so
        // we define it here for consistency.
        Object.defineProperty(userConn, 'name', {
            enumerable: true,
            get: function() {
                return this.host;
            },
        });

        assert.soonNoExcept(() => {
            controlConn = new Mongo(hostName + ':' + this.port);
            return true;
        }, 'failed to make control connection to the mongobridge on port ' + this.port);
    };

    /**
     * Terminates the mongobridge process.
     *
     * @returns The value returned by _stopMongoProgram() for 'this.port'.
     */
    this.stop = function stop() {
        return _stopMongoProgram(this.port);
    };

    // Throws an error if 'obj' is not a MongoBridge instance.
    function throwErrorIfNotMongoBridgeInstance(obj) {
        if (!(obj instanceof MongoBridge)) {
            throw new Error('Expected MongoBridge instance, but got ' + tojson(obj));
        }
    }

    // Returns 'bridges' as an array (wrapping a single value if necessary) after verifying that
    // every element is a MongoBridge instance.
    function toBridgeArray(bridges) {
        var bridgeList = Array.isArray(bridges) ? bridges : [bridges];
        bridgeList.forEach(throwErrorIfNotMongoBridgeInstance);
        return bridgeList;
    }

    // Returns the user connection, or throws a descriptive error when connectToBridge() has not
    // yet established it. 'property' is only used to build the error message.
    function getUserConnOrThrow(property) {
        if (userConn === undefined) {
            // String() is required because 'property' may be a Symbol.
            throw new Error("MongoBridge is not connected to the mongobridge process; call" +
                            " connectToBridge() before accessing property '" + String(property) +
                            "'");
        }
        return userConn;
    }

    // Runs a command intended to configure the mongobridge.
    function runBridgeCommand(conn, cmdName, cmdArgs) {
        // Without this guard a missing control connection surfaces as an opaque
        // "undefined has no properties" TypeError.
        if (!conn) {
            throw new Error("Cannot run '" + cmdName + "' because there is no control connection" +
                            " to the mongobridge; call connectToBridge() first");
        }

        // The wire version of this mongobridge is detected as the wire version of the corresponding
        // mongod or mongos process because the message is simply forwarded to that process.

        // Create a new Object with 'cmdName' as the first key and $forBridge=true.
        var cmdObj = {};
        cmdObj[cmdName] = 1;
        cmdObj.$forBridge = true;
        Object.extend(cmdObj, cmdArgs);

        var dbName = 'test';
        var noQueryOptions = 0;
        return conn.runCommand(dbName, cmdObj, noQueryOptions);
    }

    /**
     * Allows communication between 'this.dest' and the 'dest' of each of the 'bridges'.
     *
     * Configures 'this' bridge to accept new connections from the 'dest' of each of the 'bridges'.
     * Additionally configures each of the 'bridges' to accept new connections from 'this.dest'.
     *
     * @param {(MongoBridge|MongoBridge[])} bridges
     */
    this.reconnect = function reconnect(bridges) {
        var bridgeList = toBridgeArray(bridges);

        this.acceptConnectionsFrom(bridgeList);
        bridgeList.forEach(bridge => bridge.acceptConnectionsFrom(this));
    };

    /**
     * Disallows communication between 'this.dest' and the 'dest' of each of the 'bridges'.
     *
     * Configures 'this' bridge to close existing connections and reject new connections from the
     * 'dest' of each of the 'bridges'. Additionally configures each of the 'bridges' to close
     * existing connections and reject new connections from 'this.dest'.
     *
     * @param {(MongoBridge|MongoBridge[])} bridges
     */
    this.disconnect = function disconnect(bridges) {
        var bridgeList = toBridgeArray(bridges);

        this.rejectConnectionsFrom(bridgeList);
        bridgeList.forEach(bridge => bridge.rejectConnectionsFrom(this));
    };

    // All *From functions require that test commands be enabled on the mongod
    // instance (which populates the hostInfo field).
    // Returns a checker that asserts the flag captured at construction time on a given bridge.
    function checkTestCommandsEnabled(fn_name) {
        return function(bridge) {
            assert(bridge._testCommandsEnabledAtInit,
                   "testing commands have not been enabled. " + fn_name +
                       " will not work as expected");
        };
    }

    /**
     * Configures 'this' bridge to accept new connections from the 'dest' of each of the 'bridges'.
     *
     * @param {(MongoBridge|MongoBridge[])} bridges
     */
    this.acceptConnectionsFrom = function acceptConnectionsFrom(bridges) {
        var bridgeList = toBridgeArray(bridges);
        bridgeList.forEach(checkTestCommandsEnabled("acceptConnectionsFrom"));

        bridgeList.forEach(bridge => {
            var res = runBridgeCommand(controlConn, 'acceptConnectionsFrom', {host: bridge.dest});
            assert.commandWorked(res,
                                 'failed to configure the mongobridge listening on port ' +
                                     this.port + ' to accept new connections from ' + bridge.dest);
        });
    };

    /**
     * Configures 'this' bridge to close existing connections and reject new connections from the
     * 'dest' of each of the 'bridges'.
     *
     * @param {(MongoBridge|MongoBridge[])} bridges
     */
    this.rejectConnectionsFrom = function rejectConnectionsFrom(bridges) {
        var bridgeList = toBridgeArray(bridges);
        bridgeList.forEach(checkTestCommandsEnabled("rejectConnectionsFrom"));

        bridgeList.forEach(bridge => {
            var res = runBridgeCommand(controlConn, 'rejectConnectionsFrom', {host: bridge.dest});
            assert.commandWorked(res,
                                 'failed to configure the mongobridge listening on port ' +
                                     this.port + ' to hang up connections from ' + bridge.dest);
        });
    };

    /**
     * Configures 'this' bridge to delay forwarding requests from the 'dest' of each of the
     * 'bridges' to 'this.dest' by the specified amount.
     *
     * @param {(MongoBridge|MongoBridge[])} bridges
     * @param {number} delay - The delay to apply in milliseconds.
     */
    this.delayMessagesFrom = function delayMessagesFrom(bridges, delay) {
        var bridgeList = toBridgeArray(bridges);
        bridgeList.forEach(checkTestCommandsEnabled("delayMessagesFrom"));

        bridgeList.forEach(bridge => {
            var res = runBridgeCommand(controlConn, 'delayMessagesFrom', {
                host: bridge.dest,
                delay: delay,
            });
            assert.commandWorked(res,
                                 'failed to configure the mongobridge listening on port ' +
                                     this.port + ' to delay messages from ' + bridge.dest + ' by ' +
                                     delay + ' milliseconds');
        });
    };

    /**
     * Configures 'this' bridge to uniformly discard requests from the 'dest' of each of the
     * 'bridges' to 'this.dest' with probability 'lossProbability'.
     *
     * @param {(MongoBridge|MongoBridge[])} bridges
     * @param {number} lossProbability
     */
    this.discardMessagesFrom = function discardMessagesFrom(bridges, lossProbability) {
        var bridgeList = toBridgeArray(bridges);
        bridgeList.forEach(checkTestCommandsEnabled("discardMessagesFrom"));

        bridgeList.forEach(bridge => {
            var res = runBridgeCommand(controlConn, 'discardMessagesFrom', {
                host: bridge.dest,
                loss: lossProbability,
            });
            assert.commandWorked(res,
                                 'failed to configure the mongobridge listening on port ' +
                                     this.port + ' to discard messages from ' + bridge.dest +
                                     ' with probability ' + lossProbability);
        });
    };

    // Use a Proxy to "extend" the underlying connection object. The C++ functions, e.g.
    // runCommand(), require that they are called on the Mongo instance itself and so typical
    // prototypical inheritance isn't possible.
    return new Proxy(this, {
        get: function get(target, property, receiver) {
            // If the property is defined on the MongoBridge instance itself, then
            // return it.
            // Otherwise, get the value of the property from the Mongo instance.
            if (Object.prototype.hasOwnProperty.call(target, property)) {
                return target[property];
            }

            var conn = getUserConnOrThrow(property);
            var value = conn[property];
            // Bind so that the function runs against the Mongo instance rather than this Proxy.
            if (typeof value === 'function') {
                return value.bind(conn);
            }
            return value;
        },

        set: function set(target, property, value, receiver) {
            // Delegate setting the value of any property to the Mongo instance so
            // that it can be
            // accessed in functions acting on the Mongo instance directly instead of
            // this Proxy.
            // For example, the "slaveOk" property needs to be set on the Mongo
            // instance in order
            // for the query options bit to be set correctly.
            getUserConnOrThrow(property)[property] = value;
            return true;
        },
    });
}

// The number of ports that ReplSetTest and ShardingTest should stagger the port number of the
// mongobridge process and its corresponding mongod/mongos process by. The resulting port number of
// the mongod/mongos process is MongoBridge#port + MongoBridge.kBridgeOffset.
MongoBridge.kBridgeOffset = 10;
|