summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--jstests/auth/commands_builtin_roles.js14
-rw-r--r--jstests/auth/lib/commands_lib.js24
-rw-r--r--jstests/core/views/views_all_commands.js2
-rw-r--r--jstests/noPassthrough/traffic_reading.js81
-rw-r--r--jstests/noPassthrough/traffic_reading_legacy.js72
-rw-r--r--jstests/noPassthrough/traffic_recording.js126
-rw-r--r--jstests/sharding/database_and_shard_versioning_all_commands.js2
-rw-r--r--jstests/sharding/libs/last_stable_mongos_commands.js10
-rw-r--r--jstests/sharding/safe_secondary_reads_drop_recreate.js2
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js2
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js2
-rw-r--r--src/mongo/SConscript19
-rw-r--r--src/mongo/db/SConscript30
-rw-r--r--src/mongo/db/auth/action_types.txt1
-rw-r--r--src/mongo/db/auth/role_graph_builtin_roles.cpp3
-rw-r--r--src/mongo/db/commands/SConscript2
-rw-r--r--src/mongo/db/commands/traffic_recording_cmds.cpp120
-rw-r--r--src/mongo/db/traffic_reader.cpp252
-rw-r--r--src/mongo/db/traffic_reader.h42
-rw-r--r--src/mongo/db/traffic_reader_main.cpp133
-rw-r--r--src/mongo/db/traffic_recorder.cpp416
-rw-r--r--src/mongo/db/traffic_recorder.h79
-rw-r--r--src/mongo/db/traffic_recorder.idl77
-rw-r--r--src/mongo/shell/shell_utils_launcher.cpp10
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/transport/service_state_machine.cpp8
27 files changed, 1525 insertions, 6 deletions
diff --git a/.gitignore b/.gitignore
index d340be76732..4d4000c73d4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -83,6 +83,7 @@ scratch
/mongos
/mongoshim
/mongosniff
+/mongotrafficreader
/wt
*.tgz
diff --git a/jstests/auth/commands_builtin_roles.js b/jstests/auth/commands_builtin_roles.js
index f309435d6d6..32674cd8a41 100644
--- a/jstests/auth/commands_builtin_roles.js
+++ b/jstests/auth/commands_builtin_roles.js
@@ -143,7 +143,13 @@ function checkForNonExistentRoles() {
}
}
-var opts = {auth: "", enableExperimentalStorageDetailsCmd: ""};
+const dbPath = MongoRunner.toRealDir("$dataDir/commands_built_in_roles/");
+mkdir(dbPath);
+var opts = {
+ auth: "",
+ enableExperimentalStorageDetailsCmd: "",
+ setParameter: "trafficRecordingDirectory=" + dbPath
+};
var impls = {createUsers: createUsers, runOneTest: runOneTest};
checkForNonExistentRoles();
@@ -159,7 +165,11 @@ conn = new ShardingTest({
shards: 2,
mongos: 1,
keyFile: "jstests/libs/key1",
- other: {shardOptions: opts, shardAsReplicaSet: false}
+ other: {
+ shardOptions: opts,
+ shardAsReplicaSet: false,
+ mongosOptions: {setParameter: "trafficRecordingDirectory=" + dbPath}
+ }
});
authCommandsLib.runTests(conn, impls);
conn.stop();
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index 6899d0646aa..99b6d9a6924 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -5972,7 +5972,29 @@ var authCommandsLib = {
{killCursors: "$cmd.aggregate", cursors: [response.cursor.id]}));
}
}
- }
+ },
+ {
+ testname: "startRecordingTraffic",
+ command: {startRecordingTraffic: 1, filename: "notARealPath"},
+ testcases: [
+ {runOnDb: adminDbName, roles: roles_hostManager},
+ ],
+ teardown: (db, response) => {
+ if (response.ok) {
+ assert.commandWorked(db.runCommand({stopRecordingTraffic: 1}));
+ }
+ }
+ },
+ {
+ testname: "stopRecordingTraffic",
+ command: {stopRecordingTraffic: 1},
+ testcases: [
+ {runOnDb: adminDbName, roles: roles_hostManager},
+ ],
+ setup: function(db) {
+ db.runCommand({startRecordingTraffic: 1, filename: "notARealPath"});
+ }
+ },
],
/************* SHARED TEST LOGIC ****************/
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index c5bafd7671b..21bc90b7138 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -514,7 +514,9 @@
expectFailure: true,
},
stageDebug: {skip: isAnInternalCommand},
+ startRecordingTraffic: {skip: isUnrelated},
startSession: {skip: isAnInternalCommand},
+ stopRecordingTraffic: {skip: isUnrelated},
top: {skip: "tested in views/views_stats.js"},
touch: {
command: {touch: "view", data: true},
diff --git a/jstests/noPassthrough/traffic_reading.js b/jstests/noPassthrough/traffic_reading.js
new file mode 100644
index 00000000000..18642f03be4
--- /dev/null
+++ b/jstests/noPassthrough/traffic_reading.js
@@ -0,0 +1,81 @@
+// tests for the traffic_recording commands.
+(function() {
+ // Variables for this test
+ const recordingDir = MongoRunner.toRealDir("$dataDir/traffic_recording/");
+ const recordingFile = "recording.txt";
+ const recordingFilePath = MongoRunner.toRealDir(recordingDir + "/" + recordingFile);
+ const replayFilePath = MongoRunner.toRealDir(recordingDir + "/replay.txt");
+
+ // Create the recording directory if it does not already exist
+ mkdir(recordingDir);
+
+ // Create the options and run mongod
+ var opts = {auth: "", setParameter: "trafficRecordingDirectory=" + recordingDir};
+ m = MongoRunner.runMongod(opts);
+
+ // Get the port of the host
+ var serverPort = m.port;
+
+ // Create necessary users
+ adminDB = m.getDB("admin");
+ const testDB = m.getDB("test");
+ const coll = testDB.getCollection("foo");
+ adminDB.createUser({user: "admin", pwd: "pass", roles: jsTest.adminUserRoles});
+ adminDB.auth("admin", "pass");
+
+ // Start recording traffic
+ assert.commandWorked(
+ adminDB.runCommand({'startRecordingTraffic': 1, 'filename': 'recording.txt'}));
+
+ // Run a few commands
+ assert.commandWorked(testDB.runCommand({"serverStatus": 1}));
+ assert.commandWorked(coll.insert({"name": "foo biz bar"}));
+ assert.eq("foo biz bar", coll.findOne().name);
+ assert.commandWorked(coll.insert({"name": "foo bar"}));
+ assert.eq("foo bar", coll.findOne({"name": "foo bar"}).name);
+ assert.commandWorked(coll.deleteOne({}));
+ assert.eq(1, coll.aggregate().toArray().length);
+ assert.commandWorked(coll.update({}, {}));
+
+ // Stop recording traffic
+ assert.commandWorked(testDB.runCommand({'stopRecordingTraffic': 1}));
+
+ // Shutdown Mongod
+ MongoRunner.stopMongod(m, null, {user: 'admin', pwd: 'password'});
+
+ // Counters
+ var numRequest = 0;
+ var numResponse = 0;
+ var opTypes = {};
+
+ // Pass filepath to traffic_reader helper method to get recorded info in BSON
+ var res = convertTrafficRecordingToBSON(recordingFilePath);
+
+ // Iterate through the results and assert the above commands are properly recorded
+ res.forEach((obj) => {
+ assert.eq(obj["rawop"]["header"]["opcode"], 2013);
+ assert.eq(obj["seenconnectionnum"], 1);
+ var responseTo = obj["rawop"]["header"]["responseto"];
+ if (responseTo == 0) {
+ assert.eq(obj["destendpoint"], serverPort.toString());
+ numRequest++;
+ } else {
+ assert.eq(obj["srcendpoint"], serverPort.toString());
+ numResponse++;
+ }
+ opTypes[obj["opType"]] = (opTypes[obj["opType"]] || 0) + 1;
+ });
+
+ // Assert there is a response for every request
+ assert.eq(numResponse, numRequest);
+
+ // Assert the opTypes were correct
+ assert.eq(opTypes['isMaster'], opTypes["ismaster"]);
+ assert.eq(opTypes['find'], 2);
+ assert.eq(opTypes['insert'], 2);
+ assert.eq(opTypes['delete'], 1);
+ assert.eq(opTypes['update'], 1);
+ assert.eq(opTypes['aggregate'], 1);
+ assert.eq(opTypes['stopRecordingTraffic'], 1);
+
+})();
diff --git a/jstests/noPassthrough/traffic_reading_legacy.js b/jstests/noPassthrough/traffic_reading_legacy.js
new file mode 100644
index 00000000000..9224edf926a
--- /dev/null
+++ b/jstests/noPassthrough/traffic_reading_legacy.js
@@ -0,0 +1,72 @@
+// tests for the traffic_recording commands.
+(function() {
+ var baseName = "jstests_traffic_recording";
+
+ // Variables for this test
+ const recordingDir = MongoRunner.toRealDir("$dataDir/traffic_recording/");
+ const recordingFile = "recording.txt";
+ const recordingFilePath = MongoRunner.toRealDir(recordingDir + "/" + recordingFile);
+
+ // Create the recording directory if it does not already exist
+ mkdir(recordingDir);
+
+ // Create the options and run mongod
+ var opts = {auth: "", setParameter: "trafficRecordingDirectory=" + recordingDir};
+ m = MongoRunner.runMongod(opts);
+
+ // Get the port of the host
+ var serverPort = m.port;
+
+ // Set the readMode and writeMode to legacy
+ m.forceReadMode("legacy");
+ m.forceWriteMode("legacy");
+
+ // Create necessary users
+ adminDB = m.getDB("admin");
+ const testDB = m.getDB("test");
+ const coll = testDB.getCollection("foo");
+ adminDB.createUser({user: "admin", pwd: "pass", roles: jsTest.adminUserRoles});
+ adminDB.auth("admin", "pass");
+
+ // Start recording traffic
+ assert.commandWorked(
+ adminDB.runCommand({'startRecordingTraffic': 1, 'filename': 'recording.txt'}));
+
+ // Run a few commands
+ testDB.runCommand({"serverStatus": 1});
+ coll.insert({"name": "foo biz bar"});
+ coll.findOne();
+ coll.insert({"name": "foo bar"});
+ coll.findOne({"name": "foo bar"});
+ coll.deleteOne({});
+
+ // Stop recording traffic
+ assert.commandWorked(testDB.runCommand({'stopRecordingTraffic': 1}));
+
+ // Shutdown Mongod
+ MongoRunner.stopMongod(m, null, {user: 'admin', pwd: 'password'});
+
+ // Counters
+ var opCodes = {};
+
+ // Pass filepath to traffic_reader helper method to get recorded info in BSON
+ var res = convertTrafficRecordingToBSON(recordingFilePath);
+
+ // Iterate through the results and assert the above commands are properly recorded
+ res.forEach((obj) => {
+ opCodes[obj["rawop"]["header"]["opcode"]] =
+ (opCodes[obj["rawop"]["header"]["opcode"]] || 0) + 1;
+ assert.eq(obj["seenconnectionnum"], 1);
+ var responseTo = obj["rawop"]["header"]["responseto"];
+ if (responseTo == 0) {
+ assert.eq(obj["destendpoint"], serverPort.toString());
+ } else {
+ assert.eq(obj["srcendpoint"], serverPort.toString());
+ }
+ });
+
+ // ensure legacy operations worked properly
+ assert.eq(opCodes[2002], 2);
+ assert.eq(opCodes[2006], 1);
+
+})();
diff --git a/jstests/noPassthrough/traffic_recording.js b/jstests/noPassthrough/traffic_recording.js
new file mode 100644
index 00000000000..03828809a81
--- /dev/null
+++ b/jstests/noPassthrough/traffic_recording.js
@@ -0,0 +1,126 @@
+// tests for the traffic_recording commands.
+(function() {
+ function getDB(client) {
+ let db = client.getDB("admin");
+ db.auth("admin", "pass");
+
+ return db;
+ }
+
+ function runTest(client, restartCommand) {
+ let db = getDB(client);
+
+ let res = db.runCommand({'startRecordingTraffic': 1, 'filename': 'notARealPath'});
+ assert.eq(res.ok, false);
+ assert.eq(res["errmsg"], "Traffic recording directory not set");
+
+ const path = MongoRunner.toRealDir("$dataDir/traffic_recording/");
+ mkdir(path);
+
+ if (!jsTest.isMongos(client)) {
+ setJsTestOption("enableTestCommands", 0);
+ client = restartCommand({
+ trafficRecordingDirectory: path,
+ AlwaysRecordTraffic: "notARealPath",
+ enableTestCommands: 0,
+ });
+ setJsTestOption("enableTestCommands", 1);
+ assert.eq(null, client, "AlwaysRecordTraffic and not enableTestCommands should fail");
+ }
+
+ client = restartCommand({
+ trafficRecordingDirectory: path,
+ AlwaysRecordTraffic: "notARealPath",
+ enableTestCommands: 1
+ });
+ assert.neq(null, client, "AlwaysRecordTraffic and with enableTestCommands should suceed");
+ db = getDB(client);
+
+ assert(db.runCommand({"serverStatus": 1}).trafficRecording.running);
+
+ client = restartCommand({trafficRecordingDirectory: path});
+ db = getDB(client);
+
+ res = db.runCommand({'startRecordingTraffic': 1, 'filename': 'notARealPath'});
+ assert.eq(res.ok, true);
+
+ // Running the command again should fail
+ res = db.runCommand({'startRecordingTraffic': 1, 'filename': 'notARealPath'});
+ assert.eq(res.ok, false);
+ assert.eq(res["errmsg"], "Traffic recording already active");
+
+ // Running the serverStatus command should return the relevant information
+ res = db.runCommand({"serverStatus": 1});
+ assert("trafficRecording" in res);
+ let trafficStats = res["trafficRecording"];
+ assert.eq(trafficStats["running"], true);
+
+ // Assert that the current file size is growing
+ res = db.runCommand({"serverStatus": 1});
+ assert("trafficRecording" in res);
+ let trafficStats2 = res["trafficRecording"];
+ assert.eq(trafficStats2["running"], true);
+ assert(trafficStats2["currentFileSize"] >= trafficStats["currentFileSize"]);
+
+ // Running the stopRecordingTraffic command should succeed
+ res = db.runCommand({'stopRecordingTraffic': 1});
+ assert.eq(res.ok, true);
+
+ // Running the stopRecordingTraffic command again should fail
+ res = db.runCommand({'stopRecordingTraffic': 1});
+ assert.eq(res.ok, false);
+ assert.eq(res["errmsg"], "Traffic recording not active");
+
+ // Running the serverStatus command should return running is false
+ res = db.runCommand({"serverStatus": 1});
+ assert("trafficRecording" in res);
+ trafficStats = res["trafficRecording"];
+ assert.eq(trafficStats["running"], false);
+
+ return client;
+ }
+
+ {
+ let m = MongoRunner.runMongod({auth: ""});
+
+ let db = m.getDB("admin");
+
+ db.createUser({user: "admin", pwd: "pass", roles: jsTest.adminUserRoles});
+ db.auth("admin", "pass");
+
+ m = runTest(m, function(setParams) {
+ if (m) {
+ MongoRunner.stopMongod(m, null, {user: 'admin', pwd: 'pass'});
+ }
+ m = MongoRunner.runMongod({auth: "", setParameter: setParams});
+
+ if (m) {
+ m.getDB("admin").createUser(
+ {user: "admin", pwd: "pass", roles: jsTest.adminUserRoles});
+ }
+
+ return m;
+ });
+
+ MongoRunner.stopMongod(m, null, {user: 'admin', pwd: 'pass'});
+ }
+
+ {
+ let shardTest = new ShardingTest({
+ config: 1,
+ mongos: 1,
+ shards: 0,
+ });
+
+ runTest(shardTest.s, function(setParams) {
+ shardTest.restartMongos(0, {
+ restart: true,
+ setParameter: setParams,
+ });
+
+ return shardTest.s;
+ });
+
+ shardTest.stop();
+ }
+})();
diff --git a/jstests/sharding/database_and_shard_versioning_all_commands.js b/jstests/sharding/database_and_shard_versioning_all_commands.js
index 9dff71e3862..0c680da3df3 100644
--- a/jstests/sharding/database_and_shard_versioning_all_commands.js
+++ b/jstests/sharding/database_and_shard_versioning_all_commands.js
@@ -412,7 +412,9 @@
shutdown: {skip: "does not forward command to primary shard"},
split: {skip: "does not forward command to primary shard"},
splitVector: {skip: "does not forward command to primary shard"},
+ startRecordingTraffic: {skip: "executes locally on mongos (not sent to any remote node)"},
startSession: {skip: "executes locally on mongos (not sent to any remote node)"},
+ stopRecordingTraffic: {skip: "executes locally on mongos (not sent to any remote node)"},
update: {
skipProfilerCheck: true,
sendsDbVersion: false,
diff --git a/jstests/sharding/libs/last_stable_mongos_commands.js b/jstests/sharding/libs/last_stable_mongos_commands.js
index fdd1bc0fa4a..e9470a1794a 100644
--- a/jstests/sharding/libs/last_stable_mongos_commands.js
+++ b/jstests/sharding/libs/last_stable_mongos_commands.js
@@ -15,5 +15,11 @@ const commandsRemovedFromMongosIn42 = [
// These commands were added in mongos 4.2, so will not appear in the listCommands output of a 4.0
// mongos. We will allow these commands to have a test defined without always existing on the mongos
// being used.
-const commandsAddedToMongosIn42 =
- ['abortTransaction', 'commitTransaction', 'dropConnections', 'setIndexCommitQuorum'];
+const commandsAddedToMongosIn42 = [
+ 'abortTransaction',
+ 'commitTransaction',
+ 'dropConnections',
+ 'setIndexCommitQuorum',
+ 'startRecordingTraffic',
+ 'stopRecordingTraffic',
+];
diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js
index e54deea9f30..814b79b6308 100644
--- a/jstests/sharding/safe_secondary_reads_drop_recreate.js
+++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js
@@ -289,7 +289,9 @@
splitChunk: {skip: "primary only"},
splitVector: {skip: "primary only"},
stageDebug: {skip: "primary only"},
+ startRecordingTraffic: {skip: "does not return user data"},
startSession: {skip: "does not return user data"},
+ stopRecordingTraffic: {skip: "does not return user data"},
top: {skip: "does not return user data"},
touch: {skip: "does not return user data"},
unsetSharding: {skip: "does not return user data"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index 2941787dc71..7f7e8fff3f1 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -324,7 +324,9 @@
splitChunk: {skip: "primary only"},
splitVector: {skip: "primary only"},
stageDebug: {skip: "primary only"},
+ startRecordingTraffic: {skip: "does not return user data"},
startSession: {skip: "does not return user data"},
+ stopRecordingTraffic: {skip: "does not return user data"},
top: {skip: "does not return user data"},
touch: {skip: "does not return user data"},
unsetSharding: {skip: "does not return user data"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index e3be4c90f7c..4ce841f18f9 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -294,7 +294,9 @@
splitChunk: {skip: "primary only"},
splitVector: {skip: "primary only"},
stageDebug: {skip: "primary only"},
+ startRecordingTraffic: {skip: "does not return user data"},
startSession: {skip: "does not return user data"},
+ stopRecordingTraffic: {skip: "does not return user data"},
top: {skip: "does not return user data"},
touch: {skip: "does not return user data"},
unsetSharding: {skip: "does not return user data"},
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index ea3da91549a..ea7cb46d148 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -349,6 +349,7 @@ mongod = env.Program(
'db/introspect',
'db/keys_collection_client_direct',
'db/kill_sessions_local',
+ 'db/traffic_recorder',
'db/logical_session_cache_factory_mongod',
'db/logical_time_metadata_hook',
'db/matcher/expressions_mongod_only',
@@ -451,6 +452,23 @@ if env.TargetOSIs('windows'):
env.Alias('generated-sources', generatedServerManifest)
env.Depends("s/server.res", generatedServerManifest)
+
+mongotrafficreader = env.Program(
+ target="mongotrafficreader",
+ source=[
+ "db/traffic_reader_main.cpp"
+ ],
+ LIBDEPS=[
+ 'base',
+ 'db/traffic_reader',
+ 'rpc/protocol',
+ 'util/signal_handlers'
+ ],
+)
+
+if not hygienic:
+ env.Install('#/', mongotrafficreader)
+
# mongos
mongos = env.Program(
target='mongos',
@@ -533,6 +551,7 @@ if not has_option('noshell') and usemozjs:
'db/query/command_request_response',
'db/query/query_request',
'db/server_options_core',
+ 'db/traffic_reader',
'linenoise_utf8',
'rpc/protocol',
'scripting/scripting',
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 5fd17fe7ee3..ed2e69d5a79 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2067,3 +2067,33 @@ env.CppIntegrationTest(
'$BUILD_DIR/mongo/util/version_impl',
],
)
+
+env.Library(
+ target='traffic_recorder',
+ source=[
+ 'traffic_recorder.cpp',
+ env.Idlc('traffic_recorder.idl')[0],
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/service_context',
+ "$BUILD_DIR/mongo/rpc/rpc",
+ "$BUILD_DIR/mongo/db/commands/server_status",
+ ],
+)
+
+env.Library(
+ target='traffic_reader',
+ source=[
+ "traffic_reader.cpp",
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/rpc/protocol',
+ "$BUILD_DIR/mongo/rpc/rpc",
+ ],
+)
diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt
index 303022bf3a9..8ae39298cff 100644
--- a/src/mongo/db/auth/action_types.txt
+++ b/src/mongo/db/auth/action_types.txt
@@ -115,6 +115,7 @@
"storageDetails",
"top",
"touch",
+"trafficRecord",
"unlock",
"useUUID",
"update",
diff --git a/src/mongo/db/auth/role_graph_builtin_roles.cpp b/src/mongo/db/auth/role_graph_builtin_roles.cpp
index e9506a5f07c..7a6865f3308 100644
--- a/src/mongo/db/auth/role_graph_builtin_roles.cpp
+++ b/src/mongo/db/auth/role_graph_builtin_roles.cpp
@@ -222,7 +222,8 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) {
<< ActionType::killAnySession
<< ActionType::killop
<< ActionType::replSetResizeOplog
- << ActionType::resync; // clusterManager gets this also
+ << ActionType::resync // clusterManager gets this also
+ << ActionType::trafficRecord;
// hostManager role actions that target the database resource
hostManagerRoleDatabaseActions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 58aa4e3da35..be6a0b674ef 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -141,6 +141,7 @@ env.Library(
"mr_common.cpp",
"reap_logical_session_cache_now.cpp",
"refresh_sessions_command_internal.cpp",
+ "traffic_recording_cmds.cpp",
"user_management_commands_common.cpp",
],
LIBDEPS_PRIVATE=[
@@ -159,6 +160,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/isself',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/executor/egress_tag_closer_manager',
+ '$BUILD_DIR/mongo/db/traffic_recorder',
'$BUILD_DIR/mongo/executor/task_executor_pool',
'$BUILD_DIR/mongo/rpc/client_metadata',
'$BUILD_DIR/mongo/s/sharding_legacy_api',
diff --git a/src/mongo/db/commands/traffic_recording_cmds.cpp b/src/mongo/db/commands/traffic_recording_cmds.cpp
new file mode 100644
index 00000000000..7fd2894c9f6
--- /dev/null
+++ b/src/mongo/db/commands/traffic_recording_cmds.cpp
@@ -0,0 +1,120 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/action_set.h"
+#include "mongo/db/auth/action_type.h"
+#include "mongo/db/auth/authorization_manager.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/traffic_recorder.h"
+#include "mongo/db/traffic_recorder_gen.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class StartRecordingCommand final : public TypedCommand<StartRecordingCommand> {
+public:
+ using Request = StartRecordingTraffic;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ TrafficRecorder::get(opCtx->getServiceContext()).start(request());
+ log() << "** Warning: The recording file contains unencrypted user traffic."
+ << " We recommend that you limit retention of this file and "
+ << "store it on an encrypted filesystem volume.";
+ }
+
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(),
+ ActionType::trafficRecord}));
+ }
+
+ NamespaceString ns() const override {
+ return NamespaceString(request().getDbName(), "");
+ }
+ };
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kAlways;
+ }
+} startRecordingTrafficCommand;
+
+class StopRecordingCommand final : public TypedCommand<StopRecordingCommand> {
+public:
+ using Request = StopRecordingTraffic;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ TrafficRecorder::get(opCtx->getServiceContext()).stop();
+ }
+
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(),
+ ActionType::trafficRecord}));
+ }
+
+ NamespaceString ns() const override {
+ return NamespaceString(request().getDbName(), "");
+ }
+ };
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kAlways;
+ }
+} stopRecordingTrafficCommand;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/traffic_reader.cpp b/src/mongo/db/traffic_reader.cpp
new file mode 100644
index 00000000000..1198f6c11b3
--- /dev/null
+++ b/src/mongo/db/traffic_reader.cpp
@@ -0,0 +1,252 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <fcntl.h>
+#include <iostream>
+#include <string>
+#include <sys/types.h>
+#include <vector>
+
+#ifdef _WIN32
+#include <io.h>
+#else
+#include <unistd.h>
+#endif
+
+#include "mongo/base/data_cursor.h"
+#include "mongo/base/data_range_cursor.h"
+#include "mongo/base/data_type_endian.h"
+#include "mongo/base/data_type_validated.h"
+#include "mongo/base/data_view.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/traffic_reader.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/message.h"
+#include "mongo/rpc/op_msg.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/errno_util.h"
+#include "mongo/util/scopeguard.h"
+#include "mongo/util/time_support.h"
+
+namespace {
+// Taken from src/mongo/gotools/mongoreplay/util.go
+// Time.Unix() returns the number of seconds from the unix epoch but time's
+// underlying struct stores 'sec' as the number of seconds elapsed since
+// January 1, year 1 00:00:00 UTC (In the Proleptic Gregorian Calendar)
+// This calculation allows for conversion between the internal representation
+// and the UTC representation.
+const long long unixToInternal =
+ static_cast<long long>(1969 * 365 + 1969 / 4 - 1969 / 100 + 1969 / 400) * 86400;
+} // namespace
+
+namespace mongo {
+
+namespace {
+
+// Packet struct
+struct TrafficReaderPacket {
+ uint64_t id;
+ StringData local;
+ StringData remote;
+ Date_t date;
+ uint64_t order;
+ MsgData::ConstView message;
+};
+
+bool readBytes(size_t toRead, char* buf, int fd) {
+ while (toRead) {
+#ifdef _WIN32
+ auto r = _read(fd, buf, toRead);
+#else
+ auto r = ::read(fd, buf, toRead);
+#endif
+
+ if (r == -1) {
+ auto pair = errnoAndDescription();
+
+ uassert(ErrorCodes::FileStreamFailed,
+ str::stream() << "failed to read bytes: errno(" << pair.first << ") : "
+ << pair.second,
+ pair.first == EINTR);
+
+ continue;
+ } else if (r == 0) {
+ return false;
+ }
+
+ buf += r;
+ toRead -= r;
+ }
+
+ return true;
+}
+
+boost::optional<TrafficReaderPacket> readPacket(char* buf, int fd) {
+ if (!readBytes(4, buf, fd)) {
+ return boost::none;
+ }
+ auto len = ConstDataView(buf).read<LittleEndian<uint32_t>>();
+
+ uassert(ErrorCodes::FailedToParse, "packet too large", len < MaxMessageSizeBytes);
+ uassert(
+ ErrorCodes::FailedToParse, "could not read full packet", readBytes(len - 4, buf + 4, fd));
+
+ ConstDataRangeCursor cdr(buf, buf + len);
+
+ // Read the packet
+ uassertStatusOK(cdr.skip<LittleEndian<uint32_t>>());
+ uint64_t id = uassertStatusOK(cdr.readAndAdvance<LittleEndian<uint64_t>>());
+ StringData local = uassertStatusOK(cdr.readAndAdvance<Terminated<'\0', StringData>>());
+ StringData remote = uassertStatusOK(cdr.readAndAdvance<Terminated<'\0', StringData>>());
+ uint64_t date = uassertStatusOK(cdr.readAndAdvance<LittleEndian<uint64_t>>());
+ uint64_t order = uassertStatusOK(cdr.readAndAdvance<LittleEndian<uint64_t>>());
+ MsgData::ConstView message(cdr.data());
+
+ return TrafficReaderPacket{
+ id, local, remote, Date_t::fromMillisSinceEpoch(date), order, message};
+}
+
+void getBSONObjFromPacket(TrafficReaderPacket& packet, BSONObjBuilder* builder) {
+ {
+ // RawOp Field
+ BSONObjBuilder rawop(builder->subobjStart("rawop"));
+
+ // Add the header fields to rawOp
+ {
+ BSONObjBuilder header(rawop.subobjStart("header"));
+ header.append("messagelength", static_cast<int32_t>(packet.message.getLen()));
+ header.append("requestid", static_cast<int32_t>(packet.message.getId()));
+ header.append("responseto", static_cast<int32_t>(packet.message.getResponseToMsgId()));
+ header.append("opcode", static_cast<int32_t>(packet.message.getNetworkOp()));
+ }
+
+ // Add the binary reprentation of the entire message for rawop.body
+ // auto buf = SharedBuffer::allocate(packet.message.getLen());
+ // std::memcpy(buf.get(), packet.message.view2ptr(), packet.message.getLen());
+ // rawop.appendBinData("body", packet.message.getLen(), BinDataGeneral, buf.get());
+ rawop.appendBinData(
+ "body", packet.message.getLen(), BinDataGeneral, packet.message.view2ptr());
+ }
+
+ // The seen field represents the time that the operation took place
+ // Trying to re-create the way mongoreplay does this
+ {
+ BSONObjBuilder seen(builder->subobjStart("seen"));
+ seen.append(
+ "sec",
+ static_cast<int64_t>((packet.date.toMillisSinceEpoch() / 1000) + unixToInternal));
+ seen.append("nsec", static_cast<int32_t>(packet.order));
+ }
+
+ // Figure out which is the src endpoint as opposed to the dest endpoint
+ auto localInd = packet.local.rfind(':');
+ auto remoteInd = packet.remote.rfind(':');
+ if (localInd != std::string::npos && remoteInd != std::string::npos) {
+ auto local = packet.local.substr(localInd + 1);
+ auto remote = packet.remote.substr(remoteInd + 1);
+ if (packet.message.getResponseToMsgId()) {
+ builder->append("srcendpoint", local);
+ builder->append("destendpoint", remote);
+ } else {
+ builder->append("srcendpoint", remote);
+ builder->append("destendpoint", local);
+ }
+ }
+
+ // Fill out the remaining fields
+ builder->append("order", static_cast<int64_t>(packet.order));
+ builder->append("seenconnectionnum", static_cast<int64_t>(packet.id));
+ builder->append("playedconnectionnum", static_cast<int64_t>(0));
+ builder->append("generation", static_cast<int32_t>(0));
+}
+
+void addOpType(TrafficReaderPacket& packet, BSONObjBuilder* builder) {
+ if (packet.message.getNetworkOp() == dbMsg) {
+ Message message;
+ message.setData(dbMsg, packet.message.data(), packet.message.dataLen());
+
+ auto opMsg = rpc::opMsgRequestFromAnyProtocol(message);
+ builder->append("opType", opMsg.getCommandName());
+ } else {
+ builder->append("opType", "legacy");
+ }
+}
+
+} // namespace
+
+BSONArray trafficRecordingFileToBSONArr(const std::string& inputFile) {
+ BSONArrayBuilder builder{};
+
+// Open the connection to the input file
+#ifdef _WIN32
+ auto inputFd = ::open(inputFile.c_str(), O_RDONLY | O_BINARY);
+#else
+ auto inputFd = ::open(inputFile.c_str(), O_RDONLY);
+#endif
+
+ const auto guard = makeGuard([&] { ::close(inputFd); });
+
+ uassert(ErrorCodes::FileNotOpen,
+ str::stream() << "Specified file does not exist (" << inputFile << ")",
+ inputFd > 0);
+
+ auto buf = SharedBuffer::allocate(MaxMessageSizeBytes);
+ while (auto packet = readPacket(buf.get(), inputFd)) {
+ BSONObjBuilder bob(builder.subobjStart());
+ getBSONObjFromPacket(*packet, &bob);
+ addOpType(*packet, &bob);
+ }
+
+ return builder.arr();
+}
+
+void trafficRecordingFileToMongoReplayFile(int inputFd, std::ostream& outputStream) {
+ // Document expected by mongoreplay
+ BSONObjBuilder opts{};
+ opts.append("playbackfileversion", 1);
+ opts.append("driveropsfiltered", false);
+ auto optsObj = opts.obj();
+ outputStream.write(optsObj.objdata(), optsObj.objsize());
+
+ BSONObjBuilder bob;
+ auto buf = SharedBuffer::allocate(MaxMessageSizeBytes);
+
+ while (auto packet = readPacket(buf.get(), inputFd)) {
+ getBSONObjFromPacket(*packet, &bob);
+
+ auto obj = bob.asTempObj();
+ outputStream.write(obj.objdata(), obj.objsize());
+
+ bob.resetToEmpty();
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/traffic_reader.h b/src/mongo/db/traffic_reader.h
new file mode 100644
index 00000000000..c22464fa92a
--- /dev/null
+++ b/src/mongo/db/traffic_reader.h
@@ -0,0 +1,42 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/rpc/op_msg.h"
+
+#pragma once
+
+namespace mongo {
+
+// Method for testing, takes the recorded traffic and returns a BSONArray
+BSONArray trafficRecordingFileToBSONArr(const std::string& inputFile);
+
+// This is the function that traffic_reader_main.cpp calls
+void trafficRecordingFileToMongoReplayFile(int inFile, std::ostream& outFile);
+} // namespace mongo
diff --git a/src/mongo/db/traffic_reader_main.cpp b/src/mongo/db/traffic_reader_main.cpp
new file mode 100644
index 00000000000..c907b74c979
--- /dev/null
+++ b/src/mongo/db/traffic_reader_main.cpp
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <fcntl.h>
+#include <fstream>
+#include <iostream>
+#include <string>
+
+#ifdef _WIN32
+#include <io.h>
+#endif
+
+#include "mongo/base/initializer.h"
+#include "mongo/db/traffic_reader.h"
+#include "mongo/util/signal_handlers.h"
+#include "mongo/util/text.h"
+
+#include <boost/filesystem.hpp>
+#include <boost/program_options.hpp>
+
+using namespace mongo;
+
+int main(int argc, char* argv[], char** envp) {
+
+ setupSignalHandlers();
+
+ Status status = mongo::runGlobalInitializers(argc, argv, envp);
+ if (!status.isOK()) {
+ std::cerr << "Failed global initialization: " << status << std::endl;
+ return EXIT_FAILURE;
+ }
+
+ startSignalProcessingThread();
+
+ // Handle program options
+ boost::program_options::variables_map vm;
+
+ // input / output files for the reader (input defaults to stdin)
+ int inputFd = 0;
+ std::ofstream outputStream;
+
+ try {
+ // Define the program options
+ auto inputStr = "Path to file input file (defaults to stdin)";
+ auto outputStr =
+ "Path to file that mongotrafficreader will place its output (defaults to stdout)";
+ boost::program_options::options_description desc{"Options"};
+ desc.add_options()("help,h", "help")(
+ "input,i", boost::program_options::value<std::string>(), inputStr)(
+ "output,o", boost::program_options::value<std::string>(), outputStr);
+
+ // Parse the program options
+ store(parse_command_line(argc, argv, desc), vm);
+ notify(vm);
+
+ // Handle the help option
+ if (vm.count("help")) {
+ std::cout << "Mongo Traffic Reader Help: \n\n\t./mongotrafficreader "
+ "-i trafficinput.txt -o mongotrafficreader_dump.bson \n\n"
+ << desc << std::endl;
+ return EXIT_SUCCESS;
+ }
+
+ // User can specify a --input param and it must point to a valid file
+ if (vm.count("input")) {
+ auto inputFile = vm["input"].as<std::string>();
+ if (!boost::filesystem::exists(inputFile.c_str())) {
+ std::cout << "Error: Specified file does not exist (" << inputFile.c_str() << ")"
+ << std::endl;
+ return EXIT_FAILURE;
+ }
+
+// Open the connection to the input file
+#ifdef _WIN32
+ inputFd = open(inputFile.c_str(), O_RDONLY | O_BINARY);
+#else
+ inputFd = open(inputFile.c_str(), O_RDONLY);
+#endif
+ }
+
+ // User must specify a --output param and it does not need to point to a valid file
+ if (vm.count("output")) {
+ auto outputFile = vm["output"].as<std::string>();
+
+ // Open the connection to the output file
+ outputStream.open(outputFile, std::ios::out | std::ios::trunc | std::ios::binary);
+ if (!outputStream.is_open()) {
+ std::cerr << "Error writing to file: " << outputFile << std::endl;
+ return EXIT_FAILURE;
+ }
+ } else {
+ // output to std::cout
+ outputStream.copyfmt(std::cout);
+ outputStream.clear(std::cout.rdstate());
+ outputStream.basic_ios<char>::rdbuf(std::cout.rdbuf());
+ }
+ } catch (const boost::program_options::error& ex) {
+ std::cerr << ex.what() << '\n';
+ return EXIT_FAILURE;
+ }
+
+ mongo::trafficRecordingFileToMongoReplayFile(inputFd, outputStream);
+
+ return 0;
+}
diff --git a/src/mongo/db/traffic_recorder.cpp b/src/mongo/db/traffic_recorder.cpp
new file mode 100644
index 00000000000..6ba88708b78
--- /dev/null
+++ b/src/mongo/db/traffic_recorder.cpp
@@ -0,0 +1,416 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/traffic_recorder.h"
+
+#include <boost/filesystem/operations.hpp>
+#include <fstream>
+
+#include "mongo/base/data_builder.h"
+#include "mongo/base/data_type_terminated.h"
+#include "mongo/base/init.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/commands/server_status.h"
+#include "mongo/db/commands/test_commands_enabled.h"
+#include "mongo/db/server_parameters.h"
+#include "mongo/db/service_context.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/producer_consumer_queue.h"
+
+namespace mongo {
+
+namespace {
+
+constexpr auto kDefaultTrafficRecordingDirectory = ""_sd;
+
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(trafficRecordingDirectory,
+ std::string,
+ kDefaultTrafficRecordingDirectory.toString())
+ ->withValidator([](const std::string& newValue) {
+ if (!boost::filesystem::is_directory(newValue)) {
+ return Status(ErrorCodes::FileNotOpen,
+ str::stream() << "traffic recording directory \"" << newValue
+ << "\" is not a directory.");
+ }
+
+ return Status::OK();
+ });
+
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(AlwaysRecordTraffic, std::string, "");
+
+bool shouldAlwaysRecordTraffic = false;
+
+MONGO_INITIALIZER(ShouldAlwaysRecordTraffic)(InitializerContext*) {
+ if (!AlwaysRecordTraffic.size()) {
+ return Status::OK();
+ }
+
+ if (!getTestCommandsEnabled()) {
+ return Status(ErrorCodes::BadValue,
+ "invalid to set AlwaysRecordTraffic if test commands are not enabled");
+ }
+
+ if (trafficRecordingDirectory.empty()) {
+ if (serverGlobalParams.logpath.empty()) {
+ return Status(ErrorCodes::BadValue,
+ "invalid to set AlwaysRecordTraffic without a logpath or "
+ "trafficRecordingDirectory");
+ } else {
+ trafficRecordingDirectory = serverGlobalParams.logpath;
+ }
+ }
+
+ shouldAlwaysRecordTraffic = true;
+
+ return Status::OK();
+}
+
+} // namespace
+
+/**
+ * The Recording class represents a single recording that the recorder is exposing. It's made up of
+ * a background thread which flushes records to disk, and helper methods to push to that thread,
+ * expose stats, and stop the recording.
+ */
+class TrafficRecorder::Recording {
+public:
+ Recording(const StartRecordingTraffic& options)
+ : _path(_getPath(options.getFilename().toString())), _maxLogSize(options.getMaxFileSize()) {
+
+ MultiProducerSingleConsumerQueue<TrafficRecordingPacket, CostFunction>::Options
+ queueOptions;
+ queueOptions.maxQueueDepth = options.getBufferSize();
+ if (!shouldAlwaysRecordTraffic) {
+ queueOptions.maxProducerQueueDepth = 0;
+ }
+ _pcqPipe = MultiProducerSingleConsumerQueue<TrafficRecordingPacket, CostFunction>::Pipe(
+ queueOptions);
+
+ _trafficStats.setRunning(true);
+ _trafficStats.setBufferSize(options.getBufferSize());
+ _trafficStats.setRecordingFile(_path);
+ _trafficStats.setMaxFileSize(_maxLogSize);
+ }
+
+ void run() {
+ _thread = stdx::thread([ consumer = std::move(_pcqPipe.consumer), this ] {
+ try {
+ DataBuilder db;
+ std::fstream out(_path,
+ std::ios_base::binary | std::ios_base::trunc | std::ios_base::out);
+
+ while (true) {
+ std::deque<TrafficRecordingPacket> storage;
+ size_t bytes;
+
+ std::tie(storage, bytes) = consumer.popManyUpTo(MaxMessageSizeBytes);
+
+ // if this fired... somehow we got a message bigger than a message
+ invariant(bytes);
+
+ for (const auto& packet : storage) {
+ db.clear();
+ Message toWrite = packet.message;
+
+ uassertStatusOK(db.writeAndAdvance<LittleEndian<uint32_t>>(0));
+ uassertStatusOK(db.writeAndAdvance<LittleEndian<uint64_t>>(packet.id));
+ uassertStatusOK(db.writeAndAdvance<Terminated<'\0', StringData>>(
+ StringData(packet.local)));
+ uassertStatusOK(db.writeAndAdvance<Terminated<'\0', StringData>>(
+ StringData(packet.remote)));
+ uassertStatusOK(db.writeAndAdvance<LittleEndian<uint64_t>>(
+ packet.now.toMillisSinceEpoch()));
+ uassertStatusOK(db.writeAndAdvance<LittleEndian<uint64_t>>(packet.order));
+
+ auto size = db.size() + toWrite.size();
+ uassertStatusOK(db.getCursor().write<LittleEndian<uint32_t>>(size));
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _written += size;
+ }
+
+ uassert(ErrorCodes::LogWriteFailed,
+ "hit maximum log size",
+ _written < _maxLogSize);
+
+ out.write(db.getCursor().data(), db.size());
+ out.write(toWrite.buf(), toWrite.size());
+ }
+ }
+ } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueConsumed>&) {
+ // Close naturally
+ } catch (...) {
+ auto status = exceptionToStatus();
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _result = status;
+ }
+ });
+ }
+
+ /**
+ * pushRecord returns false if the queue was full. This is ultimately fatal to the recording
+ */
+ bool pushRecord(const transport::SessionHandle& ts,
+ Date_t now,
+ const uint64_t order,
+ const Message& message) {
+ try {
+ _pcqPipe.producer.push(
+ {ts->id(), ts->local().toString(), ts->remote().toString(), now, order, message});
+ return true;
+ } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueProducerQueueDepthExceeded>&) {
+ invariant(!shouldAlwaysRecordTraffic);
+
+ // If we couldn't push our packet begin the process of failing the recording
+ _pcqPipe.producer.close();
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // If the result was otherwise okay, mark it as failed due to the queue blocking. If
+ // it failed for another reason, don't overwrite that.
+ if (_result.isOK()) {
+ _result = Status(ErrorCodes::Error(51061), "queue was blocked in traffic recorder");
+ }
+ } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) {
+ }
+
+ return false;
+ }
+
+ Status shutdown() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ if (!_inShutdown) {
+ _inShutdown = true;
+ lk.unlock();
+
+ _pcqPipe.producer.close();
+ _thread.join();
+
+ lk.lock();
+ }
+
+ return _result;
+ }
+
+ BSONObj getStats() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _trafficStats.setBufferedBytes(_pcqPipe.controller.getStats().queueDepth);
+ _trafficStats.setCurrentFileSize(_written);
+ return _trafficStats.toBSON();
+ }
+
+ AtomicWord<uint64_t> order{0};
+
+private:
+ struct TrafficRecordingPacket {
+ const uint64_t id;
+ const std::string local;
+ const std::string remote;
+ const Date_t now;
+ const uint64_t order;
+ const Message message;
+ };
+
+ struct CostFunction {
+ size_t operator()(const TrafficRecordingPacket& packet) const {
+ return packet.message.size();
+ }
+ };
+
+ static std::string _getPath(const std::string& filename) {
+ uassert(ErrorCodes::BadValue,
+ "Traffic recording filename must not be empty",
+ !filename.empty());
+
+ if (trafficRecordingDirectory.back() == '/') {
+ trafficRecordingDirectory.pop_back();
+ }
+ auto parentPath = boost::filesystem::path(trafficRecordingDirectory);
+ auto path = parentPath / filename;
+
+ uassert(ErrorCodes::BadValue,
+ "Traffic recording filename must be a simple filename",
+ path.parent_path() == parentPath);
+
+ return path.string();
+ }
+
+ const std::string _path;
+ const size_t _maxLogSize;
+
+ MultiProducerSingleConsumerQueue<TrafficRecordingPacket, CostFunction>::Pipe _pcqPipe;
+ stdx::thread _thread;
+
+ stdx::mutex _mutex;
+ bool _inShutdown = false;
+ TrafficRecorderStats _trafficStats;
+ size_t _written = 0;
+ Status _result = Status::OK();
+};
+
+namespace {
+static const auto getTrafficRecorder = ServiceContext::declareDecoration<TrafficRecorder>();
+} // namespace
+
+TrafficRecorder& TrafficRecorder::get(ServiceContext* svc) {
+ return getTrafficRecorder(svc);
+}
+
+TrafficRecorder::TrafficRecorder() : _shouldRecord(shouldAlwaysRecordTraffic) {}
+
+TrafficRecorder::~TrafficRecorder() {
+ if (shouldAlwaysRecordTraffic) {
+ _recording->shutdown().ignore();
+ }
+}
+
+void TrafficRecorder::start(const StartRecordingTraffic& options) {
+ invariant(!shouldAlwaysRecordTraffic);
+
+ uassert(ErrorCodes::BadValue,
+ "Traffic recording directory not set",
+ !trafficRecordingDirectory.empty());
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ uassert(ErrorCodes::BadValue, "Traffic recording already active", !_recording);
+
+ _recording = std::make_shared<Recording>(options);
+ _recording->run();
+ }
+
+ _shouldRecord.store(true);
+}
+
+void TrafficRecorder::stop() {
+ invariant(!shouldAlwaysRecordTraffic);
+
+ _shouldRecord.store(false);
+
+ auto recording = [&] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ uassert(ErrorCodes::BadValue, "Traffic recording not active", _recording);
+
+ return std::move(_recording);
+ }();
+
+ uassertStatusOK(recording->shutdown());
+}
+
+void TrafficRecorder::observe(const transport::SessionHandle& ts,
+ Date_t now,
+ const Message& message) {
+ if (shouldAlwaysRecordTraffic) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (!_recording) {
+ StartRecordingTraffic options;
+ options.setFilename(AlwaysRecordTraffic);
+ options.setMaxFileSize(std::numeric_limits<int64_t>::max());
+
+ _recording = std::make_shared<Recording>(options);
+ _recording->run();
+ }
+ }
+
+ invariant(_recording->pushRecord(ts, now, _recording->order.addAndFetch(1), message));
+ return;
+ }
+
+ if (!_shouldRecord.load()) {
+ return;
+ }
+
+ auto recording = _getCurrentRecording();
+
+ // If we don't have an active recording, bail
+ if (!recording) {
+ return;
+ }
+
+ // Try to record the message
+ if (recording->pushRecord(ts, now, recording->order.addAndFetch(1), message)) {
+ return;
+ }
+
+ // We couldn't queue
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // If the recording isn't the one we have in hand bail (its been ended, or a new one has
+ // been created
+ if (_recording != recording) {
+ return;
+ }
+
+ // We couldn't queue and it's still our recording. No one else should try to queue
+ _shouldRecord.store(false);
+}
+
+std::shared_ptr<TrafficRecorder::Recording> TrafficRecorder::_getCurrentRecording() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _recording;
+}
+
+class TrafficRecorder::TrafficRecorderSSS : public ServerStatusSection {
+public:
+ TrafficRecorderSSS() : ServerStatusSection("trafficRecording") {}
+
+ bool includeByDefault() const override {
+ return true;
+ }
+
+ BSONObj generateSection(OperationContext* opCtx,
+ const BSONElement& configElement) const override {
+ auto& recorder = TrafficRecorder::get(opCtx->getServiceContext());
+
+ if (!recorder._shouldRecord.load()) {
+ return BSON("running" << false);
+ }
+
+ auto recording = recorder._getCurrentRecording();
+
+ if (!recording) {
+ return BSON("running" << false);
+ }
+
+ return recording->getStats();
+ }
+} trafficRecorderStats;
+
+} // namespace mongo
diff --git a/src/mongo/db/traffic_recorder.h b/src/mongo/db/traffic_recorder.h
new file mode 100644
index 00000000000..8bd261cbfb4
--- /dev/null
+++ b/src/mongo/db/traffic_recorder.h
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/db/service_context.h"
+#include "mongo/db/traffic_recorder_gen.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/rpc/message.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/transport/session.h"
+
+namespace mongo {
+
+/**
+ * A service context level global which captures packet capture through the transport layer if it is
+ * enabled. The service is intended to be turned on and off via startRecordingTrafficTraffic and
+ * stopRecordingTrafficTraffic.
+ *
+ * The recording can have one recording running at a time and the intention is that observe() blocks
+ * callers for the least amount of time possible.
+ */
+class TrafficRecorder {
+public:
+ static TrafficRecorder& get(ServiceContext* svc);
+
+ TrafficRecorder();
+ ~TrafficRecorder();
+
+ // Start and stop block until the associate operation has succeeded or failed
+ //
+ // On failure these methods throw
+ void start(const StartRecordingTraffic& options);
+ void stop();
+
+ void observe(const transport::SessionHandle& ts, Date_t now, const Message& message);
+
+private:
+ class TrafficRecorderSSS;
+ class Recording;
+
+ std::shared_ptr<Recording> _getCurrentRecording() const;
+
+ AtomicWord<bool> _shouldRecord;
+
+ // The mutex only protects the last recording shared_ptr
+ mutable stdx::mutex _mutex;
+ std::shared_ptr<Recording> _recording;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/traffic_recorder.idl b/src/mongo/db/traffic_recorder.idl
new file mode 100644
index 00000000000..fa9cd232c14
--- /dev/null
+++ b/src/mongo/db/traffic_recorder.idl
@@ -0,0 +1,77 @@
+# Copyright (C) 2018-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+# This IDL file describes the BSON format for a LogicalSessionId, and
+# handles the serialization to and deserialization from its BSON representation
+# for that class.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+
+structs:
+
+ TrafficRecorderStats:
+ description: "A struct representing the trafficRecording server status section"
+ strict: true
+ fields:
+ running:
+ type: bool
+ bufferSize:
+ type: long
+ bufferedBytes:
+ type: long
+ recordingFile:
+ type: string
+ maxFileSize:
+ type: long
+ currentFileSize:
+ type: long
+
+commands:
+ startRecordingTraffic:
+ description: "start recording Command"
+ namespace: ignored
+ fields:
+ filename:
+ description: "output file name"
+ type: string
+ bufferSize:
+ description: "size of buffer"
+ default: 134217728
+ type: long
+ maxFileSize:
+ description: "size of log file"
+ default: 6294967296
+ type: long
+
+ stopRecordingTraffic:
+ description: "stop recording Command"
+ namespace: ignored
diff --git a/src/mongo/shell/shell_utils_launcher.cpp b/src/mongo/shell/shell_utils_launcher.cpp
index 58b33fe83aa..5db817fff68 100644
--- a/src/mongo/shell/shell_utils_launcher.cpp
+++ b/src/mongo/shell/shell_utils_launcher.cpp
@@ -61,6 +61,7 @@
#endif
#include "mongo/client/dbclient_connection.h"
+#include "mongo/db/traffic_reader.h"
#include "mongo/scripting/engine.h"
#include "mongo/shell/shell_options.h"
#include "mongo/shell/shell_utils.h"
@@ -1062,6 +1063,14 @@ BSONObj StopMongoProgramByPid(const BSONObj& a, void* data) {
return BSON("" << (double)code);
}
+BSONObj ConvertTrafficRecordingToBSON(const BSONObj& a, void* data) {
+ int nFields = a.nFields();
+ uassert(ErrorCodes::FailedToParse, "wrong number of arguments", nFields == 1);
+
+ auto arr = trafficRecordingFileToBSONArr(a.firstElement().String());
+ return BSON("" << arr);
+}
+
int KillMongoProgramInstances() {
vector<ProcessId> pids;
registry.getRegisteredPids(pids);
@@ -1113,6 +1122,7 @@ void installShellUtilsLauncher(Scope& scope) {
scope.injectNative("resetDbpath", ResetDbpath);
scope.injectNative("pathExists", PathExists);
scope.injectNative("copyDbpath", CopyDbpath);
+ scope.injectNative("convertTrafficRecordingToBSON", ConvertTrafficRecordingToBSON);
}
} // namespace shell_utils
} // namespace mongo
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 6ca36ed7916..43032ccb1b7 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -174,6 +174,7 @@ env.Library(
'transport_layer_common',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/traffic_recorder',
'$BUILD_DIR/mongo/transport/message_compressor',
],
)
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 6ecf18384d9..09d3a8b00f0 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/db/traffic_recorder.h"
#include "mongo/rpc/message.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/stdx/memory.h"
@@ -423,6 +424,9 @@ void ServiceStateMachine::_sinkCallback(Status status) {
void ServiceStateMachine::_processMessage(ThreadGuard guard) {
invariant(!_inMessage.empty());
+ TrafficRecorder::get(_serviceContext)
+ .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage);
+
auto& compressorMgr = MessageCompressorManager::forSession(_session());
_compressorId = boost::none;
@@ -472,6 +476,10 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) {
uassertStatusOK(swm.getStatus());
toSink = swm.getValue();
}
+
+ TrafficRecorder::get(_serviceContext)
+ .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
+
_sinkMessage(std::move(guard), std::move(toSink));
} else {