summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2016-04-12 19:11:23 -0400
committerJudah Schvimer <judah@mongodb.com>2016-04-12 19:11:23 -0400
commit889ca31c1a1e7725b14f291233a4ee4f7afea111 (patch)
tree13ed8e193df51b16f107323b7f764ad598cd333f /src
parent8b2e8b846799db19cb51f626b8c9802f2d4fcabc (diff)
downloadmongo-889ca31c1a1e7725b14f291233a4ee4f7afea111.tar.gz
SERVER-22540 Commands that do writes on mongos should take a writeConcern and aggregate results
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/migration_destination_manager_legacy_commands.cpp2
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp5
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp12
-rw-r--r--src/mongo/s/client/shard_registry.cpp6
-rw-r--r--src/mongo/s/client/shard_registry.h5
-rw-r--r--src/mongo/s/commands/SConscript1
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp10
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_cmd.cpp25
-rw-r--r--src/mongo/s/commands/cluster_move_primary_cmd.cpp42
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp19
-rw-r--r--src/mongo/s/commands/cluster_user_management_commands.cpp33
-rw-r--r--src/mongo/s/commands/commands_public.cpp8
-rw-r--r--src/mongo/s/commands/run_on_all_shards_cmd.cpp26
-rw-r--r--src/mongo/s/commands/sharded_command_processing.cpp50
-rw-r--r--src/mongo/s/commands/sharded_command_processing.h43
-rw-r--r--src/mongo/s/config.cpp18
-rw-r--r--src/mongo/s/s_only.cpp22
-rw-r--r--src/mongo/s/write_ops/wc_error_detail.cpp1
-rw-r--r--src/mongo/s/write_ops/wc_error_detail.h2
19 files changed, 301 insertions, 29 deletions
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 3fdc0b7bcb3..1d1e5fa40c1 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -72,7 +72,6 @@ public:
return true;
}
-
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
// This is required to be true to support moveChunk.
return true;
@@ -200,7 +199,6 @@ public:
return true;
}
-
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 9f6216b607d..64ff516acbd 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -499,10 +499,11 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam
shardEntry.getName(),
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
ns.db().toString(),
- BSON("drop" << ns.coll()));
+ BSON("drop" << ns.coll() << "writeConcern" << txn->getWriteConcern().toBSON()));
if (!dropResult.isOK()) {
- return dropResult.getStatus();
+ return Status(dropResult.getStatus().code(),
+ dropResult.getStatus().reason() + " at " + shardEntry.getName());
}
auto dropStatus = getStatusFromCommandResult(dropResult.getValue());
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
index ab49bad9a13..abdb370f180 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_drop_coll_test.cpp
@@ -92,7 +92,9 @@ public:
onCommand([this, shard](const RemoteCommandRequest& request) {
ASSERT_EQ(HostAndPort(shard.getHost()), request.target);
ASSERT_EQ(_dropNS.db(), request.dbname);
- ASSERT_EQ(BSON("drop" << _dropNS.coll()), request.cmdObj);
+ ASSERT_EQ(BSON("drop" << _dropNS.coll() << "writeConcern"
+ << BSON("w" << 0 << "wtimeout" << 0)),
+ request.cmdObj);
ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
@@ -213,7 +215,9 @@ TEST_F(DropColl2ShardTest, NSNotFound) {
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(HostAndPort(shard1().getHost()), request.target);
ASSERT_EQ(dropNS().db(), request.dbname);
- ASSERT_EQ(BSON("drop" << dropNS().coll()), request.cmdObj);
+ ASSERT_EQ(
+ BSON("drop" << dropNS().coll() << "writeConcern" << BSON("w" << 0 << "wtimeout" << 0)),
+ request.cmdObj);
ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
@@ -223,7 +227,9 @@ TEST_F(DropColl2ShardTest, NSNotFound) {
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(HostAndPort(shard2().getHost()), request.target);
ASSERT_EQ(dropNS().db(), request.dbname);
- ASSERT_EQ(BSON("drop" << dropNS().coll()), request.cmdObj);
+ ASSERT_EQ(
+ BSON("drop" << dropNS().coll() << "writeConcern" << BSON("w" << 0 << "wtimeout" << 0)),
+ request.cmdObj);
ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 3d7f20cc095..e83b687473a 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -163,6 +163,12 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kAllRetriableErrors{
ErrorCodes::NetworkTimeout,
ErrorCodes::InterruptedDueToReplStateChange};
+const ShardRegistry::ErrorCodesSet ShardRegistry::kWriteConcernErrors{
+ ErrorCodes::WriteConcernFailed,
+ ErrorCodes::WriteConcernLegacyOK,
+ ErrorCodes::UnknownReplWriteConcern,
+ ErrorCodes::CannotSatisfyWriteConcern};
+
ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory,
std::unique_ptr<executor::TaskExecutorPool> executorPool,
executor::NetworkInterface* network,
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index a025ceef331..be09dc636ff 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -316,6 +316,11 @@ public:
*/
static const ErrorCodesSet kAllRetriableErrors;
+ /**
+ * Set of error codes that specify write concern related errors.
+ */
+ static const ErrorCodesSet kWriteConcernErrors;
+
private:
using ShardMap = std::unordered_map<ShardId, std::shared_ptr<Shard>>;
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 676c4840fa6..3ea20a66f1c 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -62,6 +62,7 @@ env.Library(
'cluster_write_cmd.cpp',
'commands_public.cpp',
'run_on_all_shards_cmd.cpp',
+ 'sharded_command_processing.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/parallel',
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index e6accb504c6..2400d23931a 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -38,6 +38,7 @@
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/catalog/catalog_cache.h"
+#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/config.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/cluster_explain.h"
@@ -45,6 +46,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/strategy.h"
+#include "mongo/s/write_ops/wc_error_detail.h"
#include "mongo/util/timer.h"
namespace mongo {
@@ -228,7 +230,13 @@ private:
throw RecvStaleConfigException("FindAndModify", res);
}
- result.appendElements(res);
+ // First append the properly constructed writeConcernError. It will then be skipped
+ // in appendElementsUnique.
+ if (auto wcErrorElem = res["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, result);
+ }
+
+ result.appendElementsUnique(res);
return ok;
}
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 662f7278367..d016e7713a4 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -45,12 +45,14 @@
#include "mongo/s/chunk.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/commands/cluster_commands_common.h"
+#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/config.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/db_util.h"
#include "mongo/s/grid.h"
#include "mongo/s/strategy.h"
+#include "mongo/s/write_ops/wc_error_detail.h"
#include "mongo/stdx/chrono.h"
#include "mongo/util/log.h"
@@ -95,7 +97,7 @@ BSONObj fixForShards(const BSONObj& orig,
fn == "sort" || fn == "scope" || fn == "verbose" || fn == "$queryOptions" ||
fn == "readConcern" || fn == LiteParsedQuery::cmdOptionMaxTimeMS) {
b.append(e);
- } else if (fn == "out" || fn == "finalize") {
+ } else if (fn == "out" || fn == "finalize" || fn == "writeConcern") {
// We don't want to copy these
} else {
badShardedField = fn;
@@ -282,7 +284,11 @@ public:
bool ok = conn->runCommand(dbname, cmdObj, res);
conn.done();
- result.appendElements(res);
+ if (auto wcErrorElem = res["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result);
+ }
+
+ result.appendElementsUnique(res);
return ok;
}
@@ -382,6 +388,7 @@ public:
finalCmd.append("inputDB", dbname);
finalCmd.append("shardedOutputCollection", shardResultCollection);
finalCmd.append("shards", shardResultsB.done());
+ finalCmd.append("writeConcern", txn->getWriteConcern().toBSON());
BSONObj shardCounts = shardCountsB.done();
finalCmd.append("shardCounts", shardCounts);
@@ -410,6 +417,7 @@ public:
bool ok = true;
BSONObj singleResult;
+ bool hasWCError = false;
if (!shardedOutput) {
const auto shard = grid.shardRegistry()->getShard(txn, confOut->getPrimaryId());
@@ -425,6 +433,12 @@ public:
outputCount = counts.getIntField("output");
conn.done();
+ if (!hasWCError) {
+ if (auto wcErrorElem = singleResult["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result);
+ hasWCError = true;
+ }
+ }
} else {
LOG(1) << "MR with sharded output, NS=" << outputCollNss.ns();
@@ -501,6 +515,13 @@ public:
server = shard->getConnString().toString();
}
singleResult = mrResult.result;
+ if (!hasWCError) {
+ if (auto wcErrorElem = singleResult["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(
+ mrResult.shardTargetId, wcErrorElem, result);
+ hasWCError = true;
+ }
+ }
ok = singleResult["ok"].trueValue();
if (!ok) {
diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
index a09ddf30dd2..1de2564a2e1 100644
--- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
@@ -46,9 +46,11 @@
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/set_shard_version_request.h"
+#include "mongo/s/write_ops/wc_error_detail.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -188,12 +190,13 @@ public:
// TODO ERH - we need a clone command which replays operations from clone start to now
// can just use local.oplog.$main
BSONObj cloneRes;
- bool worked = toconn->runCommand(dbname.c_str(),
- BSON("clone" << fromShard->getConnString().toString()
- << "collsToIgnore" << barr.arr()
- << bypassDocumentValidationCommandOption()
- << true << "_checkForCatalogChange" << true),
- cloneRes);
+ bool worked = toconn->runCommand(
+ dbname.c_str(),
+ BSON("clone" << fromShard->getConnString().toString() << "collsToIgnore" << barr.arr()
+ << bypassDocumentValidationCommandOption() << true
+ << "_checkForCatalogChange" << true << "writeConcern"
+ << txn->getWriteConcern().toBSON()),
+ cloneRes);
toconn.done();
if (!worked) {
@@ -201,6 +204,11 @@ public:
errmsg = "clone failed";
return false;
}
+ bool hasWCError = false;
+ if (auto wcErrorElem = cloneRes["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(toShard->getId(), wcErrorElem, result);
+ hasWCError = true;
+ }
const string oldPrimary = fromShard->getConnString().toString();
@@ -215,7 +223,15 @@ public:
<< ", no sharded collections in " << dbname;
try {
- fromconn->dropDatabase(dbname.c_str());
+ BSONObj dropDBInfo;
+ fromconn->dropDatabase(dbname.c_str(), txn->getWriteConcern(), &dropDBInfo);
+ if (!hasWCError) {
+ if (auto wcErrorElem = dropDBInfo["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(
+ fromShard->getId(), wcErrorElem, result);
+ hasWCError = true;
+ }
+ }
} catch (DBException& e) {
e.addContext(str::stream() << "movePrimary could not drop the database " << dbname
<< " on " << oldPrimary);
@@ -239,7 +255,17 @@ public:
try {
log() << "movePrimary dropping cloned collection " << el.String() << " on "
<< oldPrimary;
- fromconn->dropCollection(el.String());
+ BSONObj dropCollInfo;
+ fromconn->dropCollection(
+ el.String(), txn->getWriteConcern(), &dropCollInfo);
+ if (!hasWCError) {
+ if (auto wcErrorElem = dropCollInfo["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(
+ fromShard->getId(), wcErrorElem, result);
+ hasWCError = true;
+ }
+ }
+
} catch (DBException& e) {
e.addContext(str::stream()
<< "movePrimary could not drop the cloned collection "
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 3b558c597ca..e81f698ea53 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -49,10 +49,12 @@
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
+#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/s/write_ops/wc_error_detail.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -228,6 +230,8 @@ public:
Value(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
}
+ mergeCmd.setField("writeConcern", Value(cmdObj["writeConcern"]));
+
// Not propagating readConcern to merger since it doesn't do local reads.
string outputNsOrEmpty;
@@ -247,9 +251,13 @@ public:
aggRunCommand(conn.get(), dbname, mergeCmd.freeze().toBson(), options);
conn.done();
+ if (auto wcErrorElem = mergedResults["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(mergingShardId, wcErrorElem, result);
+ }
+
// Copy output from merging (primary) shard to the output object from our command.
// Also, propagates errmsg and code if ok == false.
- result.appendElements(mergedResults);
+ result.appendElementsUnique(mergedResults);
return mergedResults["ok"].trueValue();
}
@@ -421,7 +429,14 @@ bool PipelineCommand::aggPassthrough(OperationContext* txn,
ShardConnection conn(shard->getConnString(), "");
BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions);
conn.done();
- out.appendElements(result);
+
+ // First append the properly constructed writeConcernError. It will then be skipped
+ // in appendElementsUnique.
+ if (auto wcErrorElem = result["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, out);
+ }
+
+ out.appendElementsUnique(result);
return result["ok"].trueValue();
}
diff --git a/src/mongo/s/commands/cluster_user_management_commands.cpp b/src/mongo/s/commands/cluster_user_management_commands.cpp
index a96306ee6ed..ecff2fa2005 100644
--- a/src/mongo/s/commands/cluster_user_management_commands.cpp
+++ b/src/mongo/s/commands/cluster_user_management_commands.cpp
@@ -44,7 +44,9 @@
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/grid.h"
+#include "mongo/s/write_ops/wc_error_detail.h"
namespace mongo {
@@ -833,7 +835,10 @@ namespace {
* Upgrades each shard serially, and stops on first failure. Returned error indicates that
* failure.
*/
-Status runUpgradeOnAllShards(OperationContext* txn, int maxSteps, const BSONObj& writeConcern) {
+Status runUpgradeOnAllShards(OperationContext* txn,
+ int maxSteps,
+ const BSONObj& writeConcern,
+ BSONObjBuilder& result) {
BSONObjBuilder cmdObjBuilder;
cmdObjBuilder.append("authSchemaUpgrade", 1);
cmdObjBuilder.append("maxSteps", maxSteps);
@@ -848,6 +853,7 @@ Status runUpgradeOnAllShards(OperationContext* txn, int maxSteps, const BSONObj&
vector<string> shardIds;
shardRegistry->getAllShardIds(&shardIds);
+ bool hasWCError = false;
for (const auto& shardId : shardIds) {
auto cmdResult = shardRegistry->runIdempotentCommandOnShard(
txn, shardId, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", cmdObj);
@@ -857,6 +863,14 @@ Status runUpgradeOnAllShards(OperationContext* txn, int maxSteps, const BSONObj&
str::stream() << "Failed to run authSchemaUpgrade on shard " << shardId
<< causedBy(cmdResult.getStatus()));
}
+
+ // If the result has a writeConcernError, append it.
+ if (!hasWCError) {
+ if (auto wcErrorElem = cmdResult.getValue()["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, result);
+ hasWCError = true;
+ }
+ }
}
return Status::OK();
@@ -910,9 +924,20 @@ public:
// Optionally run the authSchemaUpgrade command on the individual shards
if (parsedArgs.shouldUpgradeShards) {
- status = runUpgradeOnAllShards(txn, parsedArgs.maxSteps, parsedArgs.writeConcern);
- if (!status.isOK())
- return appendCommandStatus(result, status);
+ status =
+ runUpgradeOnAllShards(txn, parsedArgs.maxSteps, parsedArgs.writeConcern, result);
+ if (!status.isOK()) {
+ // If the status is a write concern error, append a writeConcernError instead of
+ // and error message.
+ if (ShardRegistry::kWriteConcernErrors.count(status.code())) {
+ WCErrorDetail wcError;
+ wcError.setErrMessage(status.reason());
+ wcError.setErrCode(status.code());
+ result.append("writeConcernError", wcError.toBSON());
+ } else {
+ return appendCommandStatus(result, status);
+ }
+ }
}
return true;
}
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 96293f9d698..1d533d1dd36 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -53,6 +53,7 @@
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/run_on_all_shards_cmd.h"
+#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/config.h"
#include "mongo/s/db_util.h"
#include "mongo/s/grid.h"
@@ -184,7 +185,12 @@ private:
bool ok = conn->runCommand(db, cmdObj, res, passOptions() ? options : 0);
conn.done();
- result.appendElements(res);
+ // First append the properly constructed writeConcernError. It will then be skipped
+ // in appendElementsUnique.
+ if (auto wcErrorElem = res["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result);
+ }
+ result.appendElementsUnique(res);
return ok;
}
};
diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
index 6dd3626fde8..e9991643210 100644
--- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp
+++ b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
@@ -39,8 +39,10 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
+#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/db_util.h"
#include "mongo/s/grid.h"
+#include "mongo/s/write_ops/wc_error_detail.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -103,6 +105,11 @@ bool RunOnAllShardsCommand::run(OperationContext* txn,
std::list<std::shared_ptr<Future::CommandResult>>::iterator futuresit;
std::vector<ShardId>::const_iterator shardIdsIt;
+
+ BSONElement wcErrorElem;
+ ShardId wcErrorShardId;
+ bool hasWCError = false;
+
// We iterate over the set of shard ids and their corresponding futures in parallel.
// TODO: replace with zip iterator if we ever decide to use one from Boost or elsewhere
for (futuresit = futures.begin(), shardIdsIt = shardIds.cbegin();
@@ -115,11 +122,25 @@ bool RunOnAllShardsCommand::run(OperationContext* txn,
BSONObj result = res->result();
results.emplace_back(*shardIdsIt, result);
subobj.append(res->getServer(), result);
+
+ if (!hasWCError) {
+ if ((wcErrorElem = result["writeConcernError"])) {
+ wcErrorShardId = *shardIdsIt;
+ hasWCError = true;
+ }
+ }
continue;
}
BSONObj result = res->result();
+ if (!hasWCError) {
+ if ((wcErrorElem = result["writeConcernError"])) {
+ wcErrorShardId = *shardIdsIt;
+ hasWCError = true;
+ }
+ }
+
if (result["errmsg"].type() || result["code"].numberInt() != 0) {
result = specialErrorHandler(res->getServer(), dbName, cmdObj, result);
@@ -154,7 +175,12 @@ bool RunOnAllShardsCommand::run(OperationContext* txn,
subobj.done();
+ if (hasWCError) {
+ appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, output);
+ }
+
BSONObj errobj = errors.done();
+
if (!errobj.isEmpty()) {
errmsg = errobj.toString(false, true);
diff --git a/src/mongo/s/commands/sharded_command_processing.cpp b/src/mongo/s/commands/sharded_command_processing.cpp
new file mode 100644
index 00000000000..04da3cc6d4e
--- /dev/null
+++ b/src/mongo/s/commands/sharded_command_processing.cpp
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2013 mongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/sharded_command_processing.h"
+
+#include "mongo/s/write_ops/wc_error_detail.h"
+
+namespace mongo {
+
+void appendWriteConcernErrorToCmdResponse(const std::string& shardID,
+ const BSONElement& wcErrorElem,
+ BSONObjBuilder& responseBuilder) {
+ WCErrorDetail wcError;
+ std::string errMsg;
+ auto wcErrorObj = wcErrorElem.Obj();
+ if (!wcError.parseBSON(wcErrorObj, &errMsg)) {
+ wcError.setErrMessage("Failed to parse writeConcernError: " + wcErrorObj.toString() +
+ ", Received error: " + errMsg);
+ }
+ wcError.setErrMessage(wcError.getErrMessage() + " at " + shardID);
+ responseBuilder.append("writeConcernError", wcError.toBSON());
+}
+} // namespace mongo
diff --git a/src/mongo/s/commands/sharded_command_processing.h b/src/mongo/s/commands/sharded_command_processing.h
new file mode 100644
index 00000000000..f0424424f89
--- /dev/null
+++ b/src/mongo/s/commands/sharded_command_processing.h
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2013 mongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/db/jsobj.h"
+
+namespace mongo {
+
+/**
+ * This function appends the provided writeConcernError BSONElement to the sharded response.
+ */
+void appendWriteConcernErrorToCmdResponse(const std::string& shardID,
+ const BSONElement& wcErrorElem,
+ BSONObjBuilder& responseBuilder);
+} // namespace mongo
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
index 58242f633b9..acb07fd1e0f 100644
--- a/src/mongo/s/config.cpp
+++ b/src/mongo/s/config.cpp
@@ -581,10 +581,17 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) {
ScopedDbConnection conn(shard->getConnString(), 30.0);
BSONObj res;
if (!conn->dropDatabase(_name, txn->getWriteConcern(), &res)) {
- errmsg = res.toString();
+ errmsg = res.toString() + " at " + _primaryId;
return 0;
}
conn.done();
+ if (auto wcErrorElem = res["writeConcernError"]) {
+ auto wcError = wcErrorElem.Obj();
+ if (auto errMsgElem = wcError["errmsg"]) {
+ errmsg = errMsgElem.str() + " at " + _primaryId;
+ return false;
+ }
+ }
}
// 4
@@ -597,10 +604,17 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) {
ScopedDbConnection conn(shard->getConnString(), 30.0);
BSONObj res;
if (!conn->dropDatabase(_name, txn->getWriteConcern(), &res)) {
- errmsg = res.toString();
+ errmsg = res.toString() + " at " + shardId;
return 0;
}
conn.done();
+ if (auto wcErrorElem = res["writeConcernError"]) {
+ auto wcError = wcErrorElem.Obj();
+ if (auto errMsgElem = wcError["errmsg"]) {
+ errmsg = errMsgElem.str() + " at " + shardId;
+ return false;
+ }
+ }
}
LOG(1) << "\t dropped primary db for: " << _name;
diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp
index dddb4a23de2..668124a7dd4 100644
--- a/src/mongo/s/s_only.cpp
+++ b/src/mongo/s/s_only.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/rpc/request_interface.h"
@@ -121,6 +122,27 @@ void Command::execCommandClientBasic(OperationContext* txn,
globalOpCounters.gotCommand();
}
+ StatusWith<WriteConcernOptions> wcResult =
+ WriteConcernOptions::extractWCFromCommand(cmdObj, dbname);
+ if (!wcResult.isOK()) {
+ appendCommandStatus(result, wcResult.getStatus());
+ return;
+ }
+
+ bool supportsWriteConcern = c->supportsWriteConcern(cmdObj);
+ if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {
+ // This command doesn't do writes so it should not be passed a writeConcern.
+ // If we did not use the default writeConcern, one was provided when it shouldn't have
+ // been by the user.
+ appendCommandStatus(
+ result, Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern"));
+ return;
+ }
+
+ if (supportsWriteConcern) {
+ txn->setWriteConcern(wcResult.getValue());
+ }
+
std::string errmsg;
bool ok = false;
try {
diff --git a/src/mongo/s/write_ops/wc_error_detail.cpp b/src/mongo/s/write_ops/wc_error_detail.cpp
index 33528c42525..08abdd22064 100644
--- a/src/mongo/s/write_ops/wc_error_detail.cpp
+++ b/src/mongo/s/write_ops/wc_error_detail.cpp
@@ -176,5 +176,4 @@ const string& WCErrorDetail::getErrMessage() const {
dassert(_isErrMessageSet);
return _errMessage;
}
-
} // namespace mongo
diff --git a/src/mongo/s/write_ops/wc_error_detail.h b/src/mongo/s/write_ops/wc_error_detail.h
index 05d07d2f090..12a8d1c2d60 100644
--- a/src/mongo/s/write_ops/wc_error_detail.h
+++ b/src/mongo/s/write_ops/wc_error_detail.h
@@ -38,7 +38,7 @@ namespace mongo {
/**
* This class represents the layout and content of the error that occurs while trying
- * to satisfy the write concern after executing the insert/update/delete runCommand.
+ * to satisfy the write concern after executing runCommand.
*/
class WCErrorDetail {
MONGO_DISALLOW_COPYING(WCErrorDetail);