author    Rui Liu <rui.liu@mongodb.com>  2022-03-23 16:21:31 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-03-23 17:15:06 +0000
commit    1be55774474f82e55a2ff83883428559358fe89c
tree      de03e427f5d1eaa40f9ec427cf96bc06c221def0
parent    fc2b624564c1ec4f7d27c98e5a0074920e7ee5a1
SERVER-61028 Implement granularity update for timeseries collection
Diffstat (limited to 'src/mongo/db/s/collmod_coordinator.cpp')
 src/mongo/db/s/collmod_coordinator.cpp | 196
 1 file changed, 154 insertions(+), 42 deletions(-)
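At a high level, the patch splits what used to be a single kUpdateShards phase into a three-phase protocol, so that a timeseries.granularity change on a sharded collection is never observable half-applied: shards are blocked first, then the config server is updated, and only then do the shards apply the change and unblock. A sketch of the ordering implied by the hunks below, assuming the Phase enum lives in the coordinator's IDL-generated state document (which this diff does not touch); the comments are interpretation, not source:

    // Phase names as they appear in the diff; kUnset is the initial state.
    enum class Phase {
        kUnset,         // no phase persisted yet
        kBlockShards,   // stop migrations, block CRUD on shards owning chunks
        kUpdateConfig,  // apply the new granularity on the config server
        kUpdateShards,  // run collMod on each shard, then unblock writes
    };

On resume after a failover, a coordinator whose persisted phase is already past kUnset first performs a no-op retryable write on all participants, fencing off any stale requests still in flight from a previous primary.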
diff --git a/src/mongo/db/s/collmod_coordinator.cpp b/src/mongo/db/s/collmod_coordinator.cpp
index 7b3940bd41e..6c7c4ce5015 100644
--- a/src/mongo/db/s/collmod_coordinator.cpp
+++ b/src/mongo/db/s/collmod_coordinator.cpp
@@ -37,19 +37,24 @@
#include "mongo/db/coll_mod_gen.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/ops/insert.h"
+#include "mongo/db/s/participant_block_gen.h"
#include "mongo/db/s/sharded_collmod_gen.h"
#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/timeseries/catalog_helper.h"
#include "mongo/db/timeseries/timeseries_collmod.h"
+#include "mongo/db/timeseries/timeseries_options.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/logv2/log.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(collModBeforeConfigServerUpdate);
+
namespace {
bool isShardedColl(OperationContext* opCtx, const NamespaceString& nss) {
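The new collModBeforeConfigServerUpdate failpoint parks the coordinator between blocking the shards and updating the config server, the most delicate point in the protocol. A minimal sketch of how a C++ test might use it, assuming the standard RAII helper from "mongo/util/fail_point.h"; the surrounding test fixture is elided:

    {
        // Enable the failpoint for the duration of this scope.
        FailPointEnableBlock fp("collModBeforeConfigServerUpdate");
        // ... issue a collMod changing timeseries.granularity; the coordinator
        // pauses in pauseWhileSet() at the top of the kUpdateConfig phase, and
        // the test can inject a step-down, crash, etc. at exactly that point ...
    }   // Scope exit disables the failpoint and the coordinator resumes.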
@@ -66,6 +71,26 @@ bool hasTimeSeriesGranularityUpdate(const CollModRequest& request) {
return request.getTimeseries() && request.getTimeseries()->getGranularity();
}
+CollModCollectionInfo getCollModCollectionInfo(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ CollModCollectionInfo info;
+ info.setTimeSeriesOptions(timeseries::getTimeseriesOptions(opCtx, nss, true));
+ info.setNsForTargetting(info.getTimeSeriesOptions() ? nss.makeTimeseriesBucketsNamespace()
+ : nss);
+ info.setIsSharded(isShardedColl(opCtx, info.getNsForTargetting()));
+ if (info.getIsSharded()) {
+ const auto chunkManager =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, info.getNsForTargetting()));
+ info.setPrimaryShard(chunkManager.dbPrimary());
+ std::set<ShardId> shardIdsSet;
+ chunkManager.getAllShardIds(&shardIdsSet);
+ std::vector<ShardId> shardIdsVec{shardIdsSet.begin(), shardIdsSet.end()};
+ info.setShardsOwningChunks(shardIdsVec);
+ }
+ return info;
+}
+
} // namespace
CollModCoordinator::CollModCoordinator(ShardingDDLCoordinatorService* service,
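The new getCollModCollectionInfo helper resolves everything the later phases need (time-series options, the buckets namespace to target, whether the collection is sharded, the primary shard, and the set of shards owning chunks) in one place. The coordinator computes it at most once and persists it in its state document, so a retry after a step-down reuses the same targeting decisions rather than re-reading routing state that may have moved underneath it. The guard in the kBlockShards phase below follows this compute-once shape (accessor names from the diff; CollModCollectionInfo is IDL-generated):

    // First execution only: snapshot routing decisions into the durable
    // coordinator document; later phases tassert() that the snapshot exists.
    if (!_doc.getInfo()) {
        _doc.setInfo(getCollModCollectionInfo(opCtx, nss()));
    }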
@@ -145,13 +170,24 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor)
+ .then([this, executor = executor, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ if (_doc.getPhase() > Phase::kUnset) {
+ _performNoopRetryableWriteOnParticipants(opCtx, **executor);
+ }
+ })
.then(_executePhase(
- Phase::kUpdateShards,
+ Phase::kBlockShards,
[this, executor = executor, anchor = shared_from_this()] {
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ _doc = _updateSession(opCtx, _doc);
+
{
AutoGetCollection coll{
opCtx, nss(), MODE_IS, AutoGetCollectionViewMode::kViewsPermitted};
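Before blocking anything, the kBlockShards body (continued in the next hunk) validates the requested granularity transition. The diff shows only the call site of timeseries::isValidTimeseriesGranularityTransition; judging from the uassert text, the rule amounts to the sketch below. The enum is a stand-in for the IDL-generated BucketGranularityEnum, and accepting a same-value no-op is an assumption this diff does not confirm:

    enum class BucketGranularity { Seconds, Minutes, Hours };

    // Hedged reconstruction, not the library implementation.
    bool isValidGranularityTransition(BucketGranularity from, BucketGranularity to) {
        if (from == to)
            return true;  // assumed: restating the current value is a no-op
        // Buckets may only widen one step at a time:
        // seconds -> minutes, or minutes -> hours.
        return (from == BucketGranularity::Seconds && to == BucketGranularity::Minutes) ||
               (from == BucketGranularity::Minutes && to == BucketGranularity::Hours);
    }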
@@ -159,54 +195,130 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
opCtx, nss(), *coll, _doc.getCollModRequest().getCollectionUUID());
}
- const auto isTimeSeries = timeseries::getTimeseriesOptions(
- opCtx, nss(), !nss().isTimeseriesBucketsCollection());
- const auto collNss = isTimeSeries && !nss().isTimeseriesBucketsCollection()
- ? nss().makeTimeseriesBucketsNamespace()
- : nss();
- const auto isSharded = isShardedColl(opCtx, collNss);
-
- if (isSharded) {
- // Updating granularity on sharded time-series collections is not allowed.
- if (isTimeSeries) {
- uassert(
- ErrorCodes::NotImplemented,
- str::stream()
- << "Cannot update granularity of a sharded time-series collection.",
- !hasTimeSeriesGranularityUpdate(_doc.getCollModRequest()));
- }
- _doc.setCollUUID(
- sharding_ddl_util::getCollectionUUID(opCtx, nss(), true /* allowViews */));
+ if (!_doc.getInfo()) {
+ _doc.setInfo(getCollModCollectionInfo(opCtx, nss()));
+ }
- sharding_ddl_util::stopMigrations(opCtx, nss(), _doc.getCollUUID());
- if (!_firstExecution) {
- _performNoopRetryableWriteOnParticipants(opCtx, **executor);
+ auto isGranularityUpdate = hasTimeSeriesGranularityUpdate(_doc.getCollModRequest());
+ uassert(6201808,
+ "Cannot use time-series options for a non-timeseries collection",
+ _doc.getInfo()->getTimeSeriesOptions() || !isGranularityUpdate);
+ if (isGranularityUpdate) {
+ uassert(ErrorCodes::InvalidOptions,
+ "Invalid transition for timeseries.granularity. Can only transition "
+ "from 'seconds' to 'minutes' or 'minutes' to 'hours'.",
+ timeseries::isValidTimeseriesGranularityTransition(
+ _doc.getInfo()->getTimeSeriesOptions()->getGranularity(),
+ *_doc.getCollModRequest().getTimeseries()->getGranularity()));
+
+ if (_doc.getInfo()->getIsSharded()) {
+ tassert(6201805,
+ "shardsOwningChunks should be set on state document for sharded "
+ "collection",
+ _doc.getInfo()->getShardsOwningChunks());
+
+ _doc.setCollUUID(sharding_ddl_util::getCollectionUUID(
+ opCtx, nss(), true /* allowViews */));
+ sharding_ddl_util::stopMigrations(opCtx, nss(), _doc.getCollUUID());
+
+ ShardsvrParticipantBlock blockCRUDOperationsRequest(
+ _doc.getInfo()->getNsForTargetting());
+ const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(
+ blockCRUDOperationsRequest.toBSON({}));
+ sharding_ddl_util::sendAuthenticatedCommandToShards(
+ opCtx,
+ nss().db(),
+ cmdObj,
+ *_doc.getInfo()->getShardsOwningChunks(),
+ **executor);
}
+ }
+ }))
+ .then(_executePhase(
+ Phase::kUpdateConfig,
+ [this, executor = executor, anchor = shared_from_this()] {
+ collModBeforeConfigServerUpdate.pauseWhileSet();
- _doc = _updateSession(opCtx, _doc);
- const OperationSessionInfo osi = getCurrentSession(_doc);
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
- const auto chunkManager = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(
- opCtx, collNss));
- std::unique_ptr<CollatorInterface> collator;
- const auto expCtx =
- make_intrusive<ExpressionContext>(opCtx, std::move(collator), collNss);
- std::set<ShardId> participants;
- chunkManager.getShardIdsForQuery(
- expCtx, {} /* query */, {} /* collation */, &participants);
+ _doc = _updateSession(opCtx, _doc);
+ tassert(6201803, "collMod collection information should be set", _doc.getInfo());
- ShardsvrCollModParticipant request(nss(), _doc.getCollModRequest());
+ if (_doc.getInfo()->getIsSharded() && _doc.getInfo()->getTimeSeriesOptions() &&
+ hasTimeSeriesGranularityUpdate(_doc.getCollModRequest())) {
+ ConfigsvrCollMod request(_doc.getInfo()->getNsForTargetting(),
+ _doc.getCollModRequest());
const auto cmdObj =
CommandHelpers::appendMajorityWriteConcern(request.toBSON({}));
- const auto& responses = sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx,
- nss().db(),
- cmdObj.addFields(osi.toBSON()),
- {std::make_move_iterator(participants.begin()),
- std::make_move_iterator(participants.end())},
- **executor);
+
+ const auto& configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(
+ configShard->runCommand(opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ nss().db().toString(),
+ cmdObj,
+ Shard::RetryPolicy::kIdempotent)));
+ }
+ }))
+ .then(_executePhase(
+ Phase::kUpdateShards,
+ [this, executor = executor, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ _doc = _updateSession(opCtx, _doc);
+ tassert(6201804, "collMod collection information should be set", _doc.getInfo());
+
+ if (_doc.getInfo()->getIsSharded()) {
+ tassert(
+ 6201806,
+ "shardsOwningChunks should be set on state document for sharded collection",
+ _doc.getInfo()->getShardsOwningChunks());
+ tassert(6201807,
+ "primaryShard should be set on state document for sharded collection",
+ _doc.getInfo()->getPrimaryShard());
+
+ ShardsvrCollModParticipant request(nss(), _doc.getCollModRequest());
+ bool needsUnblock = _doc.getInfo()->getTimeSeriesOptions() &&
+ hasTimeSeriesGranularityUpdate(_doc.getCollModRequest());
+ request.setNeedsUnblock(needsUnblock);
+
+ std::vector<AsyncRequestsSender::Response> responses;
+ auto shardsOwningChunks = *_doc.getInfo()->getShardsOwningChunks();
+ auto primaryShardOwningChunk = std::find(shardsOwningChunks.begin(),
+ shardsOwningChunks.end(),
+ _doc.getInfo()->getPrimaryShard());
+ // A view definition will only be present on the primary shard. So we pass an
+ // additional 'performViewChange' flag only to the primary shard.
+ if (primaryShardOwningChunk != shardsOwningChunks.end()) {
+ request.setPerformViewChange(true);
+ const auto& primaryResponse =
+ sharding_ddl_util::sendAuthenticatedCommandToShards(
+ opCtx,
+ nss().db(),
+ CommandHelpers::appendMajorityWriteConcern(request.toBSON({})),
+ {*_doc.getInfo()->getPrimaryShard()},
+ **executor);
+ responses.insert(
+ responses.end(), primaryResponse.begin(), primaryResponse.end());
+ shardsOwningChunks.erase(primaryShardOwningChunk);
+ }
+
+ request.setPerformViewChange(false);
+ const auto& secondaryResponses =
+ sharding_ddl_util::sendAuthenticatedCommandToShards(
+ opCtx,
+ nss().db(),
+ CommandHelpers::appendMajorityWriteConcern(request.toBSON({})),
+ shardsOwningChunks,
+ **executor);
+ responses.insert(
+ responses.end(), secondaryResponses.begin(), secondaryResponses.end());
+
BSONObjBuilder builder;
std::string errmsg;
auto ok = appendRawResponses(opCtx, &errmsg, &builder, responses).responseOK;
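The kUpdateShards phase above deliberately contacts the primary shard first (when it owns chunks): only the primary holds the time-series view definition, so it alone receives performViewChange=true, while the remaining chunk-owning shards run the same participant command with the flag cleared. needsUnblock tells each participant to lift the CRUD block installed during kBlockShards once its local collMod commits. Condensed, with the hypothetical sendToShards standing in for the sharding_ddl_util call:

    // Sketch of the dispatch order only; session info and error handling elided.
    request.setPerformViewChange(true);
    sendToShards({primaryShard});               // 1. primary: collMod + view update
    request.setPerformViewChange(false);
    sendToShards(remainingShardsOwningChunks);  // 2. others: collMod only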
@@ -220,7 +332,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
cmd.setCollModRequest(_doc.getCollModRequest());
BSONObjBuilder collModResBuilder;
uassertStatusOK(timeseries::processCollModCommandWithTimeSeriesTranslation(
- opCtx, nss(), cmd, &collModResBuilder));
+ opCtx, nss(), cmd, true, &collModResBuilder));
auto collModRes = collModResBuilder.obj();
const auto dbInfo = uassertStatusOK(