diff options
author | Arun Banala <arun.banala@mongodb.com> | 2021-08-30 20:55:33 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-08-31 13:41:49 +0000 |
commit | ccac542d74e706bbb5df50be4a97d5f817a2de21 (patch) | |
tree | 767c99b4a94855ac2726a5c5f2417820a9841d0f /src/mongo/s | |
parent | 61efca02fadc86178a0effd8ad6d60a77b265dca (diff) | |
download | mongo-ccac542d74e706bbb5df50be4a97d5f817a2de21.tar.gz |
SERVER-59164 All write commands on sharded time-series namespace should translate to buckets namespace, on mongos
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/chunk_manager_targeter.cpp | 99 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager_targeter.h | 14 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_coll_stats_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/mock_ns_targeter.h | 4 | ||||
-rw-r--r-- | src/mongo/s/ns_targeter.h | 2 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 14 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.h | 3 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batched_command_request.cpp | 8 |
10 files changed, 86 insertions, 67 deletions
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index 8abd414d714..ab08a49823e 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -235,11 +235,9 @@ bool isMetadataDifferent(const ChunkManager& managerA, const ChunkManager& manag ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx, const NamespaceString& nss, boost::optional<OID> targetEpoch) - : _nss(nss), _targetEpoch(std::move(targetEpoch)) { - _init(opCtx); -} + : _nss(nss), _targetEpoch(std::move(targetEpoch)), _cm(_init(opCtx)) {} -void ChunkManagerTargeter::_init(OperationContext* opCtx) { +ChunkManager ChunkManagerTargeter::_init(OperationContext* opCtx) { cluster::createDatabase(opCtx, _nss.db()); // Check if we target sharded time-series collection. For such collections we target write @@ -251,29 +249,24 @@ void ChunkManagerTargeter::_init(OperationContext* opCtx) { // routing info for the underlying buckets collection. The absense of this routing info means // that this collection does not exist. Finally, we check if this underlying collection is // sharded. If all these conditions pass, we are targeting sharded time-series collection. - bool isShardedTimeseriesCollection = false; - auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); - if (!routingInfo.isSharded()) { + auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); + if (!cm.isSharded()) { auto bucketsNs = _nss.makeTimeseriesBucketsNamespace(); - auto bucketsRoutingInfo = getCollectionRoutingInfoForTxnCmd(opCtx, bucketsNs); - if (bucketsRoutingInfo.isOK() && bucketsRoutingInfo.getValue().isSharded()) { + auto bucketsRoutingInfo = + uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, bucketsNs)); + if (bucketsRoutingInfo.isSharded()) { _nss = bucketsNs; - _cm = bucketsRoutingInfo.getValue(); - isShardedTimeseriesCollection = true; + cm = std::move(bucketsRoutingInfo); } } - // For all other collections, use the original routing info. - if (!isShardedTimeseriesCollection) { - _cm = routingInfo; - } - if (_targetEpoch) { - uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _cm->isSharded()); + uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", cm.isSharded()); uassert(ErrorCodes::StaleEpoch, "Collection epoch has changed", - _cm->getVersion().epoch() == *_targetEpoch); + cm.getVersion().epoch() == *_targetEpoch); } + return cm; } const NamespaceString& ChunkManagerTargeter::getNS() const { @@ -300,9 +293,7 @@ BSONObj ChunkManagerTargeter::extractBucketsShardKeyFromTimeseriesDoc( BSONObjBuilder minBuilder{ controlBuilder.subobjStart(timeseries::kBucketControlMinFieldName)}; minBuilder.append(timeField, roundedTimeValue); - minBuilder.done(); } - controlBuilder.done(); } if (auto metaField = timeseriesOptions.getMetaField(); metaField) { @@ -319,10 +310,10 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, const BSONObj& doc) const { BSONObj shardKey; - if (_cm->isSharded()) { - const auto& shardKeyPattern = _cm->getShardKeyPattern(); + if (_cm.isSharded()) { + const auto& shardKeyPattern = _cm.getShardKeyPattern(); if (_nss.isTimeseriesBucketsCollection()) { - auto tsFields = _cm->getTimeseriesFields(); + auto tsFields = _cm.getTimeseriesFields(); tassert(5743701, "Missing timeseriesFields on buckets collection", tsFields); shardKey = extractBucketsShardKeyFromTimeseriesDoc( doc, shardKeyPattern, tsFields->getTimeseriesOptions()); @@ -347,9 +338,9 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, // TODO (SERVER-51070): Remove the boost::none when the config server can support shardVersion // in commands return ShardEndpoint( - _cm->dbPrimary(), + _cm.dbPrimary(), _nss.isOnInternalDb() ? boost::optional<ChunkVersion>() : ChunkVersion::UNSHARDED(), - _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm->dbVersion()); + _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm.dbVersion()); } std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* opCtx, @@ -371,16 +362,16 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* const auto updateType = getUpdateExprType(updateOp); // If the collection is not sharded, forward the update to the primary shard. - if (!_cm->isSharded()) { + if (!_cm.isSharded()) { // TODO (SERVER-51070): Remove the boost::none when the config server can support // shardVersion in commands return std::vector{ShardEndpoint( - _cm->dbPrimary(), + _cm.dbPrimary(), _nss.isOnInternalDb() ? boost::optional<ChunkVersion>() : ChunkVersion::UNSHARDED(), - _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm->dbVersion())}; + _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm.dbVersion())}; } - const auto& shardKeyPattern = _cm->getShardKeyPattern(); + const auto& shardKeyPattern = _cm.getShardKeyPattern(); const auto collation = write_ops::collationOf(updateOp); auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx, @@ -435,7 +426,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* "single shard (and have the simple collation), but this update targeted " << endPoints.size() << " shards. Update request: " << updateOp.toBSON() << ", shard key pattern: " << shardKeyPattern.toString(), - updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, *_cm)); + updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, _cm)); // If the request is {multi:false}, then this is a single op-style update which we are // broadcasting to multiple shards by exact _id. Record this event in our serverStatus metrics. @@ -459,12 +450,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* itemRef.getLegacyRuntimeConstants()); BSONObj shardKey; - if (_cm->isSharded()) { + if (_cm.isSharded()) { // Sharded collections have the following further requirements for targeting: // // Limit-1 deletes must be targeted exactly by shard key *or* exact _id shardKey = uassertStatusOK( - _cm->getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ())); + _cm.getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ())); } // Target the shard key or delete query @@ -498,8 +489,8 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* "on _id (and have the collection default collation) or contain the " "shard key (and have the simple collation). Delete request: " << deleteOp.toBSON() - << ", shard key pattern: " << _cm->getShardKeyPattern().toString(), - !_cm->isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, *_cm)); + << ", shard key pattern: " << _cm.getShardKeyPattern().toString(), + !_cm.isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, _cm)); return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation)); } @@ -508,25 +499,25 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( boost::intrusive_ptr<ExpressionContext> expCtx, const BSONObj& query, const BSONObj& collation) const { - if (!_cm->isSharded()) { + if (!_cm.isSharded()) { // TODO (SERVER-51070): Remove the boost::none when the config server can support // shardVersion in commands return std::vector{ShardEndpoint( - _cm->dbPrimary(), + _cm.dbPrimary(), _nss.isOnInternalDb() ? boost::optional<ChunkVersion>() : ChunkVersion::UNSHARDED(), - _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm->dbVersion())}; + _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm.dbVersion())}; } std::set<ShardId> shardIds; try { - _cm->getShardIdsForQuery(expCtx, query, collation, &shardIds); + _cm.getShardIdsForQuery(expCtx, query, collation, &shardIds); } catch (const DBException& ex) { return ex.toStatus(); } std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { - endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId), boost::none); + endpoints.emplace_back(std::move(shardId), _cm.getVersion(shardId), boost::none); } return endpoints; @@ -535,8 +526,8 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& shardKey, const BSONObj& collation) const { try { - auto chunk = _cm->findIntersectingChunk(shardKey, collation); - return ShardEndpoint(chunk.getShardId(), _cm->getVersion(chunk.getShardId()), boost::none); + auto chunk = _cm.findIntersectingChunk(shardKey, collation); + return ShardEndpoint(chunk.getShardId(), _cm.getVersion(chunk.getShardId()), boost::none); } catch (const DBException& ex) { return ex.toStatus(); } @@ -546,13 +537,13 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s std::vector<ShardEndpoint> ChunkManagerTargeter::targetAllShards(OperationContext* opCtx) const { // This function is only called if doing a multi write that targets more than one shard. This // implies the collection is sharded, so we should always have a chunk manager. - invariant(_cm->isSharded()); + invariant(_cm.isSharded()); auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(); std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { - endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId), boost::none); + endpoints.emplace_back(std::move(shardId), _cm.getVersion(shardId), boost::none); } return endpoints; @@ -598,27 +589,35 @@ bool ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx) { "staleDbVersion"_attr = _lastError.get() == LastErrorType::kStaleDbVersion); // Get the latest metadata information from the cache if there were issues - auto lastManager = *_cm; - _init(opCtx); - auto metadataChanged = isMetadataDifferent(lastManager, *_cm); + auto lastManager = _cm; + _cm = _init(opCtx); + auto metadataChanged = isMetadataDifferent(lastManager, _cm); if (_lastError.get() == LastErrorType::kCouldNotTarget && !metadataChanged) { // If we couldn't target and we dind't already update the metadata we must force a refresh uassertStatusOK( Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss)); - _init(opCtx); - metadataChanged = isMetadataDifferent(lastManager, *_cm); + _cm = _init(opCtx); + metadataChanged = isMetadataDifferent(lastManager, _cm); } return metadataChanged; } int ChunkManagerTargeter::getNShardsOwningChunks() const { - if (_cm->isSharded()) { - return _cm->getNShardsOwningChunks(); + if (_cm.isSharded()) { + return _cm.getNShardsOwningChunks(); } return 0; } +bool ChunkManagerTargeter::isShardedTimeSeriesBucketsNamespace() const { + return _cm.isSharded() && _cm.getTimeseriesFields(); +} + +const ChunkManager& ChunkManagerTargeter::getRoutingInfo() const { + return _cm; +} + } // namespace mongo diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index 490b0ac1400..254b975a017 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -110,9 +110,9 @@ public: int getNShardsOwningChunks() const override; - const ChunkManager& getRoutingInfo() const { - return *_cm; - } + bool isShardedTimeSeriesBucketsNamespace() const override; + + const ChunkManager& getRoutingInfo() const; static BSONObj extractBucketsShardKeyFromTimeseriesDoc( const BSONObj& doc, @@ -120,7 +120,7 @@ public: const TimeseriesOptions& timeseriesOptions); private: - void _init(OperationContext* opCtx); + ChunkManager _init(OperationContext* opCtx); /** * Returns a vector of ShardEndpoints for a potentially multi-shard query. @@ -151,12 +151,12 @@ private: // Stores last error occurred boost::optional<LastErrorType> _lastError; - // The latest loaded routing cache entry - boost::optional<ChunkManager> _cm; - // Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and // find a new epoch, we immediately throw a StaleEpoch exception. boost::optional<OID> _targetEpoch; + + // The latest loaded routing cache entry + ChunkManager _cm; }; } // namespace mongo diff --git a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp index ac911cd8ceb..81b69c66dd6 100644 --- a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp @@ -215,7 +215,7 @@ public: BSONObj cmdObjToSend = cmdObj.removeField("scale"); // Translate command collection namespace for time-series collection. - if (cm.isSharded() && cm.getTimeseriesFields()) { + if (cm.isSharded() && cm.getTimeseriesFields() && !nss.isTimeseriesBucketsCollection()) { cmdObjToSend = timeseries::makeTimeseriesCommand(cmdObjToSend, nss, getName()); } diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h index 8ef5687b569..4c3e2d796d8 100644 --- a/src/mongo/s/mock_ns_targeter.h +++ b/src/mongo/s/mock_ns_targeter.h @@ -123,6 +123,10 @@ public: return 0; } + bool isShardedTimeSeriesBucketsNamespace() const override { + return false; + } + private: /** * Returns the first ShardEndpoint for the query from the mock ranges. Only handles queries of diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index 9fd233b1fa6..4db2397d826 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -70,6 +70,8 @@ public: */ virtual const NamespaceString& getNS() const = 0; + virtual bool isShardedTimeSeriesBucketsNamespace() const = 0; + /** * Returns a ShardEndpoint for a single document write or throws ShardKeyNotFound if 'doc' is * malformed with respect to the shard key pattern of the collection. diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 7dec6243df5..4479afd6875 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -126,7 +126,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, const BatchedCommandRequest& clientRequest, BatchedCommandResponse* clientResponse, BatchWriteExecStats* stats) { - const auto& nss(clientRequest.getNS()); + const auto& nss(targeter.getNS()); LOGV2_DEBUG(22904, 4, @@ -231,7 +231,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, stats->noteTargetedShard(targetShardId); const auto request = [&] { - const auto shardBatchRequest(batchOp.buildBatchRequest(*nextBatch)); + const auto shardBatchRequest(batchOp.buildBatchRequest(*nextBatch, targeter)); BSONObjBuilder requestBuilder; shardBatchRequest.serialize(&requestBuilder); diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 8b0030e8006..20e183e5ca6 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -462,8 +462,8 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter, return Status::OK(); } -BatchedCommandRequest BatchWriteOp::buildBatchRequest( - const TargetedWriteBatch& targetedBatch) const { +BatchedCommandRequest BatchWriteOp::buildBatchRequest(const TargetedWriteBatch& targetedBatch, + const NSTargeter& targeter) const { const auto batchType = _clientRequest.getBatchType(); boost::optional<std::vector<int32_t>> stmtIdsForOp; @@ -511,13 +511,13 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest( switch (batchType) { case BatchedCommandRequest::BatchType_Insert: return BatchedCommandRequest([&] { - write_ops::InsertCommandRequest insertOp(_clientRequest.getNS()); + write_ops::InsertCommandRequest insertOp(targeter.getNS()); insertOp.setDocuments(std::move(*insertDocs)); return insertOp; }()); case BatchedCommandRequest::BatchType_Update: { return BatchedCommandRequest([&] { - write_ops::UpdateCommandRequest updateOp(_clientRequest.getNS()); + write_ops::UpdateCommandRequest updateOp(targeter.getNS()); updateOp.setUpdates(std::move(*updates)); // Each child batch inherits its let params/runtime constants from the parent // batch. @@ -528,7 +528,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest( } case BatchedCommandRequest::BatchType_Delete: return BatchedCommandRequest([&] { - write_ops::DeleteCommandRequest deleteOp(_clientRequest.getNS()); + write_ops::DeleteCommandRequest deleteOp(targeter.getNS()); deleteOp.setDeletes(std::move(*deletes)); // Each child batch inherits its let params from the parent batch. deleteOp.setLet(_clientRequest.getLet()); @@ -546,6 +546,10 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest( _clientRequest.getWriteCommandRequestBase().getBypassDocumentValidation()); wcb.setOrdered(_clientRequest.getWriteCommandRequestBase().getOrdered()); + if (targeter.isShardedTimeSeriesBucketsNamespace()) { + wcb.setIsTimeseriesNamespace(true); + } + if (_batchTxnNum) { wcb.setStmtIds(std::move(stmtIdsForOp)); } diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 718e6de75e7..73a9f8175a5 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -150,7 +150,8 @@ public: /** * Fills a BatchCommandRequest from a TargetedWriteBatch for this BatchWriteOp. */ - BatchedCommandRequest buildBatchRequest(const TargetedWriteBatch& targetedBatch) const; + BatchedCommandRequest buildBatchRequest(const TargetedWriteBatch& targetedBatch, + const NSTargeter& targeter) const; /** * Stores a response from one of the outstanding TargetedWriteBatches for this BatchWriteOp. diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index 224aa009b69..43fc32f13b2 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -248,7 +248,8 @@ TEST_F(BatchWriteOpTest, SingleWriteConcernErrorOrdered) { ASSERT_EQUALS(targeted.size(), 1u); assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint); - BatchedCommandRequest targetBatch = batchOp.buildBatchRequest(*targeted.begin()->second); + BatchedCommandRequest targetBatch = + batchOp.buildBatchRequest(*targeted.begin()->second, targeter); ASSERT(targetBatch.getWriteConcern().woCompare(request.getWriteConcern()) == 0); BatchedCommandResponse response; diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp index abc43cf5009..1e262dc4ce9 100644 --- a/src/mongo/s/write_ops/batched_command_request.cpp +++ b/src/mongo/s/write_ops/batched_command_request.cpp @@ -57,6 +57,14 @@ BatchedCommandRequest constructBatchedCommandRequest(const OpMsgRequest& request batchRequest.setWriteConcern(writeConcernField.Obj()); } + // The 'isTimeseriesNamespace' is an internal parameter used for communication between mongos + // and mongod. + auto isTimeseriesNamespace = + request.body[write_ops::WriteCommandRequestBase::kIsTimeseriesNamespaceFieldName]; + uassert(5916401, + "the 'isTimeseriesNamespace' parameter cannot be used on mongos", + !isTimeseriesNamespace.trueValue()); + return batchRequest; } |