summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/sharding/remove2.js360
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp11
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp24
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h24
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp8
5 files changed, 213 insertions, 214 deletions
diff --git a/jstests/sharding/remove2.js b/jstests/sharding/remove2.js
index c74daa28c59..e862167fb12 100644
--- a/jstests/sharding/remove2.js
+++ b/jstests/sharding/remove2.js
@@ -2,188 +2,196 @@
load("jstests/replsets/rslib.js");
-seedString = function(replTest) {
- members = replTest.getReplSetConfig().members.map(function(elem) {
- return elem.host;
- });
- return replTest.name + '/' + members.join(',');
-};
-
-removeShard = function(st, replTest) {
- jsTest.log("Removing shard with name: " + replTest.name);
- var res = st.s.adminCommand({removeShard: replTest.name});
- assert.commandWorked(res);
- assert.eq('started', res.state);
- assert.soon(function() {
- res = st.s.adminCommand({removeShard: replTest.name});
+(function() {
+ 'use strict';
+
+ function seedString(replTest) {
+ var members = replTest.getReplSetConfig().members.map(function(elem) {
+ return elem.host;
+ });
+ return replTest.name + '/' + members.join(',');
+ };
+
+ function removeShard(st, replTest) {
+ jsTest.log("Removing shard with name: " + replTest.name);
+ var res = st.s.adminCommand({removeShard: replTest.name});
assert.commandWorked(res);
- return ('completed' === res.state);
- }, "failed to remove shard: " + tojson(res));
-
- // Drop the database so the shard can be re-added.
- assert.commandWorked(replTest.getPrimary().getDB(coll.getDB().getName()).dropDatabase());
-};
-
-addShard = function(st, replTest) {
- seed = seedString(replTest);
- print("Adding shard with seed: " + seed);
- try {
- assert.eq(true, st.adminCommand({addshard: seed}));
- } catch (e) {
- print("First attempt to addShard failed, trying again");
- // transport error on first attempt is expected. Make sure second attempt goes through
- assert.eq(true, st.adminCommand({addshard: seed}));
+ assert.eq('started', res.state);
+ assert.soon(function() {
+ res = st.s.adminCommand({removeShard: replTest.name});
+ assert.commandWorked(res);
+ return ('completed' === res.state);
+ }, "failed to remove shard: " + tojson(res));
+
+ // Drop the database so the shard can be re-added.
+ assert.commandWorked(replTest.getPrimary().getDB(coll.getDB().getName()).dropDatabase());
+ };
+
+ function addShard(st, replTest) {
+ var seed = seedString(replTest);
+ print("Adding shard with seed: " + seed);
+ try {
+ assert.eq(true, st.adminCommand({addshard: seed}));
+ } catch (e) {
+ print("First attempt to addShard failed, trying again");
+ // transport error on first attempt is expected. Make sure second attempt goes through
+ assert.eq(true, st.adminCommand({addshard: seed}));
+ }
+ awaitRSClientHosts(
+ new Mongo(st.s.host), replTest.getSecondaries(), {ok: true, secondary: true});
+
+ assert.soon(function() {
+ var x = st.chunkDiff(coll.getName(), coll.getDB().getName());
+ print("chunk diff: " + x);
+ return x < 2;
+ }, "no balance happened", 30 * 60 * 1000);
+
+ try {
+ assert.eq(300, coll.find().itcount());
+ } catch (e) {
+ // Expected. First query might get transport error and need to reconnect.
+ printjson(e);
+ assert.eq(300, coll.find().itcount());
+ }
+ print("Shard added successfully");
+ };
+
+ var st = new ShardingTest(
+ {shards: {rs0: {nodes: 2}, rs1: {nodes: 2}}, other: {chunkSize: 1, enableBalancer: true}});
+
+ // Pending resolution of SERVER-8598, we need to wait for deletion after chunk migrations to
+ // avoid a pending delete re-creating a database after it was dropped.
+ st.s.getDB("config").settings.update({_id: "balancer"}, {$set: {_waitForDelete: true}}, true);
+
+ var conn = new Mongo(st.s.host);
+ var coll = conn.getCollection("test.remove2");
+ coll.drop();
+
+ assert.commandWorked(st.s0.adminCommand({enableSharding: coll.getDB().getName()}));
+ st.ensurePrimaryShard(coll.getDB().getName(), st.shard0.shardName);
+ assert.commandWorked(st.s0.adminCommand({shardCollection: coll.getFullName(), key: {i: 1}}));
+
+ // Setup initial data
+ var str = 'a';
+ while (str.length < 1024 * 16) {
+ str += str;
+ }
+
+ var bulk = coll.initializeUnorderedBulkOp();
+ for (var i = 0; i < 300; i++) {
+ bulk.insert({i: i % 10, str: str});
}
- awaitRSClientHosts(
- new Mongo(st.s.host), replTest.getSecondaries(), {ok: true, secondary: true});
+ assert.writeOK(bulk.execute());
+
+ assert.eq(300, coll.find().itcount());
assert.soon(function() {
- var x = st.chunkDiff(coll.getName(), coll.getDB().getName());
+ var x = st.chunkDiff('remove2', "test");
print("chunk diff: " + x);
return x < 2;
}, "no balance happened", 30 * 60 * 1000);
- try {
- assert.eq(300, coll.find().itcount());
- } catch (e) {
- // Expected. First query might get transport error and need to reconnect.
- printjson(e);
- assert.eq(300, coll.find().itcount());
- }
- print("Shard added successfully");
-};
-
-var st = new ShardingTest(
- {shards: {rs0: {nodes: 2}, rs1: {nodes: 2}}, other: {chunkSize: 1, enableBalancer: true}});
-
-// Pending resolution of SERVER-8598, we need to wait for deletion after chunk migrations to avoid
-// a pending delete re-creating a database after it was dropped.
-st.s.getDB("config").settings.update({_id: "balancer"}, {$set: {_waitForDelete: true}}, true);
-
-var rst0 = st._rs[0].test;
-var rst1 = st._rs[1].test;
-
-var conn = new Mongo(st.s.host);
-var coll = conn.getCollection("test.remove2");
-coll.drop();
-
-st.admin.runCommand({enableSharding: coll.getDB().getName()});
-st.ensurePrimaryShard(coll.getDB().getName(), st.shard0.shardName);
-st.admin.runCommand({shardCollection: coll.getFullName(), key: {i: 1}});
-
-// Setup initial data
-var str = 'a';
-while (str.length < 1024 * 16) {
- str += str;
-}
-
-var bulk = coll.initializeUnorderedBulkOp();
-for (var i = 0; i < 300; i++) {
- bulk.insert({i: i % 10, str: str});
-}
-assert.writeOK(bulk.execute());
-
-assert.eq(300, coll.find().itcount());
-
-assert.soon(function() {
- var x = st.chunkDiff('remove2', "test");
- print("chunk diff: " + x);
- return x < 2;
-}, "no balance happened", 30 * 60 * 1000);
-
-assert.eq(300, coll.find().itcount());
-
-st.printShardingStatus();
-
-// Remove shard and add it back in, without shutting it down.
-jsTestLog("Attempting to remove shard and add it back in");
-removeShard(st, rst1);
-addShard(st, rst1);
-
-// Remove shard, restart set, then add it back in.
-jsTestLog("Attempting to remove shard, restart the set, and then add it back in");
-originalSeed = seedString(rst1);
-
-removeShard(st, rst1);
-rst1.stopSet();
-print("Sleeping for 20 seconds to let the other shard's ReplicaSetMonitor time out");
-sleep(20000); // 1 failed check should take 10 seconds, sleep for 20 just to be safe
-
-rst1.startSet({restart: true});
-rst1.initiate();
-rst1.awaitReplication();
-
-assert.eq(originalSeed, seedString(rst1), "Set didn't come back up with the same hosts as before");
-addShard(st, rst1);
-
-// Shut down shard and wait for its ReplicaSetMonitor to be cleaned up, then start it back up and
-// use it.
-// TODO: test this both with AND without waiting for the ReplicaSetMonitor to be cleaned up.
-// This part doesn't pass, even without cleaning up the ReplicaSetMonitor - see SERVER-5900.
-/*printjson( conn.getDB('admin').runCommand({movePrimary : 'test2', to : rst1.name}) );
-printjson( conn.getDB('admin').runCommand({setParameter : 1, replMonitorMaxFailedChecks : 5}) );
-jsTestLog( "Shutting down set" )
-rst1.stopSet();
-jsTestLog( "sleeping for 20 seconds to make sure ReplicaSetMonitor gets cleaned up");
-sleep(20000); // 1 failed check should take 10 seconds, sleep for 20 just to be safe
-
-// Should fail since rst1 is the primary for test2
-assert.throws(function() {conn.getDB('test2').foo.find().itcount()});
-jsTestLog( "Bringing set back up" );
-rst1.startSet();
-rst1.initiate();
-rst1.awaitReplication();
-
-jsTestLog( "Checking that set is usable again" );
-//conn.getDB('admin').runCommand({flushRouterConfig:1}); // Uncommenting this makes test pass
-conn.getDB('test2').foo.insert({a:1});
-gle = conn.getDB('test2').runCommand('getLastError');
-if ( !gle.ok ) {
- // Expected. First write will fail and need to re-connect
- print( "write failed" );
- printjson( gle );
+ assert.eq(300, coll.find().itcount());
+
+ st.printShardingStatus();
+
+ var rst1 = st.rs1;
+ // Remove shard and add it back in, without shutting it down.
+ jsTestLog("Attempting to remove shard and add it back in");
+ removeShard(st, rst1);
+ addShard(st, rst1);
+
+ // Remove shard, restart set, then add it back in.
+ jsTestLog("Attempting to remove shard, restart the set, and then add it back in");
+ var originalSeed = seedString(rst1);
+
+ removeShard(st, rst1);
+ rst1.stopSet();
+ print("Sleeping for 20 seconds to let the other shard's ReplicaSetMonitor time out");
+ sleep(20000); // 1 failed check should take 10 seconds, sleep for 20 just to be safe
+
+ rst1.startSet({restart: true});
+ rst1.initiate();
+ rst1.awaitReplication();
+
+ assert.eq(
+ originalSeed, seedString(rst1), "Set didn't come back up with the same hosts as before");
+ addShard(st, rst1);
+
+ // Shut down shard and wait for its ReplicaSetMonitor to be cleaned up, then start it back up
+ // and use it.
+ //
+ // TODO: test this both with AND without waiting for the ReplicaSetMonitor to be cleaned up.
+ //
+ // This part doesn't pass, even without cleaning up the ReplicaSetMonitor - see SERVER-5900.
+ /*
+ printjson( conn.getDB('admin').runCommand({movePrimary : 'test2', to : rst1.name}) );
+ printjson( conn.getDB('admin').runCommand({setParameter : 1, replMonitorMaxFailedChecks : 5}) );
+ jsTestLog( "Shutting down set" )
+ rst1.stopSet();
+ jsTestLog( "sleeping for 20 seconds to make sure ReplicaSetMonitor gets cleaned up");
+ sleep(20000); // 1 failed check should take 10 seconds, sleep for 20 just to be safe
+
+ // Should fail since rst1 is the primary for test2
+ assert.throws(function() {conn.getDB('test2').foo.find().itcount()});
+ jsTestLog( "Bringing set back up" );
+ rst1.startSet();
+ rst1.initiate();
+ rst1.awaitReplication();
+
+ jsTestLog( "Checking that set is usable again" );
+ //conn.getDB('admin').runCommand({flushRouterConfig:1}); // Uncommenting this makes test pass
conn.getDB('test2').foo.insert({a:1});
- assert( conn.getDB('test2').getLastErrorObj().ok );
-}
-
-assert.eq( 1, conn.getDB('test2').foo.find().itcount() );
-assert( conn.getDB('test2').dropDatabase().ok );*/
-
-// Remove shard and add a new shard with the same replica set and shard name, but different ports.
-jsTestLog("Attempt removing shard and adding a new shard with the same Replica Set name");
-removeShard(st, rst1);
-rst1.stopSet();
-print("Sleeping for 60 seconds to let the other shards restart their ReplicaSetMonitors");
-sleep(60000);
-
-var rst2 = new ReplSetTest({name: rst1.name, nodes: 2, useHostName: true});
-rst2.startSet({shardsvr: ""});
-rst2.initiate();
-rst2.awaitReplication();
-
-addShard(st, rst2);
-printjson(st.admin.runCommand({movePrimary: 'test2', to: rst2.name}));
-
-assert.eq(300, coll.find().itcount());
-conn.getDB('test2').foo.insert({a: 1});
-assert.eq(1, conn.getDB('test2').foo.find().itcount());
-
-// Can't shut down with rst2 in the set or ShardingTest will fail trying to cleanup on shutdown.
-// Have to take out rst2 and put rst1 back into the set so that it can clean up.
-jsTestLog("Putting ShardingTest back to state it expects");
-printjson(st.admin.runCommand({movePrimary: 'test2', to: rst0.name}));
-removeShard(st, rst2);
-rst2.stopSet();
-print("Sleeping for 60 seconds to let the other shards restart their ReplicaSetMonitors");
-sleep(60000);
-
-rst1.startSet({restart: true});
-rst1.initiate();
-rst1.awaitReplication();
-
-assert.eq(originalSeed, seedString(rst1), "Set didn't come back up with the same hosts as before");
-addShard(st, rst1);
-
-jsTestLog("finishing!");
-// this should be fixed by SERVER-22176
-st.stop({allowedExitCodes: [MongoRunner.EXIT_ABRUPT]});
+ gle = conn.getDB('test2').runCommand('getLastError');
+ if ( !gle.ok ) {
+ // Expected. First write will fail and need to re-connect
+ print( "write failed" );
+ printjson( gle );
+ conn.getDB('test2').foo.insert({a:1});
+ assert( conn.getDB('test2').getLastErrorObj().ok );
+ }
+
+ assert.eq( 1, conn.getDB('test2').foo.find().itcount() );
+ assert( conn.getDB('test2').dropDatabase().ok );
+ */
+
+ // Remove shard and add a new shard with the same replica set and shard name, but different
+ // ports
+ jsTestLog("Attempt removing shard and adding a new shard with the same Replica Set name");
+ removeShard(st, rst1);
+ rst1.stopSet();
+ print("Sleeping for 60 seconds to let the other shards restart their ReplicaSetMonitors");
+ sleep(60000);
+
+ var rst2 = new ReplSetTest({name: rst1.name, nodes: 2, useHostName: true});
+ rst2.startSet({shardsvr: ""});
+ rst2.initiate();
+ rst2.awaitReplication();
+
+ addShard(st, rst2);
+ printjson(st.admin.runCommand({movePrimary: 'test2', to: rst2.name}));
+
+ assert.eq(300, coll.find().itcount());
+ conn.getDB('test2').foo.insert({a: 1});
+ assert.eq(1, conn.getDB('test2').foo.find().itcount());
+
+ // Can't shut down with rst2 in the set or ShardingTest will fail trying to cleanup on shutdown.
+ // Have to take out rst2 and put rst1 back into the set so that it can clean up.
+ jsTestLog("Putting ShardingTest back to state it expects");
+ printjson(st.admin.runCommand({movePrimary: 'test2', to: st.rs0.name}));
+ removeShard(st, rst2);
+ rst2.stopSet();
+
+ print("Sleeping for 60 seconds to let the other shards restart their ReplicaSetMonitors");
+ sleep(60000);
+
+ rst1.startSet({restart: true});
+ rst1.initiate();
+ rst1.awaitReplication();
+
+ assert.eq(
+ originalSeed, seedString(rst1), "Set didn't come back up with the same hosts as before");
+ addShard(st, rst1);
+
+ st.stop();
+})();
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index af863000b60..fcfb1156b7f 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -28,7 +28,6 @@
#include "mongo/platform/basic.h"
-#include "mongo/base/init.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/client.h"
@@ -50,8 +49,7 @@ bool isMergePipeline(const std::vector<BSONObj>& pipeline) {
class PipelineCommand : public Command {
public:
- PipelineCommand()
- : Command(AggregationRequest::kCommandName) {} // command is called "aggregate"
+ PipelineCommand() : Command("aggregate", false) {}
// Locks are managed manually, in particular by DocumentSourceCursor.
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
@@ -152,13 +150,8 @@ public:
return runAggregate(opCtx, nss, request.getValue(), cmdObj, *out);
}
-};
-MONGO_INITIALIZER(PipelineCommand)(InitializerContext* context) {
- new PipelineCommand();
-
- return Status::OK();
-}
+} pipelineCmd;
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 163c4f288c1..f915ab47431 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -47,18 +47,18 @@
namespace mongo {
-const StringData AggregationRequest::kCommandName = "aggregate"_sd;
-const StringData AggregationRequest::kCursorName = "cursor"_sd;
-const StringData AggregationRequest::kBatchSizeName = "batchSize"_sd;
-const StringData AggregationRequest::kFromRouterName = "fromRouter"_sd;
-const StringData AggregationRequest::kPipelineName = "pipeline"_sd;
-const StringData AggregationRequest::kCollationName = "collation"_sd;
-const StringData AggregationRequest::kExplainName = "explain"_sd;
-const StringData AggregationRequest::kAllowDiskUseName = "allowDiskUse"_sd;
-const StringData AggregationRequest::kHintName = "hint"_sd;
-const StringData AggregationRequest::kCommentName = "comment"_sd;
-
-const long long AggregationRequest::kDefaultBatchSize = 101;
+constexpr StringData AggregationRequest::kCommandName;
+constexpr StringData AggregationRequest::kCursorName;
+constexpr StringData AggregationRequest::kBatchSizeName;
+constexpr StringData AggregationRequest::kFromRouterName;
+constexpr StringData AggregationRequest::kPipelineName;
+constexpr StringData AggregationRequest::kCollationName;
+constexpr StringData AggregationRequest::kExplainName;
+constexpr StringData AggregationRequest::kAllowDiskUseName;
+constexpr StringData AggregationRequest::kHintName;
+constexpr StringData AggregationRequest::kCommentName;
+
+constexpr long long AggregationRequest::kDefaultBatchSize;
AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline)
: _nss(std::move(nss)), _pipeline(std::move(pipeline)), _batchSize(kDefaultBatchSize) {}
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 1f290f26a90..14b4dad0e59 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -47,18 +47,18 @@ class Document;
*/
class AggregationRequest {
public:
- static const StringData kCommandName;
- static const StringData kCursorName;
- static const StringData kBatchSizeName;
- static const StringData kFromRouterName;
- static const StringData kPipelineName;
- static const StringData kCollationName;
- static const StringData kExplainName;
- static const StringData kAllowDiskUseName;
- static const StringData kHintName;
- static const StringData kCommentName;
-
- static const long long kDefaultBatchSize;
+ static constexpr StringData kCommandName = "aggregate"_sd;
+ static constexpr StringData kCursorName = "cursor"_sd;
+ static constexpr StringData kBatchSizeName = "batchSize"_sd;
+ static constexpr StringData kFromRouterName = "fromRouter"_sd;
+ static constexpr StringData kPipelineName = "pipeline"_sd;
+ static constexpr StringData kCollationName = "collation"_sd;
+ static constexpr StringData kExplainName = "explain"_sd;
+ static constexpr StringData kAllowDiskUseName = "allowDiskUse"_sd;
+ static constexpr StringData kHintName = "hint"_sd;
+ static constexpr StringData kCommentName = "comment"_sd;
+
+ static constexpr long long kDefaultBatchSize = 101;
/**
* Create a new instance of AggregationRequest by parsing the raw command object. Returns a
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 4f4431b52ec..ffdf5f2ffde 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -41,12 +41,9 @@
namespace mongo {
namespace {
-/**
- * Implements the aggregation (pipeline command for sharding).
- */
-class PipelineCommand : public Command {
+class ClusterPipelineCommand : public Command {
public:
- PipelineCommand() : Command(AggregationRequest::kCommandName, false) {}
+ ClusterPipelineCommand() : Command("aggregate", false) {}
virtual bool slaveOk() const {
return true;
@@ -124,6 +121,7 @@ public:
return ClusterAggregate::runAggregate(opCtx, nsStruct, request.getValue(), aggCmd, out);
}
+
} clusterPipelineCmd;
} // namespace