summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorArun Banala <arun.banala@mongodb.com>2021-08-30 20:55:33 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-08-31 13:41:49 +0000
commitccac542d74e706bbb5df50be4a97d5f817a2de21 (patch)
tree767c99b4a94855ac2726a5c5f2417820a9841d0f /src/mongo/s
parent61efca02fadc86178a0effd8ad6d60a77b265dca (diff)
downloadmongo-ccac542d74e706bbb5df50be4a97d5f817a2de21.tar.gz
SERVER-59164 All write commands on sharded time-series namespace should translate to buckets namespace, on mongos
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/chunk_manager_targeter.cpp99
-rw-r--r--src/mongo/s/chunk_manager_targeter.h14
-rw-r--r--src/mongo/s/commands/cluster_coll_stats_cmd.cpp2
-rw-r--r--src/mongo/s/mock_ns_targeter.h4
-rw-r--r--src/mongo/s/ns_targeter.h2
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.cpp4
-rw-r--r--src/mongo/s/write_ops/batch_write_op.cpp14
-rw-r--r--src/mongo/s/write_ops/batch_write_op.h3
-rw-r--r--src/mongo/s/write_ops/batch_write_op_test.cpp3
-rw-r--r--src/mongo/s/write_ops/batched_command_request.cpp8
10 files changed, 86 insertions, 67 deletions
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index 8abd414d714..ab08a49823e 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -235,11 +235,9 @@ bool isMetadataDifferent(const ChunkManager& managerA, const ChunkManager& manag
ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx,
const NamespaceString& nss,
boost::optional<OID> targetEpoch)
- : _nss(nss), _targetEpoch(std::move(targetEpoch)) {
- _init(opCtx);
-}
+ : _nss(nss), _targetEpoch(std::move(targetEpoch)), _cm(_init(opCtx)) {}
-void ChunkManagerTargeter::_init(OperationContext* opCtx) {
+ChunkManager ChunkManagerTargeter::_init(OperationContext* opCtx) {
cluster::createDatabase(opCtx, _nss.db());
// Check if we target sharded time-series collection. For such collections we target write
@@ -251,29 +249,24 @@ void ChunkManagerTargeter::_init(OperationContext* opCtx) {
// routing info for the underlying buckets collection. The absence of this routing info means
// that this collection does not exist. Finally, we check if this underlying collection is
// sharded. If all these conditions pass, we are targeting sharded time-series collection.
- bool isShardedTimeseriesCollection = false;
- auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss));
- if (!routingInfo.isSharded()) {
+ auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss));
+ if (!cm.isSharded()) {
auto bucketsNs = _nss.makeTimeseriesBucketsNamespace();
- auto bucketsRoutingInfo = getCollectionRoutingInfoForTxnCmd(opCtx, bucketsNs);
- if (bucketsRoutingInfo.isOK() && bucketsRoutingInfo.getValue().isSharded()) {
+ auto bucketsRoutingInfo =
+ uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, bucketsNs));
+ if (bucketsRoutingInfo.isSharded()) {
_nss = bucketsNs;
- _cm = bucketsRoutingInfo.getValue();
- isShardedTimeseriesCollection = true;
+ cm = std::move(bucketsRoutingInfo);
}
}
- // For all other collections, use the original routing info.
- if (!isShardedTimeseriesCollection) {
- _cm = routingInfo;
- }
-
if (_targetEpoch) {
- uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _cm->isSharded());
+ uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", cm.isSharded());
uassert(ErrorCodes::StaleEpoch,
"Collection epoch has changed",
- _cm->getVersion().epoch() == *_targetEpoch);
+ cm.getVersion().epoch() == *_targetEpoch);
}
+ return cm;
}
const NamespaceString& ChunkManagerTargeter::getNS() const {
@@ -300,9 +293,7 @@ BSONObj ChunkManagerTargeter::extractBucketsShardKeyFromTimeseriesDoc(
BSONObjBuilder minBuilder{
controlBuilder.subobjStart(timeseries::kBucketControlMinFieldName)};
minBuilder.append(timeField, roundedTimeValue);
- minBuilder.done();
}
- controlBuilder.done();
}
if (auto metaField = timeseriesOptions.getMetaField(); metaField) {
@@ -319,10 +310,10 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
const BSONObj& doc) const {
BSONObj shardKey;
- if (_cm->isSharded()) {
- const auto& shardKeyPattern = _cm->getShardKeyPattern();
+ if (_cm.isSharded()) {
+ const auto& shardKeyPattern = _cm.getShardKeyPattern();
if (_nss.isTimeseriesBucketsCollection()) {
- auto tsFields = _cm->getTimeseriesFields();
+ auto tsFields = _cm.getTimeseriesFields();
tassert(5743701, "Missing timeseriesFields on buckets collection", tsFields);
shardKey = extractBucketsShardKeyFromTimeseriesDoc(
doc, shardKeyPattern, tsFields->getTimeseriesOptions());
@@ -347,9 +338,9 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
// TODO (SERVER-51070): Remove the boost::none when the config server can support shardVersion
// in commands
return ShardEndpoint(
- _cm->dbPrimary(),
+ _cm.dbPrimary(),
_nss.isOnInternalDb() ? boost::optional<ChunkVersion>() : ChunkVersion::UNSHARDED(),
- _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm->dbVersion());
+ _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm.dbVersion());
}
std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* opCtx,
@@ -371,16 +362,16 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext*
const auto updateType = getUpdateExprType(updateOp);
// If the collection is not sharded, forward the update to the primary shard.
- if (!_cm->isSharded()) {
+ if (!_cm.isSharded()) {
// TODO (SERVER-51070): Remove the boost::none when the config server can support
// shardVersion in commands
return std::vector{ShardEndpoint(
- _cm->dbPrimary(),
+ _cm.dbPrimary(),
_nss.isOnInternalDb() ? boost::optional<ChunkVersion>() : ChunkVersion::UNSHARDED(),
- _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm->dbVersion())};
+ _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm.dbVersion())};
}
- const auto& shardKeyPattern = _cm->getShardKeyPattern();
+ const auto& shardKeyPattern = _cm.getShardKeyPattern();
const auto collation = write_ops::collationOf(updateOp);
auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx,
@@ -435,7 +426,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext*
"single shard (and have the simple collation), but this update targeted "
<< endPoints.size() << " shards. Update request: " << updateOp.toBSON()
<< ", shard key pattern: " << shardKeyPattern.toString(),
- updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, *_cm));
+ updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, _cm));
// If the request is {multi:false}, then this is a single op-style update which we are
// broadcasting to multiple shards by exact _id. Record this event in our serverStatus metrics.
@@ -459,12 +450,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext*
itemRef.getLegacyRuntimeConstants());
BSONObj shardKey;
- if (_cm->isSharded()) {
+ if (_cm.isSharded()) {
// Sharded collections have the following further requirements for targeting:
//
// Limit-1 deletes must be targeted exactly by shard key *or* exact _id
shardKey = uassertStatusOK(
- _cm->getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ()));
+ _cm.getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ()));
}
// Target the shard key or delete query
@@ -498,8 +489,8 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext*
"on _id (and have the collection default collation) or contain the "
"shard key (and have the simple collation). Delete request: "
<< deleteOp.toBSON()
- << ", shard key pattern: " << _cm->getShardKeyPattern().toString(),
- !_cm->isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, *_cm));
+ << ", shard key pattern: " << _cm.getShardKeyPattern().toString(),
+ !_cm.isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, _cm));
return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation));
}
@@ -508,25 +499,25 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
boost::intrusive_ptr<ExpressionContext> expCtx,
const BSONObj& query,
const BSONObj& collation) const {
- if (!_cm->isSharded()) {
+ if (!_cm.isSharded()) {
// TODO (SERVER-51070): Remove the boost::none when the config server can support
// shardVersion in commands
return std::vector{ShardEndpoint(
- _cm->dbPrimary(),
+ _cm.dbPrimary(),
_nss.isOnInternalDb() ? boost::optional<ChunkVersion>() : ChunkVersion::UNSHARDED(),
- _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm->dbVersion())};
+ _nss.isOnInternalDb() ? boost::optional<DatabaseVersion>() : _cm.dbVersion())};
}
std::set<ShardId> shardIds;
try {
- _cm->getShardIdsForQuery(expCtx, query, collation, &shardIds);
+ _cm.getShardIdsForQuery(expCtx, query, collation, &shardIds);
} catch (const DBException& ex) {
return ex.toStatus();
}
std::vector<ShardEndpoint> endpoints;
for (auto&& shardId : shardIds) {
- endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId), boost::none);
+ endpoints.emplace_back(std::move(shardId), _cm.getVersion(shardId), boost::none);
}
return endpoints;
@@ -535,8 +526,8 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& shardKey,
const BSONObj& collation) const {
try {
- auto chunk = _cm->findIntersectingChunk(shardKey, collation);
- return ShardEndpoint(chunk.getShardId(), _cm->getVersion(chunk.getShardId()), boost::none);
+ auto chunk = _cm.findIntersectingChunk(shardKey, collation);
+ return ShardEndpoint(chunk.getShardId(), _cm.getVersion(chunk.getShardId()), boost::none);
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -546,13 +537,13 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s
std::vector<ShardEndpoint> ChunkManagerTargeter::targetAllShards(OperationContext* opCtx) const {
// This function is only called if doing a multi write that targets more than one shard. This
// implies the collection is sharded, so we should always have a chunk manager.
- invariant(_cm->isSharded());
+ invariant(_cm.isSharded());
auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload();
std::vector<ShardEndpoint> endpoints;
for (auto&& shardId : shardIds) {
- endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId), boost::none);
+ endpoints.emplace_back(std::move(shardId), _cm.getVersion(shardId), boost::none);
}
return endpoints;
@@ -598,27 +589,35 @@ bool ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx) {
"staleDbVersion"_attr = _lastError.get() == LastErrorType::kStaleDbVersion);
// Get the latest metadata information from the cache if there were issues
- auto lastManager = *_cm;
- _init(opCtx);
- auto metadataChanged = isMetadataDifferent(lastManager, *_cm);
+ auto lastManager = _cm;
+ _cm = _init(opCtx);
+ auto metadataChanged = isMetadataDifferent(lastManager, _cm);
if (_lastError.get() == LastErrorType::kCouldNotTarget && !metadataChanged) {
// If we couldn't target and we didn't already update the metadata we must force a refresh
uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss));
- _init(opCtx);
- metadataChanged = isMetadataDifferent(lastManager, *_cm);
+ _cm = _init(opCtx);
+ metadataChanged = isMetadataDifferent(lastManager, _cm);
}
return metadataChanged;
}
int ChunkManagerTargeter::getNShardsOwningChunks() const {
- if (_cm->isSharded()) {
- return _cm->getNShardsOwningChunks();
+ if (_cm.isSharded()) {
+ return _cm.getNShardsOwningChunks();
}
return 0;
}
+bool ChunkManagerTargeter::isShardedTimeSeriesBucketsNamespace() const {
+ return _cm.isSharded() && _cm.getTimeseriesFields();
+}
+
+const ChunkManager& ChunkManagerTargeter::getRoutingInfo() const {
+ return _cm;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h
index 490b0ac1400..254b975a017 100644
--- a/src/mongo/s/chunk_manager_targeter.h
+++ b/src/mongo/s/chunk_manager_targeter.h
@@ -110,9 +110,9 @@ public:
int getNShardsOwningChunks() const override;
- const ChunkManager& getRoutingInfo() const {
- return *_cm;
- }
+ bool isShardedTimeSeriesBucketsNamespace() const override;
+
+ const ChunkManager& getRoutingInfo() const;
static BSONObj extractBucketsShardKeyFromTimeseriesDoc(
const BSONObj& doc,
@@ -120,7 +120,7 @@ public:
const TimeseriesOptions& timeseriesOptions);
private:
- void _init(OperationContext* opCtx);
+ ChunkManager _init(OperationContext* opCtx);
/**
* Returns a vector of ShardEndpoints for a potentially multi-shard query.
@@ -151,12 +151,12 @@ private:
// Stores last error occurred
boost::optional<LastErrorType> _lastError;
- // The latest loaded routing cache entry
- boost::optional<ChunkManager> _cm;
-
// Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and
// find a new epoch, we immediately throw a StaleEpoch exception.
boost::optional<OID> _targetEpoch;
+
+ // The latest loaded routing cache entry
+ ChunkManager _cm;
};
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp
index ac911cd8ceb..81b69c66dd6 100644
--- a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp
+++ b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp
@@ -215,7 +215,7 @@ public:
BSONObj cmdObjToSend = cmdObj.removeField("scale");
// Translate command collection namespace for time-series collection.
- if (cm.isSharded() && cm.getTimeseriesFields()) {
+ if (cm.isSharded() && cm.getTimeseriesFields() && !nss.isTimeseriesBucketsCollection()) {
cmdObjToSend = timeseries::makeTimeseriesCommand(cmdObjToSend, nss, getName());
}
diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h
index 8ef5687b569..4c3e2d796d8 100644
--- a/src/mongo/s/mock_ns_targeter.h
+++ b/src/mongo/s/mock_ns_targeter.h
@@ -123,6 +123,10 @@ public:
return 0;
}
+ bool isShardedTimeSeriesBucketsNamespace() const override {
+ return false;
+ }
+
private:
/**
* Returns the first ShardEndpoint for the query from the mock ranges. Only handles queries of
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index 9fd233b1fa6..4db2397d826 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -70,6 +70,8 @@ public:
*/
virtual const NamespaceString& getNS() const = 0;
+ virtual bool isShardedTimeSeriesBucketsNamespace() const = 0;
+
/**
* Returns a ShardEndpoint for a single document write or throws ShardKeyNotFound if 'doc' is
* malformed with respect to the shard key pattern of the collection.
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 7dec6243df5..4479afd6875 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -126,7 +126,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse,
BatchWriteExecStats* stats) {
- const auto& nss(clientRequest.getNS());
+ const auto& nss(targeter.getNS());
LOGV2_DEBUG(22904,
4,
@@ -231,7 +231,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
stats->noteTargetedShard(targetShardId);
const auto request = [&] {
- const auto shardBatchRequest(batchOp.buildBatchRequest(*nextBatch));
+ const auto shardBatchRequest(batchOp.buildBatchRequest(*nextBatch, targeter));
BSONObjBuilder requestBuilder;
shardBatchRequest.serialize(&requestBuilder);
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 8b0030e8006..20e183e5ca6 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -462,8 +462,8 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
return Status::OK();
}
-BatchedCommandRequest BatchWriteOp::buildBatchRequest(
- const TargetedWriteBatch& targetedBatch) const {
+BatchedCommandRequest BatchWriteOp::buildBatchRequest(const TargetedWriteBatch& targetedBatch,
+ const NSTargeter& targeter) const {
const auto batchType = _clientRequest.getBatchType();
boost::optional<std::vector<int32_t>> stmtIdsForOp;
@@ -511,13 +511,13 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
switch (batchType) {
case BatchedCommandRequest::BatchType_Insert:
return BatchedCommandRequest([&] {
- write_ops::InsertCommandRequest insertOp(_clientRequest.getNS());
+ write_ops::InsertCommandRequest insertOp(targeter.getNS());
insertOp.setDocuments(std::move(*insertDocs));
return insertOp;
}());
case BatchedCommandRequest::BatchType_Update: {
return BatchedCommandRequest([&] {
- write_ops::UpdateCommandRequest updateOp(_clientRequest.getNS());
+ write_ops::UpdateCommandRequest updateOp(targeter.getNS());
updateOp.setUpdates(std::move(*updates));
// Each child batch inherits its let params/runtime constants from the parent
// batch.
@@ -528,7 +528,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
}
case BatchedCommandRequest::BatchType_Delete:
return BatchedCommandRequest([&] {
- write_ops::DeleteCommandRequest deleteOp(_clientRequest.getNS());
+ write_ops::DeleteCommandRequest deleteOp(targeter.getNS());
deleteOp.setDeletes(std::move(*deletes));
// Each child batch inherits its let params from the parent batch.
deleteOp.setLet(_clientRequest.getLet());
@@ -546,6 +546,10 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
_clientRequest.getWriteCommandRequestBase().getBypassDocumentValidation());
wcb.setOrdered(_clientRequest.getWriteCommandRequestBase().getOrdered());
+ if (targeter.isShardedTimeSeriesBucketsNamespace()) {
+ wcb.setIsTimeseriesNamespace(true);
+ }
+
if (_batchTxnNum) {
wcb.setStmtIds(std::move(stmtIdsForOp));
}
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 718e6de75e7..73a9f8175a5 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -150,7 +150,8 @@ public:
/**
* Fills a BatchCommandRequest from a TargetedWriteBatch for this BatchWriteOp.
*/
- BatchedCommandRequest buildBatchRequest(const TargetedWriteBatch& targetedBatch) const;
+ BatchedCommandRequest buildBatchRequest(const TargetedWriteBatch& targetedBatch,
+ const NSTargeter& targeter) const;
/**
* Stores a response from one of the outstanding TargetedWriteBatches for this BatchWriteOp.
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index 224aa009b69..43fc32f13b2 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -248,7 +248,8 @@ TEST_F(BatchWriteOpTest, SingleWriteConcernErrorOrdered) {
ASSERT_EQUALS(targeted.size(), 1u);
assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint);
- BatchedCommandRequest targetBatch = batchOp.buildBatchRequest(*targeted.begin()->second);
+ BatchedCommandRequest targetBatch =
+ batchOp.buildBatchRequest(*targeted.begin()->second, targeter);
ASSERT(targetBatch.getWriteConcern().woCompare(request.getWriteConcern()) == 0);
BatchedCommandResponse response;
diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp
index abc43cf5009..1e262dc4ce9 100644
--- a/src/mongo/s/write_ops/batched_command_request.cpp
+++ b/src/mongo/s/write_ops/batched_command_request.cpp
@@ -57,6 +57,14 @@ BatchedCommandRequest constructBatchedCommandRequest(const OpMsgRequest& request
batchRequest.setWriteConcern(writeConcernField.Obj());
}
+ // The 'isTimeseriesNamespace' is an internal parameter used for communication between mongos
+ // and mongod.
+ auto isTimeseriesNamespace =
+ request.body[write_ops::WriteCommandRequestBase::kIsTimeseriesNamespaceFieldName];
+ uassert(5916401,
+ "the 'isTimeseriesNamespace' parameter cannot be used on mongos",
+ !isTimeseriesNamespace.trueValue());
+
return batchRequest;
}