summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/operation_sharding_state.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/operation_sharding_state.cpp')
-rw-r--r--src/mongo/db/s/operation_sharding_state.cpp64
1 files changed, 44 insertions, 20 deletions
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 906c491aac3..40617c08138 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -39,7 +39,8 @@ namespace {
const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration =
OperationContext::declareDecoration<OperationShardingState>();
-const ChunkVersion kUnshardedVersion(ChunkVersion::UNSHARDED());
+// Max time to wait for the migration critical section to complete
+const Minutes kMaxWaitForMigrationCriticalSection(5);
} // namespace mongo
@@ -49,12 +50,12 @@ OperationShardingState& OperationShardingState::get(OperationContext* txn) {
return shardingMetadataDecoration(txn);
}
-void OperationShardingState::initializeShardVersion(NamespaceString ns,
+void OperationShardingState::initializeShardVersion(NamespaceString nss,
const BSONElement& shardVersionElt) {
invariant(!hasShardVersion());
- if (ns.isSystemDotIndexes()) {
- setShardVersion(std::move(ns), ChunkVersion::IGNORED());
+ if (nss.isSystemDotIndexes()) {
+ setShardVersion(std::move(nss), ChunkVersion::IGNORED());
return;
}
@@ -70,31 +71,54 @@ void OperationShardingState::initializeShardVersion(NamespaceString ns,
return;
}
- setShardVersion(std::move(ns), std::move(newVersion));
+ setShardVersion(std::move(nss), std::move(newVersion));
}
bool OperationShardingState::hasShardVersion() const {
return _hasVersion;
}
-const ChunkVersion& OperationShardingState::getShardVersion(const NamespaceString& ns) const {
- if (_ns != ns) {
- return kUnshardedVersion;
+ChunkVersion OperationShardingState::getShardVersion(const NamespaceString& nss) const {
+ if (_ns != nss) {
+ return ChunkVersion::UNSHARDED();
}
return _shardVersion;
}
-void OperationShardingState::setShardVersion(NamespaceString ns, ChunkVersion newVersion) {
+void OperationShardingState::setShardVersion(NamespaceString nss, ChunkVersion newVersion) {
// This currently supports only setting the shard version for one namespace.
- invariant(!_hasVersion || _ns == ns);
- invariant(!ns.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion));
+ invariant(!_hasVersion || _ns == nss);
+ invariant(!nss.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion));
- _ns = std::move(ns);
+ _ns = std::move(nss);
_shardVersion = std::move(newVersion);
_hasVersion = true;
}
+bool OperationShardingState::waitForMigrationCriticalSection(OperationContext* txn) {
+ // Must not block while holding a lock
+ invariant(!txn->lockState()->isLocked());
+
+ if (_migrationCriticalSection) {
+ const Microseconds operationRemainingTime(Microseconds(txn->getRemainingMaxTimeMicros()));
+ _migrationCriticalSection->waitUntilOutOfCriticalSection(
+ durationCount<Microseconds>(operationRemainingTime)
+ ? operationRemainingTime
+ : kMaxWaitForMigrationCriticalSection);
+ _migrationCriticalSection = nullptr;
+ return true;
+ }
+
+ return false;
+}
+
+void OperationShardingState::setMigrationCriticalSection(
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec) {
+ invariant(critSec);
+ _migrationCriticalSection = std::move(critSec);
+}
+
void OperationShardingState::_clear() {
_hasVersion = false;
_shardVersion = ChunkVersion();
@@ -104,21 +128,21 @@ void OperationShardingState::_clear() {
OperationShardingState::IgnoreVersioningBlock::IgnoreVersioningBlock(OperationContext* txn,
const NamespaceString& ns)
: _txn(txn), _ns(ns) {
- auto& operationVersion = OperationShardingState::get(txn);
- _hadOriginalVersion = operationVersion._hasVersion;
+ auto& oss = OperationShardingState::get(txn);
+ _hadOriginalVersion = oss._hasVersion;
if (_hadOriginalVersion) {
- _originalVersion = operationVersion.getShardVersion(ns);
+ _originalVersion = oss.getShardVersion(ns);
}
- operationVersion.setShardVersion(ns, ChunkVersion::IGNORED());
+ oss.setShardVersion(ns, ChunkVersion::IGNORED());
}
OperationShardingState::IgnoreVersioningBlock::~IgnoreVersioningBlock() {
- auto& operationVersion = OperationShardingState::get(_txn);
- invariant(ChunkVersion::isIgnoredVersion(operationVersion.getShardVersion(_ns)));
+ auto& oss = OperationShardingState::get(_txn);
+ invariant(ChunkVersion::isIgnoredVersion(oss.getShardVersion(_ns)));
if (_hadOriginalVersion) {
- operationVersion.setShardVersion(_ns, _originalVersion);
+ oss.setShardVersion(_ns, _originalVersion);
} else {
- operationVersion._clear();
+ oss._clear();
}
}