If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/operation_context.h" namespace mongo { namespace { const OperationContext::Decoration shardingMetadataDecoration = OperationContext::declareDecoration(); // Max time to wait for the migration critical section to complete const Milliseconds kMaxWaitForMigrationCriticalSection = Minutes(5); // Max time to wait for the movePrimary critical section to complete const Milliseconds kMaxWaitForMovePrimaryCriticalSection = Minutes(5); // The name of the field in which the client attaches its database version. constexpr auto kDbVersionField = "databaseVersion"_sd; } // namespace OperationShardingState::OperationShardingState() = default; OperationShardingState::~OperationShardingState() { invariant(!_shardingOperationFailedStatus); } OperationShardingState& OperationShardingState::get(OperationContext* opCtx) { return shardingMetadataDecoration(opCtx); } bool OperationShardingState::isOperationVersioned(OperationContext* opCtx) { return !(get(opCtx)._shardVersions.empty()); } void OperationShardingState::setAllowImplicitCollectionCreation( const BSONElement& allowImplicitCollectionCreationElem) { if (!allowImplicitCollectionCreationElem.eoo()) { _allowImplicitCollectionCreation = allowImplicitCollectionCreationElem.Bool(); } else { _allowImplicitCollectionCreation = true; } } bool OperationShardingState::allowImplicitCollectionCreation() const { return _allowImplicitCollectionCreation; } void OperationShardingState::initializeClientRoutingVersionsFromCommand(NamespaceString nss, const BSONObj& cmdObj) { boost::optional shardVersion; boost::optional dbVersion; const auto shardVersionElem = cmdObj.getField(ChunkVersion::kShardVersionField); if (!shardVersionElem.eoo()) { shardVersion = uassertStatusOK(ChunkVersion::parseFromCommand(cmdObj)); } const auto dbVersionElem = cmdObj.getField(kDbVersionField); if (!dbVersionElem.eoo()) { uassert(ErrorCodes::BadValue, str::stream() << "expected databaseVersion element to be an object, got " << dbVersionElem, dbVersionElem.type() == BSONType::Object); dbVersion = DatabaseVersion::parse(IDLParserErrorContext("initializeClientRoutingVersions"), dbVersionElem.Obj()); } initializeClientRoutingVersions(nss, shardVersion, dbVersion); } void OperationShardingState::initializeClientRoutingVersions( NamespaceString nss, const boost::optional& shardVersion, const boost::optional& dbVersion) { if (shardVersion) { invariant(_shardVersionsChecked.find(nss.ns()) == _shardVersionsChecked.end(), nss.ns()); _shardVersions[nss.ns()] = *shardVersion; } if (dbVersion) { invariant(_databaseVersions.find(nss.db()) == _databaseVersions.end()); _databaseVersions[nss.db()] = *dbVersion; } } bool OperationShardingState::hasShardVersion(const NamespaceString& nss) const { return _shardVersions.find(nss.ns()) != _shardVersions.end(); } boost::optional OperationShardingState::getShardVersion(const NamespaceString& nss) { _shardVersionsChecked.insert(nss.ns()); const auto it = _shardVersions.find(nss.ns()); if (it != _shardVersions.end()) { return it->second; } return boost::none; } bool OperationShardingState::hasDbVersion() const { return !_databaseVersions.empty(); } boost::optional OperationShardingState::getDbVersion( const StringData dbName) const { const auto it = _databaseVersions.find(dbName); if (it == _databaseVersions.end()) { return boost::none; } return it->second; } bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* opCtx) { // Must not block while holding a lock invariant(!opCtx->lockState()->isLocked()); if (_migrationCriticalSectionSignal) { auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMigrationCriticalSection); opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { _migrationCriticalSectionSignal->wait(opCtx); }); _migrationCriticalSectionSignal = boost::none; return true; } return false; } void OperationShardingState::setMigrationCriticalSectionSignal( boost::optional> critSecSignal) { invariant(critSecSignal); _migrationCriticalSectionSignal = std::move(critSecSignal); } bool OperationShardingState::waitForMovePrimaryCriticalSectionSignal(OperationContext* opCtx) { // Must not block while holding a lock invariant(!opCtx->lockState()->isLocked()); if (_movePrimaryCriticalSectionSignal) { auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + std::min(opCtx->getRemainingMaxTimeMillis(), kMaxWaitForMovePrimaryCriticalSection); opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { _movePrimaryCriticalSectionSignal->wait(opCtx); }); _movePrimaryCriticalSectionSignal = boost::none; return true; } return false; } void OperationShardingState::setMovePrimaryCriticalSectionSignal( boost::optional> critSecSignal) { invariant(critSecSignal); _movePrimaryCriticalSectionSignal = std::move(critSecSignal); } void OperationShardingState::setShardingOperationFailedStatus(const Status& status) { invariant(!_shardingOperationFailedStatus); _shardingOperationFailedStatus = std::move(status); } boost::optional OperationShardingState::resetShardingOperationFailedStatus() { if (!_shardingOperationFailedStatus) { return boost::none; } Status failedStatus = Status(*_shardingOperationFailedStatus); _shardingOperationFailedStatus = boost::none; return failedStatus; } } // namespace mongo