summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/config_server_op_observer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/config_server_op_observer.cpp')
-rw-r--r--src/mongo/db/s/config_server_op_observer.cpp107
1 files changed, 34 insertions, 73 deletions
diff --git a/src/mongo/db/s/config_server_op_observer.cpp b/src/mongo/db/s/config_server_op_observer.cpp
index fe73090a366..477f304d3a8 100644
--- a/src/mongo/db/s/config_server_op_observer.cpp
+++ b/src/mongo/db/s/config_server_op_observer.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/config_server_op_observer.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/topology_time_ticker.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/logv2/log.h"
@@ -43,16 +44,6 @@
namespace mongo {
-namespace {
-
-// The number of pending topologyTime tick points (stored in the _topologyTimeTickPoints vector) is
-// generally expected to be small (since shard topology operations should be infrequent, relative to
-// any config server replication lag). If the size of this vector exceeds this constant (when a
-// tick point is added), then a warning (with id 4740600) will be logged.
-constexpr size_t kPossiblyExcessiveNumTopologyTimeTickPoints = 3;
-
-} // namespace
-
ConfigServerOpObserver::ConfigServerOpObserver() = default;
ConfigServerOpObserver::~ConfigServerOpObserver() = default;
@@ -102,6 +93,13 @@ void ConfigServerOpObserver::_onReplicationRollback(OperationContext* opCtx,
ShardingCatalogManager::get(opCtx)->discardCachedConfigDatabaseInitializationState();
ClusterIdentityLoader::get(opCtx)->discardCachedClusterId();
}
+
+ if (rbInfo.rollbackNamespaces.find(ShardType::ConfigNS) != rbInfo.rollbackNamespaces.end()) {
+ // If some entries were rollbacked from config.shards we might need to discard some tick
+ // points from the TopologyTimeTicker
+ const auto lastApplied = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime();
+ TopologyTimeTicker::get(opCtx).onReplicationRollback(lastApplied);
+ }
}
void ConfigServerOpObserver::onInserts(OperationContext* opCtx,
@@ -114,21 +112,25 @@ void ConfigServerOpObserver::onInserts(OperationContext* opCtx,
return;
}
- boost::optional<Timestamp> maxTopologyTime;
- for (auto it = begin; it != end; it++) {
- Timestamp newTopologyTime = it->doc[ShardType::topologyTime.name()].timestamp();
- if (newTopologyTime != Timestamp()) {
- if (!maxTopologyTime || newTopologyTime > *maxTopologyTime) {
- maxTopologyTime = newTopologyTime;
+ if (!topology_time_ticker_utils::inRecoveryMode(opCtx)) {
+ boost::optional<Timestamp> maxTopologyTime;
+ for (auto it = begin; it != end; it++) {
+ Timestamp newTopologyTime = it->doc[ShardType::topologyTime.name()].timestamp();
+ if (newTopologyTime != Timestamp()) {
+ if (!maxTopologyTime || newTopologyTime > *maxTopologyTime) {
+ maxTopologyTime = newTopologyTime;
+ }
}
}
- }
- if (maxTopologyTime) {
- opCtx->recoveryUnit()->onCommit(
- [this, maxTopologyTime](boost::optional<Timestamp> unusedCommitTime) mutable {
- _registerTopologyTimeTickPoint(*maxTopologyTime);
- });
+ if (maxTopologyTime) {
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, maxTopologyTime](boost::optional<Timestamp> commitTime) mutable {
+ invariant(commitTime);
+ TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable(
+ *commitTime, *maxTopologyTime);
+ });
+ }
}
}
@@ -139,6 +141,10 @@ void ConfigServerOpObserver::onApplyOps(OperationContext* opCtx,
return;
}
+ if (topology_time_ticker_utils::inRecoveryMode(opCtx)) {
+ return;
+ }
+
if (applyOpCmd.firstElementFieldNameStringData() != "applyOps") {
return;
}
@@ -189,61 +195,15 @@ void ConfigServerOpObserver::onApplyOps(OperationContext* opCtx,
auto newTopologyTime =
update_oplog_entry::extractNewValueForField(updateObj, ShardType::topologyTime())
.timestamp();
- if (newTopologyTime == Timestamp()) {
- return;
- }
opCtx->recoveryUnit()->onCommit(
- [this, newTopologyTime](boost::optional<Timestamp> unusedCommitTime) mutable {
- _registerTopologyTimeTickPoint(newTopologyTime);
+ [opCtx, newTopologyTime](boost::optional<Timestamp> commitTime) mutable {
+ invariant(commitTime);
+ TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable(
+ *commitTime, newTopologyTime);
});
}
-void ConfigServerOpObserver::_registerTopologyTimeTickPoint(Timestamp newTopologyTime) {
- std::vector<Timestamp>::size_type numTickPoints = 0;
- {
- stdx::lock_guard lg(_mutex);
- _topologyTimeTickPoints.push_back(newTopologyTime);
- numTickPoints = _topologyTimeTickPoints.size();
- }
- if (numTickPoints >= kPossiblyExcessiveNumTopologyTimeTickPoints) {
- LOGV2_WARNING(4740600,
- "possibly excessive number of topologyTime tick points",
- "numTickPoints"_attr = numTickPoints,
- "kPossiblyExcessiveNumTopologyTimeTickPoints"_attr =
- kPossiblyExcessiveNumTopologyTimeTickPoints);
- }
-}
-
-void ConfigServerOpObserver::_tickTopologyTimeIfNecessary(ServiceContext* service,
- Timestamp newCommitPointTime) {
- stdx::lock_guard lg(_mutex);
-
- if (_topologyTimeTickPoints.empty()) {
- return;
- }
-
- // Find and remove the topologyTime tick points that have been passed.
- boost::optional<Timestamp> maxPassedTime;
- auto it = _topologyTimeTickPoints.begin();
- while (it != _topologyTimeTickPoints.end()) {
- const auto& topologyTimeTickPoint = *it;
- if (newCommitPointTime >= topologyTimeTickPoint) {
- if (!maxPassedTime || topologyTimeTickPoint > *maxPassedTime) {
- maxPassedTime = topologyTimeTickPoint;
- }
- it = _topologyTimeTickPoints.erase(it);
- } else {
- it++;
- }
- }
-
- // If any tick points were passed, tick TopologyTime to the largest one.
- if (maxPassedTime) {
- VectorClockMutable::get(service)->tickTopologyTimeTo(LogicalTime(*maxPassedTime));
- }
-}
-
void ConfigServerOpObserver::onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) {
Timestamp newCommitPointTime = newCommitPoint.getTimestamp();
@@ -252,7 +212,8 @@ void ConfigServerOpObserver::onMajorityCommitPointUpdate(ServiceContext* service
// ConfigTime is done first.
VectorClockMutable::get(service)->tickConfigTimeTo(LogicalTime(newCommitPointTime));
- _tickTopologyTimeIfNecessary(service, newCommitPointTime);
+ // Letting the TopologyTimeTicker know that the majority commit point was advanced
+ TopologyTimeTicker::get(service).onMajorityCommitPointUpdate(service, newCommitPoint);
}
} // namespace mongo