diff options
-rw-r--r-- | src/mongo/s/commands/chunk_manager_targeter.cpp | 138 | ||||
-rw-r--r-- | src/mongo/s/commands/chunk_manager_targeter.h | 41 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 49 | ||||
-rw-r--r-- | src/mongo/s/ns_targeter.h | 24 | ||||
-rw-r--r-- | src/mongo/s/write_ops/mock_ns_targeter.h | 60 | ||||
-rw-r--r-- | src/mongo/s/write_ops/write_op.cpp | 62 |
7 files changed, 172 insertions, 215 deletions
diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp b/src/mongo/s/commands/chunk_manager_targeter.cpp index 4b5dae927d5..6388c8b6626 100644 --- a/src/mongo/s/commands/chunk_manager_targeter.cpp +++ b/src/mongo/s/commands/chunk_manager_targeter.cpp @@ -272,9 +272,8 @@ const NamespaceString& ChunkManagerTargeter::getNS() const { return _nss; } -Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, - const BSONObj& doc, - ShardEndpoint** endpoint) const { +StatusWith<ShardEndpoint> ChunkManagerTargeter::targetInsert(OperationContext* opCtx, + const BSONObj& doc) const { BSONObj shardKey; if (_routingInfo->cm()) { @@ -302,7 +301,7 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, // Target the shard key or database primary if (!shardKey.isEmpty()) { - *endpoint = targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()).release(); + return _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()); } else { if (!_routingInfo->primary()) { return Status(ErrorCodes::NamespaceNotFound, @@ -310,16 +309,14 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx, << "; no metadata found"); } - *endpoint = new ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED()); + return ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED()); } return Status::OK(); } -Status ChunkManagerTargeter::targetUpdate( - OperationContext* opCtx, - const write_ops::UpdateOpEntry& updateDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { +StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const { // // Update targeting may use either the query or the update. This is to support save-style // updates, of the form: @@ -386,9 +383,8 @@ Status ChunkManagerTargeter::targetUpdate( // Target the shard key, query, or replacement doc if (!shardKey.isEmpty()) { try { - endpoints->push_back( - targetShardKey(shardKey, collation, (query.objsize() + updateExpr.objsize()))); - return Status::OK(); + return std::vector<ShardEndpoint>{ + _targetShardKey(shardKey, collation, (query.objsize() + updateExpr.objsize()))}; } catch (const DBException&) { // This update is potentially not constrained to a single shard } @@ -428,36 +424,33 @@ Status ChunkManagerTargeter::targetUpdate( "$expr is not allowed in the query predicate for an upsert"}; } if (!cq.isOK()) { - return Status(cq.getStatus().code(), - str::stream() << "Could not parse update query " << updateDoc.getQ() - << causedBy(cq.getStatus())); + return {cq.getStatus().code(), + str::stream() << "Could not parse update query " << updateDoc.getQ() + << causedBy(cq.getStatus())}; } // Single (non-multi) updates must target a single shard or be exact-ID. if (_routingInfo->cm() && !updateDoc.getMulti() && !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { - return Status(ErrorCodes::ShardKeyNotFound, - str::stream() - << "A single update on a sharded collection must contain an exact " - "match on _id (and have the collection default collation) or " - "contain the shard key (and have the simple collation). Update " - "request: " - << updateDoc.toBSON() - << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString()); + return {ErrorCodes::ShardKeyNotFound, + str::stream() << "A single update on a sharded collection must contain an exact " + "match on _id (and have the collection default collation) or " + "contain the shard key (and have the simple collation). Update " + "request: " + << updateDoc.toBSON() + << ", shard key pattern: " + << _routingInfo->cm()->getShardKeyPattern().toString()}; } if (updateType == UpdateType_OpStyle) { - return targetQuery(opCtx, query, collation, endpoints); + return _targetQuery(opCtx, query, collation); } else { - return targetDoc(opCtx, updateExpr, collation, endpoints); + return _targetDoc(opCtx, updateExpr, collation); } } -Status ChunkManagerTargeter::targetDelete( - OperationContext* opCtx, - const write_ops::DeleteOpEntry& deleteDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { +StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetDelete( + OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const { BSONObj shardKey; if (_routingInfo->cm()) { @@ -484,8 +477,7 @@ Status ChunkManagerTargeter::targetDelete( // Target the shard key or delete query if (!shardKey.isEmpty()) { try { - endpoints->push_back(targetShardKey(shardKey, collation, 0)); - return Status::OK(); + return std::vector<ShardEndpoint>{_targetShardKey(shardKey, collation, 0)}; } catch (const DBException&) { // This delete is potentially not constrained to a single shard } @@ -525,24 +517,18 @@ Status ChunkManagerTargeter::targetDelete( << _routingInfo->cm()->getShardKeyPattern().toString()); } - return targetQuery(opCtx, deleteDoc.getQ(), collation, endpoints); + return _targetQuery(opCtx, deleteDoc.getQ(), collation); } -Status ChunkManagerTargeter::targetDoc( - OperationContext* opCtx, - const BSONObj& doc, - const BSONObj& collation, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - // NOTE: This is weird and fragile, but it's the way our language works right now - - // documents are either A) invalid or B) valid equality queries over themselves. - return targetQuery(opCtx, doc, collation, endpoints); +StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetDoc( + OperationContext* opCtx, const BSONObj& doc, const BSONObj& collation) const { + // NOTE: This is weird and fragile, but it's the way our language works right now - documents + // are either A) invalid or B) valid equality queries over themselves. + return _targetQuery(opCtx, doc, collation); } -Status ChunkManagerTargeter::targetQuery( - OperationContext* opCtx, - const BSONObj& query, - const BSONObj& collation, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { +StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( + OperationContext* opCtx, const BSONObj& query, const BSONObj& collation) const { if (!_routingInfo->primary() && !_routingInfo->cm()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "could not target query in " << getNS().ns() @@ -560,19 +546,19 @@ Status ChunkManagerTargeter::targetQuery( shardIds.insert(_routingInfo->primary()->getId()); } - for (const ShardId& shardId : shardIds) { - endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); + std::vector<ShardEndpoint> endpoints; + for (auto&& shardId : shardIds) { + const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED(); + endpoints.emplace_back(std::move(shardId), version); } - return Status::OK(); + return endpoints; } -std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey, - const BSONObj& collation, - long long estDataSize) const { +ShardEndpoint ChunkManagerTargeter::_targetShardKey(const BSONObj& shardKey, + const BSONObj& collation, + long long estDataSize) const { const auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation); // Track autosplit stats for sharded collections @@ -581,12 +567,10 @@ std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONOb _stats->chunkSizeDelta[chunk->getMin()] += estDataSize; } - return stdx::make_unique<ShardEndpoint>(chunk->getShardId(), - _routingInfo->cm()->getVersion(chunk->getShardId())); + return {chunk->getShardId(), _routingInfo->cm()->getVersion(chunk->getShardId())}; } -Status ChunkManagerTargeter::targetCollection( - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { +StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetCollection() const { if (!_routingInfo->primary() && !_routingInfo->cm()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "could not target full range of " << getNS().ns() @@ -600,18 +584,18 @@ Status ChunkManagerTargeter::targetCollection( shardIds.insert(_routingInfo->primary()->getId()); } - for (const ShardId& shardId : shardIds) { - endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); + std::vector<ShardEndpoint> endpoints; + for (auto&& shardId : shardIds) { + const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED(); + endpoints.emplace_back(std::move(shardId), version); } - return Status::OK(); + return endpoints; } -Status ChunkManagerTargeter::targetAllShards( - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { +StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetAllShards( + OperationContext* opCtx) const { if (!_routingInfo->primary() && !_routingInfo->cm()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "could not target every shard with versions for " << getNS().ns() @@ -619,16 +603,16 @@ Status ChunkManagerTargeter::targetAllShards( } std::vector<ShardId> shardIds; - grid.shardRegistry()->getAllShardIds(&shardIds); + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - for (const ShardId& shardId : shardIds) { - endpoints->push_back(stdx::make_unique<ShardEndpoint>( - shardId, - _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) - : ChunkVersion::UNSHARDED())); + std::vector<ShardEndpoint> endpoints; + for (auto&& shardId : shardIds) { + const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId) + : ChunkVersion::UNSHARDED(); + endpoints.emplace_back(std::move(shardId), version); } - return Status::OK(); + return endpoints; } void ChunkManagerTargeter::noteCouldNotTarget() { @@ -717,7 +701,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC // If didn't already refresh the targeting information, refresh it if (!alreadyRefreshed) { // To match previous behavior, we just need an incremental refresh here - return refreshNow(opCtx); + return _refreshNow(opCtx); } *wasChanged = isMetadataDifferent( @@ -734,7 +718,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC if (result == CompareResult_Unknown || result == CompareResult_LT) { // Our current shard versions aren't all comparable to the old versions, maybe drop - return refreshNow(opCtx); + return _refreshNow(opCtx); } *wasChanged = isMetadataDifferent( @@ -745,7 +729,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC MONGO_UNREACHABLE; } -Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx) { +Status ChunkManagerTargeter::_refreshNow(OperationContext* opCtx) { Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*_routingInfo)); return init(opCtx); diff --git a/src/mongo/s/commands/chunk_manager_targeter.h b/src/mongo/s/commands/chunk_manager_targeter.h index 2ff268871a4..6de41ed3fe7 100644 --- a/src/mongo/s/commands/chunk_manager_targeter.h +++ b/src/mongo/s/commands/chunk_manager_targeter.h @@ -67,23 +67,20 @@ public: const NamespaceString& getNS() const override; // Returns ShardKeyNotFound if document does not have a full shard key. - Status targetInsert(OperationContext* opCtx, - const BSONObj& doc, - ShardEndpoint** endpoint) const override; + StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx, + const BSONObj& doc) const override; // Returns ShardKeyNotFound if the update can't be targeted without a shard key. - Status targetUpdate(OperationContext* opCtx, - const write_ops::UpdateOpEntry& updateDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override; + StatusWith<std::vector<ShardEndpoint>> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override; // Returns ShardKeyNotFound if the delete can't be targeted without a shard key. - Status targetDelete(OperationContext* opCtx, - const write_ops::DeleteOpEntry& deleteDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override; + StatusWith<std::vector<ShardEndpoint>> targetDelete( + OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const override; - Status targetCollection(std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override; + StatusWith<std::vector<ShardEndpoint>> targetCollection() const override; - Status targetAllShards(std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override; + StatusWith<std::vector<ShardEndpoint>> targetAllShards(OperationContext* opCtx) const override; void noteCouldNotTarget() override; @@ -106,7 +103,7 @@ private: /** * Performs an actual refresh from the config server. */ - Status refreshNow(OperationContext* opCtx); + Status _refreshNow(OperationContext* opCtx); /** * Returns a vector of ShardEndpoints where a document might need to be placed. @@ -115,10 +112,9 @@ private: * * If 'collation' is empty, we use the collection default collation for targeting. */ - Status targetDoc(OperationContext* opCtx, - const BSONObj& doc, - const BSONObj& collation, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const; + StatusWith<std::vector<ShardEndpoint>> _targetDoc(OperationContext* opCtx, + const BSONObj& doc, + const BSONObj& collation) const; /** * Returns a vector of ShardEndpoints for a potentially multi-shard query. @@ -127,10 +123,9 @@ private: * * If 'collation' is empty, we use the collection default collation for targeting. */ - Status targetQuery(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& collation, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const; + StatusWith<std::vector<ShardEndpoint>> _targetQuery(OperationContext* opCtx, + const BSONObj& query, + const BSONObj& collation) const; /** * Returns a ShardEndpoint for an exact shard key query. @@ -140,9 +135,9 @@ private: * * If 'collation' is empty, we use the collection default collation for targeting. */ - std::unique_ptr<ShardEndpoint> targetShardKey(const BSONObj& doc, - const BSONObj& collation, - long long estDataSize) const; + ShardEndpoint _targetShardKey(const BSONObj& doc, + const BSONObj& collation, + long long estDataSize) const; // Full namespace of the collection for this targeter const NamespaceString _nss; diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp index 12d44edc75b..2593fb866fe 100644 --- a/src/mongo/s/commands/cluster_write.cpp +++ b/src/mongo/s/commands/cluster_write.cpp @@ -205,22 +205,23 @@ void ClusterWriter::write(OperationContext* opCtx, return; } - std::vector<std::unique_ptr<ShardEndpoint>> endpoints; - auto targetStatus = targeter.targetCollection(&endpoints); - if (!targetStatus.isOK()) { - toBatchError({targetStatus.code(), + auto swEndpoints = targeter.targetCollection(); + if (!swEndpoints.isOK()) { + toBatchError({swEndpoints.getStatus().code(), str::stream() << "unable to target" << (request.isInsertIndexRequest() ? " index" : "") << " write op for collection " << request.getTargetingNS().ns() - << causedBy(targetStatus)}, + << causedBy(swEndpoints.getStatus())}, response); return; } + const auto& endpoints = swEndpoints.getValue(); + // Handle sharded config server writes differently. if (std::any_of(endpoints.begin(), endpoints.end(), [](const auto& it) { - return it->shardName == ShardRegistry::kConfigServerShardId; + return it.shardName == ShardRegistry::kConfigServerShardId; })) { // There should be no namespaces that partially target config servers. invariant(endpoints.size() == 1); diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 8f7581d402e..8dd4a34fe11 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -271,41 +271,28 @@ private: if (!status.isOK()) return status; - std::vector<std::unique_ptr<ShardEndpoint>> endpoints; - - if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) { - ShardEndpoint* endpoint; - Status status = - targeter.targetInsert(opCtx, targetingBatchItem.getDocument(), &endpoint); - if (!status.isOK()) - return status; - endpoints.push_back(std::unique_ptr<ShardEndpoint>{endpoint}); - } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) { - Status status = - targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate(), &endpoints); - if (!status.isOK()) - return status; - } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete) { - Status status = - targeter.targetDelete(opCtx, targetingBatchItem.getDelete(), &endpoints); - if (!status.isOK()) - return status; - } else { - MONGO_UNREACHABLE; - } + auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> { + if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) { + auto swEndpoint = targeter.targetInsert(opCtx, targetingBatchItem.getDocument()); + if (!swEndpoint.isOK()) + return swEndpoint.getStatus(); + return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())}; + } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) { + return targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate()); + } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete) { + return targeter.targetDelete(opCtx, targetingBatchItem.getDelete()); + } else { + MONGO_UNREACHABLE; + } + }(); - auto shardRegistry = Grid::get(opCtx)->shardRegistry(); + if (!swEndpoints.isOK()) + return swEndpoints.getStatus(); // Assemble requests std::vector<AsyncRequestsSender::Request> requests; - for (auto it = endpoints.begin(); it != endpoints.end(); ++it) { - const ShardEndpoint* endpoint = it->get(); - - auto shardStatus = shardRegistry->getShard(opCtx, endpoint->shardName); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - requests.emplace_back(shardStatus.getValue()->getId(), command); + for (const auto& endpoint : swEndpoints.getValue()) { + requests.emplace_back(endpoint.shardName, command); } // Send the requests. diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index e3a554cd252..f915be42ec4 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -30,7 +30,7 @@ #include <vector> -#include "mongo/base/status.h" +#include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops.h" @@ -94,43 +94,39 @@ public: * * Returns !OK with message if document could not be targeted for other reasons. */ - virtual Status targetInsert(OperationContext* opCtx, - const BSONObj& doc, - ShardEndpoint** endpoint) const = 0; + virtual StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx, + const BSONObj& doc) const = 0; /** * Returns a vector of ShardEndpoints for a potentially multi-shard update. * * Returns OK and fills the endpoints; returns a status describing the error otherwise. */ - virtual Status targetUpdate(OperationContext* opCtx, - const write_ops::UpdateOpEntry& updateDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const = 0; + virtual StatusWith<std::vector<ShardEndpoint>> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const = 0; /** * Returns a vector of ShardEndpoints for a potentially multi-shard delete. * * Returns OK and fills the endpoints; returns a status describing the error otherwise. */ - virtual Status targetDelete(OperationContext* opCtx, - const write_ops::DeleteOpEntry& deleteDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const = 0; + virtual StatusWith<std::vector<ShardEndpoint>> targetDelete( + OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const = 0; /** * Returns a vector of ShardEndpoints for the entire collection. * * Returns !OK with message if the full collection could not be targeted. */ - virtual Status targetCollection( - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const = 0; + virtual StatusWith<std::vector<ShardEndpoint>> targetCollection() const = 0; /** * Returns a vector of ShardEndpoints for all shards. * * Returns !OK with message if all shards could not be targeted. */ - virtual Status targetAllShards( - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const = 0; + virtual StatusWith<std::vector<ShardEndpoint>> targetAllShards( + OperationContext* opCtx) const = 0; /** * Informs the targeter that a targeting failure occurred during one of the last targeting diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h index 89ac1e64ce5..b546e71a2af 100644 --- a/src/mongo/s/write_ops/mock_ns_targeter.h +++ b/src/mongo/s/write_ops/mock_ns_targeter.h @@ -72,49 +72,46 @@ public: /** * Returns a ShardEndpoint for the doc from the mock ranges */ - Status targetInsert(OperationContext* opCtx, - const BSONObj& doc, - ShardEndpoint** endpoint) const override { - std::vector<std::unique_ptr<ShardEndpoint>> endpoints; - Status status = _targetQuery(doc, &endpoints); - if (!status.isOK()) - return status; - if (!endpoints.empty()) - *endpoint = endpoints.front().release(); - return Status::OK(); + StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx, + const BSONObj& doc) const override { + auto swEndpoints = _targetQuery(doc); + if (!swEndpoints.isOK()) + return swEndpoints.getStatus(); + + ASSERT_EQ(1U, swEndpoints.getValue().size()); + return swEndpoints.getValue().front(); } /** * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle * queries of the form { field : { $gte : <value>, $lt : <value> } }. */ - Status targetUpdate(OperationContext* opCtx, - const write_ops::UpdateOpEntry& updateDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override { - return _targetQuery(updateDoc.getQ(), endpoints); + StatusWith<std::vector<ShardEndpoint>> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + return _targetQuery(updateDoc.getQ()); } /** * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle * queries of the form { field : { $gte : <value>, $lt : <value> } }. */ - Status targetDelete(OperationContext* opCtx, - const write_ops::DeleteOpEntry& deleteDoc, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - return _targetQuery(deleteDoc.getQ(), endpoints); + StatusWith<std::vector<ShardEndpoint>> targetDelete( + OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const { + return _targetQuery(deleteDoc.getQ()); } - Status targetCollection(std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override { + StatusWith<std::vector<ShardEndpoint>> targetCollection() const override { // No-op - return Status::OK(); + return std::vector<ShardEndpoint>{}; } - Status targetAllShards(std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const override { + StatusWith<std::vector<ShardEndpoint>> targetAllShards(OperationContext* opCtx) const override { + std::vector<ShardEndpoint> endpoints; for (const auto& range : _mockRanges) { - endpoints->push_back(stdx::make_unique<ShardEndpoint>(range.endpoint)); + endpoints.push_back(range.endpoint); } - return Status::OK(); + return endpoints; } void noteCouldNotTarget() override { @@ -158,23 +155,24 @@ private: } /** - * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle - * queries of the form { field : { $gte : <value>, $lt : <value> } }. + * Returns the first ShardEndpoint for the query from the mock ranges. Only handles queries of + * the form { field : { $gte : <value>, $lt : <value> } }. */ - Status _targetQuery(const BSONObj& query, - std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const { - ChunkRange queryRange(_parseRange(query)); + StatusWith<std::vector<ShardEndpoint>> _targetQuery(const BSONObj& query) const { + const ChunkRange queryRange(_parseRange(query)); + + std::vector<ShardEndpoint> endpoints; for (const auto& range : _mockRanges) { if (queryRange.overlapWith(range.range)) { - endpoints->push_back(stdx::make_unique<ShardEndpoint>(range.endpoint)); + endpoints.push_back(range.endpoint); } } - if (endpoints->empty()) + if (endpoints.empty()) return {ErrorCodes::UnknownError, "no mock ranges found for query"}; - return Status::OK(); + return endpoints; } NamespaceString _nss; diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index 2d77ed4b56b..ad3de6cc2e4 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -55,59 +55,55 @@ Status WriteOp::targetWrites(OperationContext* opCtx, std::vector<TargetedWrite*>* targetedWrites) { const bool isIndexInsert = _itemRef.getRequest()->isInsertIndexRequest(); - Status targetStatus = Status::OK(); - std::vector<std::unique_ptr<ShardEndpoint>> endpoints; - - if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) { - if (isIndexInsert) { - // TODO: Remove the index targeting stuff once there is a command for it? - // TODO: Retry index writes with stale version? - targetStatus = targeter.targetCollection(&endpoints); - } else { - ShardEndpoint* endpoint = nullptr; - targetStatus = targeter.targetInsert(opCtx, _itemRef.getDocument(), &endpoint); - if (targetStatus.isOK()) { - // Store single endpoint result if we targeted a single endpoint - endpoints.push_back(std::unique_ptr<ShardEndpoint>{endpoint}); + auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> { + if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) { + if (isIndexInsert) { + // TODO: Remove the index targeting stuff once there is a command for it? + // TODO: Retry index writes with stale version? + return targeter.targetCollection(); } + + auto swEndpoint = targeter.targetInsert(opCtx, _itemRef.getDocument()); + if (!swEndpoint.isOK()) + return swEndpoint.getStatus(); + + return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())}; + } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) { + return targeter.targetUpdate(opCtx, _itemRef.getUpdate()); + } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) { + return targeter.targetDelete(opCtx, _itemRef.getDelete()); + } else { + MONGO_UNREACHABLE; } - } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) { - targetStatus = targeter.targetUpdate(opCtx, _itemRef.getUpdate(), &endpoints); - } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) { - targetStatus = targeter.targetDelete(opCtx, _itemRef.getDelete(), &endpoints); - } else { - MONGO_UNREACHABLE; - } + }(); // If we're targeting more than one endpoint with an update/delete, we have to target everywhere // since we cannot currently retry partial results. // // NOTE: Index inserts are currently specially targeted only at the current collection to avoid // creating collections everywhere. - if (targetStatus.isOK() && endpoints.size() > 1u && !isIndexInsert) { - endpoints.clear(); - targetStatus = targeter.targetAllShards(&endpoints); + if (swEndpoints.isOK() && swEndpoints.getValue().size() > 1u && !isIndexInsert) { + swEndpoints = targeter.targetAllShards(opCtx); } // If we had an error, stop here - if (!targetStatus.isOK()) - return targetStatus; + if (!swEndpoints.isOK()) + return swEndpoints.getStatus(); - for (auto it = endpoints.begin(); it != endpoints.end(); ++it) { - ShardEndpoint* endpoint = it->get(); + auto& endpoints = swEndpoints.getValue(); + for (auto&& endpoint : endpoints) { _childOps.emplace_back(this); WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1); // For now, multiple endpoints imply no versioning - we can't retry half a multi-write - if (endpoints.size() == 1u) { - targetedWrites->push_back(new TargetedWrite(*endpoint, ref)); - } else { - ShardEndpoint broadcastEndpoint(endpoint->shardName, ChunkVersion::IGNORED()); - targetedWrites->push_back(new TargetedWrite(broadcastEndpoint, ref)); + if (endpoints.size() > 1u) { + endpoint.shardVersion = ChunkVersion::IGNORED(); } + targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref)); + _childOps.back().pendingWrite = targetedWrites->back(); _childOps.back().state = WriteOpState_Pending; } |