author:     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2017-04-26 15:06:23 -0400
committer:  Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2017-04-27 13:19:24 -0400
commit:     eec194e751f6c36e53c241da3a1b42fd4b9b98a1 (patch)
tree:       6ba88aac4aba6ba218d1e92be5e84aa514fb1a69
parent:     2df6ea3c5b70acc3927a08690f8d42b9d35b8709 (diff)
download:   mongo-eec194e751f6c36e53c241da3a1b42fd4b9b98a1.tar.gz
SERVER-28983 Fix undefined behaviour for the 'aggregate' command
-rw-r--r--  jstests/sharding/remove2.js                    | 360
-rw-r--r--  src/mongo/db/commands/pipeline_command.cpp     |  11
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.cpp  |  24
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.h    |  24
-rw-r--r--  src/mongo/s/commands/cluster_pipeline_cmd.cpp  |   8

5 files changed, 213 insertions, 214 deletions
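For context on the one-line summary above: the hazard being fixed appears to be C++'s static-initialization-order problem. `AggregationRequest::kCommandName` was previously a `const StringData` whose initializer ran dynamically in aggregation_request.cpp, while the `aggregate` command objects (most directly the namespace-scope `clusterPipelineCmd` instance on mongos) are constructed during static initialization in other translation units and were passed that constant; C++ gives no ordering guarantee between the two, so the constant could be read before it was initialized. The patch passes the string literal `"aggregate"` to the `Command` constructors and converts the constants to `constexpr` members with in-class initializers, which are constant-initialized and therefore safe to read from any static initializer. The snippet below is a minimal, self-contained sketch of that failure mode and of the fix; the type and variable names are illustrative stand-ins, not the actual MongoDB classes.

```cpp
// Minimal sketch of the static-initialization-order hazard this commit removes.
// Names are illustrative; this is not the MongoDB source.
#include <iostream>

struct AggregationRequestLike {
    // Before the fix: `static const StringData kCommandName;` was declared in the
    // header and dynamically initialized in another .cpp file, at an unspecified
    // point relative to globals in other translation units.
    //
    // After the fix: constexpr with an in-class initializer, so the value is
    // constant-initialized at compile time and has no ordering dependency.
    static constexpr const char* kCommandName = "aggregate";
};

// Stand-in for the self-registering command objects (`pipelineCmd`,
// `clusterPipelineCmd`): a namespace-scope object whose constructor runs
// during static initialization and reads the constant.
struct CommandLike {
    explicit CommandLike(const char* name) : name(name) {}
    const char* name;
};

// With a dynamically initialized constant, this constructor could run first and
// read a not-yet-initialized object -- undefined behaviour. With constexpr (or a
// plain string literal, as the patched constructors use), the read is always safe.
CommandLike pipelineCmdLike{AggregationRequestLike::kCommandName};

int main() {
    std::cout << "registered command: " << pipelineCmdLike.name << "\n";
}
```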
diff --git a/jstests/sharding/remove2.js b/jstests/sharding/remove2.js
index c74daa28c59..e862167fb12 100644
--- a/jstests/sharding/remove2.js
+++ b/jstests/sharding/remove2.js
@@ -2,188 +2,196 @@ load("jstests/replsets/rslib.js");
-seedString = function(replTest) {
-    members = replTest.getReplSetConfig().members.map(function(elem) {
-        return elem.host;
-    });
-    return replTest.name + '/' + members.join(',');
-};
-
-removeShard = function(st, replTest) {
-    jsTest.log("Removing shard with name: " + replTest.name);
-    var res = st.s.adminCommand({removeShard: replTest.name});
-    assert.commandWorked(res);
-    assert.eq('started', res.state);
-    assert.soon(function() {
-        res = st.s.adminCommand({removeShard: replTest.name});
+(function() {
+    'use strict';
+
+    function seedString(replTest) {
+        var members = replTest.getReplSetConfig().members.map(function(elem) {
+            return elem.host;
+        });
+        return replTest.name + '/' + members.join(',');
+    };
+
+    function removeShard(st, replTest) {
+        jsTest.log("Removing shard with name: " + replTest.name);
+        var res = st.s.adminCommand({removeShard: replTest.name});
         assert.commandWorked(res);
-        return ('completed' === res.state);
-    }, "failed to remove shard: " + tojson(res));
-
-    // Drop the database so the shard can be re-added.
-    assert.commandWorked(replTest.getPrimary().getDB(coll.getDB().getName()).dropDatabase());
-};
-
-addShard = function(st, replTest) {
-    seed = seedString(replTest);
-    print("Adding shard with seed: " + seed);
-    try {
-        assert.eq(true, st.adminCommand({addshard: seed}));
-    } catch (e) {
-        print("First attempt to addShard failed, trying again");
-        // transport error on first attempt is expected. Make sure second attempt goes through
-        assert.eq(true, st.adminCommand({addshard: seed}));
+        assert.eq('started', res.state);
+        assert.soon(function() {
+            res = st.s.adminCommand({removeShard: replTest.name});
+            assert.commandWorked(res);
+            return ('completed' === res.state);
+        }, "failed to remove shard: " + tojson(res));
+
+        // Drop the database so the shard can be re-added.
+        assert.commandWorked(replTest.getPrimary().getDB(coll.getDB().getName()).dropDatabase());
+    };
+
+    function addShard(st, replTest) {
+        var seed = seedString(replTest);
+        print("Adding shard with seed: " + seed);
+        try {
+            assert.eq(true, st.adminCommand({addshard: seed}));
+        } catch (e) {
+            print("First attempt to addShard failed, trying again");
+            // transport error on first attempt is expected. Make sure second attempt goes through
+            assert.eq(true, st.adminCommand({addshard: seed}));
+        }
+        awaitRSClientHosts(
+            new Mongo(st.s.host), replTest.getSecondaries(), {ok: true, secondary: true});
+
+        assert.soon(function() {
+            var x = st.chunkDiff(coll.getName(), coll.getDB().getName());
+            print("chunk diff: " + x);
+            return x < 2;
+        }, "no balance happened", 30 * 60 * 1000);
+
+        try {
+            assert.eq(300, coll.find().itcount());
+        } catch (e) {
+            // Expected. First query might get transport error and need to reconnect.
+            printjson(e);
+            assert.eq(300, coll.find().itcount());
+        }
+        print("Shard added successfully");
+    };
+
+    var st = new ShardingTest(
+        {shards: {rs0: {nodes: 2}, rs1: {nodes: 2}}, other: {chunkSize: 1, enableBalancer: true}});
+
+    // Pending resolution of SERVER-8598, we need to wait for deletion after chunk migrations to
+    // avoid a pending delete re-creating a database after it was dropped.
+    st.s.getDB("config").settings.update({_id: "balancer"}, {$set: {_waitForDelete: true}}, true);
+
+    var conn = new Mongo(st.s.host);
+    var coll = conn.getCollection("test.remove2");
+    coll.drop();
+
+    assert.commandWorked(st.s0.adminCommand({enableSharding: coll.getDB().getName()}));
+    st.ensurePrimaryShard(coll.getDB().getName(), st.shard0.shardName);
+    assert.commandWorked(st.s0.adminCommand({shardCollection: coll.getFullName(), key: {i: 1}}));
+
+    // Setup initial data
+    var str = 'a';
+    while (str.length < 1024 * 16) {
+        str += str;
+    }
+
+    var bulk = coll.initializeUnorderedBulkOp();
+    for (var i = 0; i < 300; i++) {
+        bulk.insert({i: i % 10, str: str});
     }
-    awaitRSClientHosts(
-        new Mongo(st.s.host), replTest.getSecondaries(), {ok: true, secondary: true});
+    assert.writeOK(bulk.execute());
+
+    assert.eq(300, coll.find().itcount());

     assert.soon(function() {
-        var x = st.chunkDiff(coll.getName(), coll.getDB().getName());
+        var x = st.chunkDiff('remove2', "test");
         print("chunk diff: " + x);
         return x < 2;
     }, "no balance happened", 30 * 60 * 1000);

-    try {
-        assert.eq(300, coll.find().itcount());
-    } catch (e) {
-        // Expected. First query might get transport error and need to reconnect.
-        printjson(e);
-        assert.eq(300, coll.find().itcount());
-    }
-    print("Shard added successfully");
-};
-
-var st = new ShardingTest(
-    {shards: {rs0: {nodes: 2}, rs1: {nodes: 2}}, other: {chunkSize: 1, enableBalancer: true}});
-
-// Pending resolution of SERVER-8598, we need to wait for deletion after chunk migrations to avoid
-// a pending delete re-creating a database after it was dropped.
-st.s.getDB("config").settings.update({_id: "balancer"}, {$set: {_waitForDelete: true}}, true);
-
-var rst0 = st._rs[0].test;
-var rst1 = st._rs[1].test;
-
-var conn = new Mongo(st.s.host);
-var coll = conn.getCollection("test.remove2");
-coll.drop();
-
-st.admin.runCommand({enableSharding: coll.getDB().getName()});
-st.ensurePrimaryShard(coll.getDB().getName(), st.shard0.shardName);
-st.admin.runCommand({shardCollection: coll.getFullName(), key: {i: 1}});
-
-// Setup initial data
-var str = 'a';
-while (str.length < 1024 * 16) {
-    str += str;
-}
-
-var bulk = coll.initializeUnorderedBulkOp();
-for (var i = 0; i < 300; i++) {
-    bulk.insert({i: i % 10, str: str});
-}
-assert.writeOK(bulk.execute());
-
-assert.eq(300, coll.find().itcount());
-
-assert.soon(function() {
-    var x = st.chunkDiff('remove2', "test");
-    print("chunk diff: " + x);
-    return x < 2;
-}, "no balance happened", 30 * 60 * 1000);
-
-assert.eq(300, coll.find().itcount());
-
-st.printShardingStatus();
-
-// Remove shard and add it back in, without shutting it down.
-jsTestLog("Attempting to remove shard and add it back in");
-removeShard(st, rst1);
-addShard(st, rst1);
-
-// Remove shard, restart set, then add it back in.
-jsTestLog("Attempting to remove shard, restart the set, and then add it back in");
-originalSeed = seedString(rst1);
-
-removeShard(st, rst1);
-rst1.stopSet();
-print("Sleeping for 20 seconds to let the other shard's ReplicaSetMonitor time out");
-sleep(20000);  // 1 failed check should take 10 seconds, sleep for 20 just to be safe
-
-rst1.startSet({restart: true});
-rst1.initiate();
-rst1.awaitReplication();
-
-assert.eq(originalSeed, seedString(rst1), "Set didn't come back up with the same hosts as before");
-addShard(st, rst1);
-
-// Shut down shard and wait for its ReplicaSetMonitor to be cleaned up, then start it back up and
-// use it.
-// TODO: test this both with AND without waiting for the ReplicaSetMonitor to be cleaned up.
-// This part doesn't pass, even without cleaning up the ReplicaSetMonitor - see SERVER-5900.
-/*printjson( conn.getDB('admin').runCommand({movePrimary : 'test2', to : rst1.name}) );
-printjson( conn.getDB('admin').runCommand({setParameter : 1, replMonitorMaxFailedChecks : 5}) );
-jsTestLog( "Shutting down set" )
-rst1.stopSet();
-jsTestLog( "sleeping for 20 seconds to make sure ReplicaSetMonitor gets cleaned up");
-sleep(20000); // 1 failed check should take 10 seconds, sleep for 20 just to be safe
-
-// Should fail since rst1 is the primary for test2
-assert.throws(function() {conn.getDB('test2').foo.find().itcount()});
-jsTestLog( "Bringing set back up" );
-rst1.startSet();
-rst1.initiate();
-rst1.awaitReplication();
-
-jsTestLog( "Checking that set is usable again" );
-//conn.getDB('admin').runCommand({flushRouterConfig:1}); // Uncommenting this makes test pass
-conn.getDB('test2').foo.insert({a:1});
-gle = conn.getDB('test2').runCommand('getLastError');
-if ( !gle.ok ) {
-    // Expected. First write will fail and need to re-connect
-    print( "write failed" );
-    printjson( gle );
+    assert.eq(300, coll.find().itcount());
+
+    st.printShardingStatus();
+
+    var rst1 = st.rs1;
+    // Remove shard and add it back in, without shutting it down.
+    jsTestLog("Attempting to remove shard and add it back in");
+    removeShard(st, rst1);
+    addShard(st, rst1);
+
+    // Remove shard, restart set, then add it back in.
+    jsTestLog("Attempting to remove shard, restart the set, and then add it back in");
+    var originalSeed = seedString(rst1);
+
+    removeShard(st, rst1);
+    rst1.stopSet();
+    print("Sleeping for 20 seconds to let the other shard's ReplicaSetMonitor time out");
+    sleep(20000);  // 1 failed check should take 10 seconds, sleep for 20 just to be safe
+
+    rst1.startSet({restart: true});
+    rst1.initiate();
+    rst1.awaitReplication();
+
+    assert.eq(
+        originalSeed, seedString(rst1), "Set didn't come back up with the same hosts as before");
+    addShard(st, rst1);
+
+    // Shut down shard and wait for its ReplicaSetMonitor to be cleaned up, then start it back up
+    // and use it.
+    //
+    // TODO: test this both with AND without waiting for the ReplicaSetMonitor to be cleaned up.
+    //
+    // This part doesn't pass, even without cleaning up the ReplicaSetMonitor - see SERVER-5900.
+    /*
+    printjson( conn.getDB('admin').runCommand({movePrimary : 'test2', to : rst1.name}) );
+    printjson( conn.getDB('admin').runCommand({setParameter : 1, replMonitorMaxFailedChecks : 5}) );
+    jsTestLog( "Shutting down set" )
+    rst1.stopSet();
+    jsTestLog( "sleeping for 20 seconds to make sure ReplicaSetMonitor gets cleaned up");
+    sleep(20000); // 1 failed check should take 10 seconds, sleep for 20 just to be safe
+
+    // Should fail since rst1 is the primary for test2
+    assert.throws(function() {conn.getDB('test2').foo.find().itcount()});
+    jsTestLog( "Bringing set back up" );
+    rst1.startSet();
+    rst1.initiate();
+    rst1.awaitReplication();
+
+    jsTestLog( "Checking that set is usable again" );
+    //conn.getDB('admin').runCommand({flushRouterConfig:1}); // Uncommenting this makes test pass
     conn.getDB('test2').foo.insert({a:1});
-jsTestLog("Attempt removing shard and adding a new shard with the same Replica Set name"); -removeShard(st, rst1); -rst1.stopSet(); -print("Sleeping for 60 seconds to let the other shards restart their ReplicaSetMonitors"); -sleep(60000); - -var rst2 = new ReplSetTest({name: rst1.name, nodes: 2, useHostName: true}); -rst2.startSet({shardsvr: ""}); -rst2.initiate(); -rst2.awaitReplication(); - -addShard(st, rst2); -printjson(st.admin.runCommand({movePrimary: 'test2', to: rst2.name})); - -assert.eq(300, coll.find().itcount()); -conn.getDB('test2').foo.insert({a: 1}); -assert.eq(1, conn.getDB('test2').foo.find().itcount()); - -// Can't shut down with rst2 in the set or ShardingTest will fail trying to cleanup on shutdown. -// Have to take out rst2 and put rst1 back into the set so that it can clean up. -jsTestLog("Putting ShardingTest back to state it expects"); -printjson(st.admin.runCommand({movePrimary: 'test2', to: rst0.name})); -removeShard(st, rst2); -rst2.stopSet(); -print("Sleeping for 60 seconds to let the other shards restart their ReplicaSetMonitors"); -sleep(60000); - -rst1.startSet({restart: true}); -rst1.initiate(); -rst1.awaitReplication(); - -assert.eq(originalSeed, seedString(rst1), "Set didn't come back up with the same hosts as before"); -addShard(st, rst1); - -jsTestLog("finishing!"); -// this should be fixed by SERVER-22176 -st.stop({allowedExitCodes: [MongoRunner.EXIT_ABRUPT]}); + gle = conn.getDB('test2').runCommand('getLastError'); + if ( !gle.ok ) { + // Expected. First write will fail and need to re-connect + print( "write failed" ); + printjson( gle ); + conn.getDB('test2').foo.insert({a:1}); + assert( conn.getDB('test2').getLastErrorObj().ok ); + } + + assert.eq( 1, conn.getDB('test2').foo.find().itcount() ); + assert( conn.getDB('test2').dropDatabase().ok ); + */ + + // Remove shard and add a new shard with the same replica set and shard name, but different + // ports + jsTestLog("Attempt removing shard and adding a new shard with the same Replica Set name"); + removeShard(st, rst1); + rst1.stopSet(); + print("Sleeping for 60 seconds to let the other shards restart their ReplicaSetMonitors"); + sleep(60000); + + var rst2 = new ReplSetTest({name: rst1.name, nodes: 2, useHostName: true}); + rst2.startSet({shardsvr: ""}); + rst2.initiate(); + rst2.awaitReplication(); + + addShard(st, rst2); + printjson(st.admin.runCommand({movePrimary: 'test2', to: rst2.name})); + + assert.eq(300, coll.find().itcount()); + conn.getDB('test2').foo.insert({a: 1}); + assert.eq(1, conn.getDB('test2').foo.find().itcount()); + + // Can't shut down with rst2 in the set or ShardingTest will fail trying to cleanup on shutdown. + // Have to take out rst2 and put rst1 back into the set so that it can clean up. 
+ jsTestLog("Putting ShardingTest back to state it expects"); + printjson(st.admin.runCommand({movePrimary: 'test2', to: st.rs0.name})); + removeShard(st, rst2); + rst2.stopSet(); + + print("Sleeping for 60 seconds to let the other shards restart their ReplicaSetMonitors"); + sleep(60000); + + rst1.startSet({restart: true}); + rst1.initiate(); + rst1.awaitReplication(); + + assert.eq( + originalSeed, seedString(rst1), "Set didn't come back up with the same hosts as before"); + addShard(st, rst1); + + st.stop(); +})(); diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index af863000b60..fcfb1156b7f 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -28,7 +28,6 @@ #include "mongo/platform/basic.h" -#include "mongo/base/init.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/privilege.h" #include "mongo/db/client.h" @@ -50,8 +49,7 @@ bool isMergePipeline(const std::vector<BSONObj>& pipeline) { class PipelineCommand : public Command { public: - PipelineCommand() - : Command(AggregationRequest::kCommandName) {} // command is called "aggregate" + PipelineCommand() : Command("aggregate", false) {} // Locks are managed manually, in particular by DocumentSourceCursor. virtual bool supportsWriteConcern(const BSONObj& cmd) const override { @@ -152,13 +150,8 @@ public: return runAggregate(opCtx, nss, request.getValue(), cmdObj, *out); } -}; -MONGO_INITIALIZER(PipelineCommand)(InitializerContext* context) { - new PipelineCommand(); - - return Status::OK(); -} +} pipelineCmd; } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 163c4f288c1..f915ab47431 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -47,18 +47,18 @@ namespace mongo { -const StringData AggregationRequest::kCommandName = "aggregate"_sd; -const StringData AggregationRequest::kCursorName = "cursor"_sd; -const StringData AggregationRequest::kBatchSizeName = "batchSize"_sd; -const StringData AggregationRequest::kFromRouterName = "fromRouter"_sd; -const StringData AggregationRequest::kPipelineName = "pipeline"_sd; -const StringData AggregationRequest::kCollationName = "collation"_sd; -const StringData AggregationRequest::kExplainName = "explain"_sd; -const StringData AggregationRequest::kAllowDiskUseName = "allowDiskUse"_sd; -const StringData AggregationRequest::kHintName = "hint"_sd; -const StringData AggregationRequest::kCommentName = "comment"_sd; - -const long long AggregationRequest::kDefaultBatchSize = 101; +constexpr StringData AggregationRequest::kCommandName; +constexpr StringData AggregationRequest::kCursorName; +constexpr StringData AggregationRequest::kBatchSizeName; +constexpr StringData AggregationRequest::kFromRouterName; +constexpr StringData AggregationRequest::kPipelineName; +constexpr StringData AggregationRequest::kCollationName; +constexpr StringData AggregationRequest::kExplainName; +constexpr StringData AggregationRequest::kAllowDiskUseName; +constexpr StringData AggregationRequest::kHintName; +constexpr StringData AggregationRequest::kCommentName; + +constexpr long long AggregationRequest::kDefaultBatchSize; AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline) : _nss(std::move(nss)), _pipeline(std::move(pipeline)), _batchSize(kDefaultBatchSize) {} diff --git 
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 1f290f26a90..14b4dad0e59 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -47,18 +47,18 @@ class Document;
  */
 class AggregationRequest {
 public:
-    static const StringData kCommandName;
-    static const StringData kCursorName;
-    static const StringData kBatchSizeName;
-    static const StringData kFromRouterName;
-    static const StringData kPipelineName;
-    static const StringData kCollationName;
-    static const StringData kExplainName;
-    static const StringData kAllowDiskUseName;
-    static const StringData kHintName;
-    static const StringData kCommentName;
-
-    static const long long kDefaultBatchSize;
+    static constexpr StringData kCommandName = "aggregate"_sd;
+    static constexpr StringData kCursorName = "cursor"_sd;
+    static constexpr StringData kBatchSizeName = "batchSize"_sd;
+    static constexpr StringData kFromRouterName = "fromRouter"_sd;
+    static constexpr StringData kPipelineName = "pipeline"_sd;
+    static constexpr StringData kCollationName = "collation"_sd;
+    static constexpr StringData kExplainName = "explain"_sd;
+    static constexpr StringData kAllowDiskUseName = "allowDiskUse"_sd;
+    static constexpr StringData kHintName = "hint"_sd;
+    static constexpr StringData kCommentName = "comment"_sd;
+
+    static constexpr long long kDefaultBatchSize = 101;

     /**
      * Create a new instance of AggregationRequest by parsing the raw command object. Returns a
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 4f4431b52ec..ffdf5f2ffde 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -41,12 +41,9 @@
 namespace mongo {
 namespace {

-/**
- * Implements the aggregation (pipeline command for sharding).
- */
-class PipelineCommand : public Command {
+class ClusterPipelineCommand : public Command {
 public:
-    PipelineCommand() : Command(AggregationRequest::kCommandName, false) {}
+    ClusterPipelineCommand() : Command("aggregate", false) {}

     virtual bool slaveOk() const {
         return true;
@@ -124,6 +121,7 @@ public:

         return ClusterAggregate::runAggregate(opCtx, nsStruct, request.getValue(), aggCmd, out);
     }
+
 } clusterPipelineCmd;

 }  // namespace
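One detail of the new `constexpr` members that can look redundant when reading the header and source hunks together: under the C++14 rules this code targets, a `static constexpr` data member gets its value from the in-class initializer, but any ODR-use (for example, binding a reference to it or taking its address) still requires an out-of-line definition with no initializer. That is what the otherwise empty-looking `constexpr StringData AggregationRequest::kCommandName;` lines in aggregation_request.cpp provide; from C++17 onward such members are implicitly inline and the extra definitions become unnecessary. A small illustration, independent of the MongoDB sources (the `Names` and `printName` identifiers are invented for the example):

```cpp
#include <iostream>

struct Names {
    // The in-class initializer supplies the value at compile time.
    static constexpr const char* kCommandName = "aggregate";
};

// Pre-C++17, an ODR-use (such as binding the reference below) still needs this
// out-of-line definition, mirroring the bare
// `constexpr StringData AggregationRequest::kCommandName;` lines in the diff.
// In C++17 and later the member is implicitly inline and this line is redundant.
constexpr const char* Names::kCommandName;

static void printName(const char* const& name) {  // takes a reference: ODR-use
    std::cout << name << "\n";
}

int main() {
    printName(Names::kCommandName);
}
```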