diff options
Diffstat (limited to 'src/mongo/db/s/config_server_op_observer.cpp')
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.cpp | 107 |
1 files changed, 34 insertions, 73 deletions
diff --git a/src/mongo/db/s/config_server_op_observer.cpp b/src/mongo/db/s/config_server_op_observer.cpp index fe73090a366..477f304d3a8 100644 --- a/src/mongo/db/s/config_server_op_observer.cpp +++ b/src/mongo/db/s/config_server_op_observer.cpp @@ -34,6 +34,7 @@ #include "mongo/db/s/config_server_op_observer.h" #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/topology_time_ticker.h" #include "mongo/db/update/update_oplog_entry_serialization.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" @@ -43,16 +44,6 @@ namespace mongo { -namespace { - -// The number of pending topologyTime tick points (stored in the _topologyTimeTickPoints vector) is -// generally expected to be small (since shard topology operations should be infrequent, relative to -// any config server replication lag). If the size of this vector exceeds this constant (when a -// tick point is added), then a warning (with id 4740600) will be logged. -constexpr size_t kPossiblyExcessiveNumTopologyTimeTickPoints = 3; - -} // namespace - ConfigServerOpObserver::ConfigServerOpObserver() = default; ConfigServerOpObserver::~ConfigServerOpObserver() = default; @@ -102,6 +93,13 @@ void ConfigServerOpObserver::_onReplicationRollback(OperationContext* opCtx, ShardingCatalogManager::get(opCtx)->discardCachedConfigDatabaseInitializationState(); ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); } + + if (rbInfo.rollbackNamespaces.find(ShardType::ConfigNS) != rbInfo.rollbackNamespaces.end()) { + // If some entries were rollbacked from config.shards we might need to discard some tick + // points from the TopologyTimeTicker + const auto lastApplied = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime(); + TopologyTimeTicker::get(opCtx).onReplicationRollback(lastApplied); + } } void ConfigServerOpObserver::onInserts(OperationContext* opCtx, @@ -114,21 +112,25 @@ void ConfigServerOpObserver::onInserts(OperationContext* opCtx, return; } - boost::optional<Timestamp> maxTopologyTime; - for (auto it = begin; it != end; it++) { - Timestamp newTopologyTime = it->doc[ShardType::topologyTime.name()].timestamp(); - if (newTopologyTime != Timestamp()) { - if (!maxTopologyTime || newTopologyTime > *maxTopologyTime) { - maxTopologyTime = newTopologyTime; + if (!topology_time_ticker_utils::inRecoveryMode(opCtx)) { + boost::optional<Timestamp> maxTopologyTime; + for (auto it = begin; it != end; it++) { + Timestamp newTopologyTime = it->doc[ShardType::topologyTime.name()].timestamp(); + if (newTopologyTime != Timestamp()) { + if (!maxTopologyTime || newTopologyTime > *maxTopologyTime) { + maxTopologyTime = newTopologyTime; + } } } - } - if (maxTopologyTime) { - opCtx->recoveryUnit()->onCommit( - [this, maxTopologyTime](boost::optional<Timestamp> unusedCommitTime) mutable { - _registerTopologyTimeTickPoint(*maxTopologyTime); - }); + if (maxTopologyTime) { + opCtx->recoveryUnit()->onCommit( + [opCtx, maxTopologyTime](boost::optional<Timestamp> commitTime) mutable { + invariant(commitTime); + TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable( + *commitTime, *maxTopologyTime); + }); + } } } @@ -139,6 +141,10 @@ void ConfigServerOpObserver::onApplyOps(OperationContext* opCtx, return; } + if (topology_time_ticker_utils::inRecoveryMode(opCtx)) { + return; + } + if (applyOpCmd.firstElementFieldNameStringData() != "applyOps") { return; } @@ -189,61 +195,15 @@ void ConfigServerOpObserver::onApplyOps(OperationContext* opCtx, auto newTopologyTime = update_oplog_entry::extractNewValueForField(updateObj, ShardType::topologyTime()) .timestamp(); - if (newTopologyTime == Timestamp()) { - return; - } opCtx->recoveryUnit()->onCommit( - [this, newTopologyTime](boost::optional<Timestamp> unusedCommitTime) mutable { - _registerTopologyTimeTickPoint(newTopologyTime); + [opCtx, newTopologyTime](boost::optional<Timestamp> commitTime) mutable { + invariant(commitTime); + TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable( + *commitTime, newTopologyTime); }); } -void ConfigServerOpObserver::_registerTopologyTimeTickPoint(Timestamp newTopologyTime) { - std::vector<Timestamp>::size_type numTickPoints = 0; - { - stdx::lock_guard lg(_mutex); - _topologyTimeTickPoints.push_back(newTopologyTime); - numTickPoints = _topologyTimeTickPoints.size(); - } - if (numTickPoints >= kPossiblyExcessiveNumTopologyTimeTickPoints) { - LOGV2_WARNING(4740600, - "possibly excessive number of topologyTime tick points", - "numTickPoints"_attr = numTickPoints, - "kPossiblyExcessiveNumTopologyTimeTickPoints"_attr = - kPossiblyExcessiveNumTopologyTimeTickPoints); - } -} - -void ConfigServerOpObserver::_tickTopologyTimeIfNecessary(ServiceContext* service, - Timestamp newCommitPointTime) { - stdx::lock_guard lg(_mutex); - - if (_topologyTimeTickPoints.empty()) { - return; - } - - // Find and remove the topologyTime tick points that have been passed. - boost::optional<Timestamp> maxPassedTime; - auto it = _topologyTimeTickPoints.begin(); - while (it != _topologyTimeTickPoints.end()) { - const auto& topologyTimeTickPoint = *it; - if (newCommitPointTime >= topologyTimeTickPoint) { - if (!maxPassedTime || topologyTimeTickPoint > *maxPassedTime) { - maxPassedTime = topologyTimeTickPoint; - } - it = _topologyTimeTickPoints.erase(it); - } else { - it++; - } - } - - // If any tick points were passed, tick TopologyTime to the largest one. - if (maxPassedTime) { - VectorClockMutable::get(service)->tickTopologyTimeTo(LogicalTime(*maxPassedTime)); - } -} - void ConfigServerOpObserver::onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) { Timestamp newCommitPointTime = newCommitPoint.getTimestamp(); @@ -252,7 +212,8 @@ void ConfigServerOpObserver::onMajorityCommitPointUpdate(ServiceContext* service // ConfigTime is done first. VectorClockMutable::get(service)->tickConfigTimeTo(LogicalTime(newCommitPointTime)); - _tickTopologyTimeIfNecessary(service, newCommitPointTime); + // Letting the TopologyTimeTicker know that the majority commit point was advanced + TopologyTimeTicker::get(service).onMajorityCommitPointUpdate(service, newCommitPoint); } } // namespace mongo |