-rw-r--r--  src/mongo/s/commands/chunk_manager_targeter.cpp  138
-rw-r--r--  src/mongo/s/commands/chunk_manager_targeter.h     41
-rw-r--r--  src/mongo/s/commands/cluster_write.cpp            13
-rw-r--r--  src/mongo/s/commands/cluster_write_cmd.cpp        49
-rw-r--r--  src/mongo/s/ns_targeter.h                         24
-rw-r--r--  src/mongo/s/write_ops/mock_ns_targeter.h          60
-rw-r--r--  src/mongo/s/write_ops/write_op.cpp                62
7 files changed, 172 insertions, 215 deletions
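
The common thread across these files is a conversion of the NSTargeter targeting methods from out-parameter style (a Status plus a std::vector<std::unique_ptr<ShardEndpoint>>* to fill) to value-returning StatusWith<ShardEndpoint> and StatusWith<std::vector<ShardEndpoint>>. A minimal caller-side sketch of the before and after, assuming a targeter implementing NSTargeter and an updateDoc entry (illustrative only, not copied verbatim from any one call site in the diff):

    // Before: status code plus an out-parameter of owned pointers.
    std::vector<std::unique_ptr<ShardEndpoint>> endpoints;
    Status status = targeter.targetUpdate(opCtx, updateDoc, &endpoints);
    if (!status.isOK())
        return status;

    // After: the endpoints travel by value inside the StatusWith.
    auto swEndpoints = targeter.targetUpdate(opCtx, updateDoc);
    if (!swEndpoints.isOK())
        return swEndpoints.getStatus();
    for (const auto& endpoint : swEndpoints.getValue()) {
        // endpoint.shardName / endpoint.shardVersion are used directly,
        // with no manual memory management at the call site.
    }

targetAllShards additionally gains an OperationContext* parameter so it can reach the shard registry through Grid::get(opCtx) rather than the global grid object.
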
diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp b/src/mongo/s/commands/chunk_manager_targeter.cpp
index 4b5dae927d5..6388c8b6626 100644
--- a/src/mongo/s/commands/chunk_manager_targeter.cpp
+++ b/src/mongo/s/commands/chunk_manager_targeter.cpp
@@ -272,9 +272,8 @@ const NamespaceString& ChunkManagerTargeter::getNS() const {
return _nss;
}
-Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
- const BSONObj& doc,
- ShardEndpoint** endpoint) const {
+StatusWith<ShardEndpoint> ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
+ const BSONObj& doc) const {
BSONObj shardKey;
if (_routingInfo->cm()) {
@@ -302,7 +301,7 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
// Target the shard key or database primary
if (!shardKey.isEmpty()) {
- *endpoint = targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()).release();
+ return _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize());
} else {
if (!_routingInfo->primary()) {
return Status(ErrorCodes::NamespaceNotFound,
@@ -310,16 +309,14 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
<< "; no metadata found");
}
- *endpoint = new ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED());
+ return ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED());
}
return Status::OK();
}
-Status ChunkManagerTargeter::targetUpdate(
- OperationContext* opCtx,
- const write_ops::UpdateOpEntry& updateDoc,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const {
//
// Update targeting may use either the query or the update. This is to support save-style
// updates, of the form:
@@ -386,9 +383,8 @@ Status ChunkManagerTargeter::targetUpdate(
// Target the shard key, query, or replacement doc
if (!shardKey.isEmpty()) {
try {
- endpoints->push_back(
- targetShardKey(shardKey, collation, (query.objsize() + updateExpr.objsize())));
- return Status::OK();
+ return std::vector<ShardEndpoint>{
+ _targetShardKey(shardKey, collation, (query.objsize() + updateExpr.objsize()))};
} catch (const DBException&) {
// This update is potentially not constrained to a single shard
}
@@ -428,36 +424,33 @@ Status ChunkManagerTargeter::targetUpdate(
"$expr is not allowed in the query predicate for an upsert"};
}
if (!cq.isOK()) {
- return Status(cq.getStatus().code(),
- str::stream() << "Could not parse update query " << updateDoc.getQ()
- << causedBy(cq.getStatus()));
+ return {cq.getStatus().code(),
+ str::stream() << "Could not parse update query " << updateDoc.getQ()
+ << causedBy(cq.getStatus())};
}
// Single (non-multi) updates must target a single shard or be exact-ID.
if (_routingInfo->cm() && !updateDoc.getMulti() &&
!isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) {
- return Status(ErrorCodes::ShardKeyNotFound,
- str::stream()
- << "A single update on a sharded collection must contain an exact "
- "match on _id (and have the collection default collation) or "
- "contain the shard key (and have the simple collation). Update "
- "request: "
- << updateDoc.toBSON()
- << ", shard key pattern: "
- << _routingInfo->cm()->getShardKeyPattern().toString());
+ return {ErrorCodes::ShardKeyNotFound,
+ str::stream() << "A single update on a sharded collection must contain an exact "
+ "match on _id (and have the collection default collation) or "
+ "contain the shard key (and have the simple collation). Update "
+ "request: "
+ << updateDoc.toBSON()
+ << ", shard key pattern: "
+ << _routingInfo->cm()->getShardKeyPattern().toString()};
}
if (updateType == UpdateType_OpStyle) {
- return targetQuery(opCtx, query, collation, endpoints);
+ return _targetQuery(opCtx, query, collation);
} else {
- return targetDoc(opCtx, updateExpr, collation, endpoints);
+ return _targetDoc(opCtx, updateExpr, collation);
}
}
-Status ChunkManagerTargeter::targetDelete(
- OperationContext* opCtx,
- const write_ops::DeleteOpEntry& deleteDoc,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetDelete(
+ OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const {
BSONObj shardKey;
if (_routingInfo->cm()) {
@@ -484,8 +477,7 @@ Status ChunkManagerTargeter::targetDelete(
// Target the shard key or delete query
if (!shardKey.isEmpty()) {
try {
- endpoints->push_back(targetShardKey(shardKey, collation, 0));
- return Status::OK();
+ return std::vector<ShardEndpoint>{_targetShardKey(shardKey, collation, 0)};
} catch (const DBException&) {
// This delete is potentially not constrained to a single shard
}
@@ -525,24 +517,18 @@ Status ChunkManagerTargeter::targetDelete(
<< _routingInfo->cm()->getShardKeyPattern().toString());
}
- return targetQuery(opCtx, deleteDoc.getQ(), collation, endpoints);
+ return _targetQuery(opCtx, deleteDoc.getQ(), collation);
}
-Status ChunkManagerTargeter::targetDoc(
- OperationContext* opCtx,
- const BSONObj& doc,
- const BSONObj& collation,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
- // NOTE: This is weird and fragile, but it's the way our language works right now -
- // documents are either A) invalid or B) valid equality queries over themselves.
- return targetQuery(opCtx, doc, collation, endpoints);
+StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetDoc(
+ OperationContext* opCtx, const BSONObj& doc, const BSONObj& collation) const {
+ // NOTE: This is weird and fragile, but it's the way our language works right now - documents
+ // are either A) invalid or B) valid equality queries over themselves.
+ return _targetQuery(opCtx, doc, collation);
}
-Status ChunkManagerTargeter::targetQuery(
- OperationContext* opCtx,
- const BSONObj& query,
- const BSONObj& collation,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
+ OperationContext* opCtx, const BSONObj& query, const BSONObj& collation) const {
if (!_routingInfo->primary() && !_routingInfo->cm()) {
return {ErrorCodes::NamespaceNotFound,
str::stream() << "could not target query in " << getNS().ns()
@@ -560,19 +546,19 @@ Status ChunkManagerTargeter::targetQuery(
shardIds.insert(_routingInfo->primary()->getId());
}
- for (const ShardId& shardId : shardIds) {
- endpoints->push_back(stdx::make_unique<ShardEndpoint>(
- shardId,
- _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
- : ChunkVersion::UNSHARDED()));
+ std::vector<ShardEndpoint> endpoints;
+ for (auto&& shardId : shardIds) {
+ const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED();
+ endpoints.emplace_back(std::move(shardId), version);
}
- return Status::OK();
+ return endpoints;
}
-std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey,
- const BSONObj& collation,
- long long estDataSize) const {
+ShardEndpoint ChunkManagerTargeter::_targetShardKey(const BSONObj& shardKey,
+ const BSONObj& collation,
+ long long estDataSize) const {
const auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation);
// Track autosplit stats for sharded collections
@@ -581,12 +567,10 @@ std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONOb
_stats->chunkSizeDelta[chunk->getMin()] += estDataSize;
}
- return stdx::make_unique<ShardEndpoint>(chunk->getShardId(),
- _routingInfo->cm()->getVersion(chunk->getShardId()));
+ return {chunk->getShardId(), _routingInfo->cm()->getVersion(chunk->getShardId())};
}
-Status ChunkManagerTargeter::targetCollection(
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetCollection() const {
if (!_routingInfo->primary() && !_routingInfo->cm()) {
return {ErrorCodes::NamespaceNotFound,
str::stream() << "could not target full range of " << getNS().ns()
@@ -600,18 +584,18 @@ Status ChunkManagerTargeter::targetCollection(
shardIds.insert(_routingInfo->primary()->getId());
}
- for (const ShardId& shardId : shardIds) {
- endpoints->push_back(stdx::make_unique<ShardEndpoint>(
- shardId,
- _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
- : ChunkVersion::UNSHARDED()));
+ std::vector<ShardEndpoint> endpoints;
+ for (auto&& shardId : shardIds) {
+ const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED();
+ endpoints.emplace_back(std::move(shardId), version);
}
- return Status::OK();
+ return endpoints;
}
-Status ChunkManagerTargeter::targetAllShards(
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetAllShards(
+ OperationContext* opCtx) const {
if (!_routingInfo->primary() && !_routingInfo->cm()) {
return {ErrorCodes::NamespaceNotFound,
str::stream() << "could not target every shard with versions for " << getNS().ns()
@@ -619,16 +603,16 @@ Status ChunkManagerTargeter::targetAllShards(
}
std::vector<ShardId> shardIds;
- grid.shardRegistry()->getAllShardIds(&shardIds);
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
- for (const ShardId& shardId : shardIds) {
- endpoints->push_back(stdx::make_unique<ShardEndpoint>(
- shardId,
- _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
- : ChunkVersion::UNSHARDED()));
+ std::vector<ShardEndpoint> endpoints;
+ for (auto&& shardId : shardIds) {
+ const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED();
+ endpoints.emplace_back(std::move(shardId), version);
}
- return Status::OK();
+ return endpoints;
}
void ChunkManagerTargeter::noteCouldNotTarget() {
@@ -717,7 +701,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
// If didn't already refresh the targeting information, refresh it
if (!alreadyRefreshed) {
// To match previous behavior, we just need an incremental refresh here
- return refreshNow(opCtx);
+ return _refreshNow(opCtx);
}
*wasChanged = isMetadataDifferent(
@@ -734,7 +718,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
if (result == CompareResult_Unknown || result == CompareResult_LT) {
// Our current shard versions aren't all comparable to the old versions, maybe drop
- return refreshNow(opCtx);
+ return _refreshNow(opCtx);
}
*wasChanged = isMetadataDifferent(
@@ -745,7 +729,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
MONGO_UNREACHABLE;
}
-Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx) {
+Status ChunkManagerTargeter::_refreshNow(OperationContext* opCtx) {
Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*_routingInfo));
return init(opCtx);
diff --git a/src/mongo/s/commands/chunk_manager_targeter.h b/src/mongo/s/commands/chunk_manager_targeter.h
index 2ff268871a4..6de41ed3fe7 100644
--- a/src/mongo/s/commands/chunk_manager_targeter.h
+++ b/src/mongo/s/commands/chunk_manager_targeter.h
@@ -67,23 +67,20 @@ public:
const NamespaceString& getNS() const override;
// Returns ShardKeyNotFound if document does not have a full shard key.
- Status targetInsert(OperationContext* opCtx,
- const BSONObj& doc,
- ShardEndpoint** endpoint) const override;
+ StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx,
+ const BSONObj& doc) const override;
// Returns ShardKeyNotFound if the update can't be targeted without a shard key.
- Status targetUpdate(OperationContext* opCtx,
- const write_ops::UpdateOpEntry& updateDoc,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override;
+ StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override;
// Returns ShardKeyNotFound if the delete can't be targeted without a shard key.
- Status targetDelete(OperationContext* opCtx,
- const write_ops::DeleteOpEntry& deleteDoc,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override;
+ StatusWith<std::vector<ShardEndpoint>> targetDelete(
+ OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const override;
- Status targetCollection(std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override;
+ StatusWith<std::vector<ShardEndpoint>> targetCollection() const override;
- Status targetAllShards(std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override;
+ StatusWith<std::vector<ShardEndpoint>> targetAllShards(OperationContext* opCtx) const override;
void noteCouldNotTarget() override;
@@ -106,7 +103,7 @@ private:
/**
* Performs an actual refresh from the config server.
*/
- Status refreshNow(OperationContext* opCtx);
+ Status _refreshNow(OperationContext* opCtx);
/**
* Returns a vector of ShardEndpoints where a document might need to be placed.
@@ -115,10 +112,9 @@ private:
*
* If 'collation' is empty, we use the collection default collation for targeting.
*/
- Status targetDoc(OperationContext* opCtx,
- const BSONObj& doc,
- const BSONObj& collation,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const;
+ StatusWith<std::vector<ShardEndpoint>> _targetDoc(OperationContext* opCtx,
+ const BSONObj& doc,
+ const BSONObj& collation) const;
/**
* Returns a vector of ShardEndpoints for a potentially multi-shard query.
@@ -127,10 +123,9 @@ private:
*
* If 'collation' is empty, we use the collection default collation for targeting.
*/
- Status targetQuery(OperationContext* opCtx,
- const BSONObj& query,
- const BSONObj& collation,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const;
+ StatusWith<std::vector<ShardEndpoint>> _targetQuery(OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& collation) const;
/**
* Returns a ShardEndpoint for an exact shard key query.
@@ -140,9 +135,9 @@ private:
*
* If 'collation' is empty, we use the collection default collation for targeting.
*/
- std::unique_ptr<ShardEndpoint> targetShardKey(const BSONObj& doc,
- const BSONObj& collation,
- long long estDataSize) const;
+ ShardEndpoint _targetShardKey(const BSONObj& doc,
+ const BSONObj& collation,
+ long long estDataSize) const;
// Full namespace of the collection for this targeter
const NamespaceString _nss;
diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp
index 12d44edc75b..2593fb866fe 100644
--- a/src/mongo/s/commands/cluster_write.cpp
+++ b/src/mongo/s/commands/cluster_write.cpp
@@ -205,22 +205,23 @@ void ClusterWriter::write(OperationContext* opCtx,
return;
}
- std::vector<std::unique_ptr<ShardEndpoint>> endpoints;
- auto targetStatus = targeter.targetCollection(&endpoints);
- if (!targetStatus.isOK()) {
- toBatchError({targetStatus.code(),
+ auto swEndpoints = targeter.targetCollection();
+ if (!swEndpoints.isOK()) {
+ toBatchError({swEndpoints.getStatus().code(),
str::stream() << "unable to target"
<< (request.isInsertIndexRequest() ? " index" : "")
<< " write op for collection "
<< request.getTargetingNS().ns()
- << causedBy(targetStatus)},
+ << causedBy(swEndpoints.getStatus())},
response);
return;
}
+ const auto& endpoints = swEndpoints.getValue();
+
// Handle sharded config server writes differently.
if (std::any_of(endpoints.begin(), endpoints.end(), [](const auto& it) {
- return it->shardName == ShardRegistry::kConfigServerShardId;
+ return it.shardName == ShardRegistry::kConfigServerShardId;
})) {
// There should be no namespaces that partially target config servers.
invariant(endpoints.size() == 1);
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 8f7581d402e..8dd4a34fe11 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -271,41 +271,28 @@ private:
if (!status.isOK())
return status;
- std::vector<std::unique_ptr<ShardEndpoint>> endpoints;
-
- if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) {
- ShardEndpoint* endpoint;
- Status status =
- targeter.targetInsert(opCtx, targetingBatchItem.getDocument(), &endpoint);
- if (!status.isOK())
- return status;
- endpoints.push_back(std::unique_ptr<ShardEndpoint>{endpoint});
- } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) {
- Status status =
- targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate(), &endpoints);
- if (!status.isOK())
- return status;
- } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete) {
- Status status =
- targeter.targetDelete(opCtx, targetingBatchItem.getDelete(), &endpoints);
- if (!status.isOK())
- return status;
- } else {
- MONGO_UNREACHABLE;
- }
+ auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> {
+ if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) {
+ auto swEndpoint = targeter.targetInsert(opCtx, targetingBatchItem.getDocument());
+ if (!swEndpoint.isOK())
+ return swEndpoint.getStatus();
+ return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())};
+ } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) {
+ return targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate());
+ } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete) {
+ return targeter.targetDelete(opCtx, targetingBatchItem.getDelete());
+ } else {
+ MONGO_UNREACHABLE;
+ }
+ }();
- auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!swEndpoints.isOK())
+ return swEndpoints.getStatus();
// Assemble requests
std::vector<AsyncRequestsSender::Request> requests;
- for (auto it = endpoints.begin(); it != endpoints.end(); ++it) {
- const ShardEndpoint* endpoint = it->get();
-
- auto shardStatus = shardRegistry->getShard(opCtx, endpoint->shardName);
- if (!shardStatus.isOK()) {
- return shardStatus.getStatus();
- }
- requests.emplace_back(shardStatus.getValue()->getId(), command);
+ for (const auto& endpoint : swEndpoints.getValue()) {
+ requests.emplace_back(endpoint.shardName, command);
}
// Send the requests.
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index e3a554cd252..f915be42ec4 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -30,7 +30,7 @@
#include <vector>
-#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
@@ -94,43 +94,39 @@ public:
*
* Returns !OK with message if document could not be targeted for other reasons.
*/
- virtual Status targetInsert(OperationContext* opCtx,
- const BSONObj& doc,
- ShardEndpoint** endpoint) const = 0;
+ virtual StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx,
+ const BSONObj& doc) const = 0;
/**
* Returns a vector of ShardEndpoints for a potentially multi-shard update.
*
* Returns OK and fills the endpoints; returns a status describing the error otherwise.
*/
- virtual Status targetUpdate(OperationContext* opCtx,
- const write_ops::UpdateOpEntry& updateDoc,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const = 0;
+ virtual StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const = 0;
/**
* Returns a vector of ShardEndpoints for a potentially multi-shard delete.
*
* Returns OK and fills the endpoints; returns a status describing the error otherwise.
*/
- virtual Status targetDelete(OperationContext* opCtx,
- const write_ops::DeleteOpEntry& deleteDoc,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const = 0;
+ virtual StatusWith<std::vector<ShardEndpoint>> targetDelete(
+ OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const = 0;
/**
* Returns a vector of ShardEndpoints for the entire collection.
*
* Returns !OK with message if the full collection could not be targeted.
*/
- virtual Status targetCollection(
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const = 0;
+ virtual StatusWith<std::vector<ShardEndpoint>> targetCollection() const = 0;
/**
* Returns a vector of ShardEndpoints for all shards.
*
* Returns !OK with message if all shards could not be targeted.
*/
- virtual Status targetAllShards(
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const = 0;
+ virtual StatusWith<std::vector<ShardEndpoint>> targetAllShards(
+ OperationContext* opCtx) const = 0;
/**
* Informs the targeter that a targeting failure occurred during one of the last targeting
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h
index 89ac1e64ce5..b546e71a2af 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.h
+++ b/src/mongo/s/write_ops/mock_ns_targeter.h
@@ -72,49 +72,46 @@ public:
/**
* Returns a ShardEndpoint for the doc from the mock ranges
*/
- Status targetInsert(OperationContext* opCtx,
- const BSONObj& doc,
- ShardEndpoint** endpoint) const override {
- std::vector<std::unique_ptr<ShardEndpoint>> endpoints;
- Status status = _targetQuery(doc, &endpoints);
- if (!status.isOK())
- return status;
- if (!endpoints.empty())
- *endpoint = endpoints.front().release();
- return Status::OK();
+ StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx,
+ const BSONObj& doc) const override {
+ auto swEndpoints = _targetQuery(doc);
+ if (!swEndpoints.isOK())
+ return swEndpoints.getStatus();
+
+ ASSERT_EQ(1U, swEndpoints.getValue().size());
+ return swEndpoints.getValue().front();
}
/**
* Returns the first ShardEndpoint for the query from the mock ranges. Only can handle
* queries of the form { field : { $gte : <value>, $lt : <value> } }.
*/
- Status targetUpdate(OperationContext* opCtx,
- const write_ops::UpdateOpEntry& updateDoc,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override {
- return _targetQuery(updateDoc.getQ(), endpoints);
+ StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ return _targetQuery(updateDoc.getQ());
}
/**
* Returns the first ShardEndpoint for the query from the mock ranges. Only can handle
* queries of the form { field : { $gte : <value>, $lt : <value> } }.
*/
- Status targetDelete(OperationContext* opCtx,
- const write_ops::DeleteOpEntry& deleteDoc,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
- return _targetQuery(deleteDoc.getQ(), endpoints);
+ StatusWith<std::vector<ShardEndpoint>> targetDelete(
+ OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const {
+ return _targetQuery(deleteDoc.getQ());
}
- Status targetCollection(std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override {
+ StatusWith<std::vector<ShardEndpoint>> targetCollection() const override {
// No-op
- return Status::OK();
+ return std::vector<ShardEndpoint>{};
}
- Status targetAllShards(std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override {
+ StatusWith<std::vector<ShardEndpoint>> targetAllShards(OperationContext* opCtx) const override {
+ std::vector<ShardEndpoint> endpoints;
for (const auto& range : _mockRanges) {
- endpoints->push_back(stdx::make_unique<ShardEndpoint>(range.endpoint));
+ endpoints.push_back(range.endpoint);
}
- return Status::OK();
+ return endpoints;
}
void noteCouldNotTarget() override {
@@ -158,23 +155,24 @@ private:
}
/**
- * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle
- * queries of the form { field : { $gte : <value>, $lt : <value> } }.
+ * Returns the first ShardEndpoint for the query from the mock ranges. Only handles queries of
+ * the form { field : { $gte : <value>, $lt : <value> } }.
*/
- Status _targetQuery(const BSONObj& query,
- std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
- ChunkRange queryRange(_parseRange(query));
+ StatusWith<std::vector<ShardEndpoint>> _targetQuery(const BSONObj& query) const {
+ const ChunkRange queryRange(_parseRange(query));
+
+ std::vector<ShardEndpoint> endpoints;
for (const auto& range : _mockRanges) {
if (queryRange.overlapWith(range.range)) {
- endpoints->push_back(stdx::make_unique<ShardEndpoint>(range.endpoint));
+ endpoints.push_back(range.endpoint);
}
}
- if (endpoints->empty())
+ if (endpoints.empty())
return {ErrorCodes::UnknownError, "no mock ranges found for query"};
- return Status::OK();
+ return endpoints;
}
NamespaceString _nss;
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index 2d77ed4b56b..ad3de6cc2e4 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -55,59 +55,55 @@ Status WriteOp::targetWrites(OperationContext* opCtx,
std::vector<TargetedWrite*>* targetedWrites) {
const bool isIndexInsert = _itemRef.getRequest()->isInsertIndexRequest();
- Status targetStatus = Status::OK();
- std::vector<std::unique_ptr<ShardEndpoint>> endpoints;
-
- if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) {
- if (isIndexInsert) {
- // TODO: Remove the index targeting stuff once there is a command for it?
- // TODO: Retry index writes with stale version?
- targetStatus = targeter.targetCollection(&endpoints);
- } else {
- ShardEndpoint* endpoint = nullptr;
- targetStatus = targeter.targetInsert(opCtx, _itemRef.getDocument(), &endpoint);
- if (targetStatus.isOK()) {
- // Store single endpoint result if we targeted a single endpoint
- endpoints.push_back(std::unique_ptr<ShardEndpoint>{endpoint});
+ auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> {
+ if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) {
+ if (isIndexInsert) {
+ // TODO: Remove the index targeting stuff once there is a command for it?
+ // TODO: Retry index writes with stale version?
+ return targeter.targetCollection();
}
+
+ auto swEndpoint = targeter.targetInsert(opCtx, _itemRef.getDocument());
+ if (!swEndpoint.isOK())
+ return swEndpoint.getStatus();
+
+ return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())};
+ } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) {
+ return targeter.targetUpdate(opCtx, _itemRef.getUpdate());
+ } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) {
+ return targeter.targetDelete(opCtx, _itemRef.getDelete());
+ } else {
+ MONGO_UNREACHABLE;
}
- } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) {
- targetStatus = targeter.targetUpdate(opCtx, _itemRef.getUpdate(), &endpoints);
- } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) {
- targetStatus = targeter.targetDelete(opCtx, _itemRef.getDelete(), &endpoints);
- } else {
- MONGO_UNREACHABLE;
- }
+ }();
// If we're targeting more than one endpoint with an update/delete, we have to target everywhere
// since we cannot currently retry partial results.
//
// NOTE: Index inserts are currently specially targeted only at the current collection to avoid
// creating collections everywhere.
- if (targetStatus.isOK() && endpoints.size() > 1u && !isIndexInsert) {
- endpoints.clear();
- targetStatus = targeter.targetAllShards(&endpoints);
+ if (swEndpoints.isOK() && swEndpoints.getValue().size() > 1u && !isIndexInsert) {
+ swEndpoints = targeter.targetAllShards(opCtx);
}
// If we had an error, stop here
- if (!targetStatus.isOK())
- return targetStatus;
+ if (!swEndpoints.isOK())
+ return swEndpoints.getStatus();
- for (auto it = endpoints.begin(); it != endpoints.end(); ++it) {
- ShardEndpoint* endpoint = it->get();
+ auto& endpoints = swEndpoints.getValue();
+ for (auto&& endpoint : endpoints) {
_childOps.emplace_back(this);
WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1);
// For now, multiple endpoints imply no versioning - we can't retry half a multi-write
- if (endpoints.size() == 1u) {
- targetedWrites->push_back(new TargetedWrite(*endpoint, ref));
- } else {
- ShardEndpoint broadcastEndpoint(endpoint->shardName, ChunkVersion::IGNORED());
- targetedWrites->push_back(new TargetedWrite(broadcastEndpoint, ref));
+ if (endpoints.size() > 1u) {
+ endpoint.shardVersion = ChunkVersion::IGNORED();
}
+ targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref));
+
_childOps.back().pendingWrite = targetedWrites->back();
_childOps.back().state = WriteOpState_Pending;
}