diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-12-29 12:04:55 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-12-30 15:35:28 -0500 |
commit | 5d2d6e209acd862324612c7f9c41d65940f8dcba (patch) | |
tree | 8ccfea2ba5cc6b118d5d82d70ebe06ba88b135aa /src | |
parent | 5f4c54029d47229533b54c7683df71809cc26ff0 (diff) | |
download | mongo-5d2d6e209acd862324612c7f9c41d65940f8dcba.tar.gz |
SERVER-22027 Sharding should not retry killed operations
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/base/error_codes.err | 6 | ||||
-rw-r--r-- | src/mongo/db/operation_context.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 40 | ||||
-rw-r--r-- | src/mongo/db/operation_context_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/service_context.h | 2 | ||||
-rw-r--r-- | src/mongo/db/service_context_d.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/service_context_d.h | 15 | ||||
-rw-r--r-- | src/mongo/db/service_context_noop.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/service_context_noop.h | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 9 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 21 |
14 files changed, 73 insertions, 88 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index faed0dbb14a..43fdefa1c50 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -160,6 +160,7 @@ error_code("CannotGrowDocumentInCappedNamespace", 10003) error_code("DuplicateKey", 11000) error_code("InterruptedAtShutdown", 11600) error_code("Interrupted", 11601) +error_code("InterruptedDueToReplStateChange", 11602) error_code("OutOfDiskSpace", 14031 ) error_code("KeyTooLong", 17280); error_code("BackgroundOperationInProgressForDatabase", 12586); @@ -173,7 +174,10 @@ error_code("PrepareConfigsFailed", 13104); # SERVER-21428 explains the extra changes needed if error_class components change. error_class("NetworkError", ["HostUnreachable", "HostNotFound", "NetworkTimeout"]) -error_class("Interruption", ["Interrupted", "InterruptedAtShutdown", "ExceededTimeLimit"]) +error_class("Interruption", ["Interrupted", + "InterruptedAtShutdown", + "InterruptedDueToReplStateChange", + "ExceededTimeLimit"]) error_class("NotMasterError", ["NotMaster", "NotMasterNoSlaveOk"]) error_class("StaleShardingError", ["RecvStaleConfig", "SendStaleConfig", "StaleShardVersion", "StaleEpoch"]) diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 1b79dadcb50..1434ee5ddc7 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -46,12 +46,13 @@ Client* OperationContext::getClient() const { return _client; } -void OperationContext::markKilled() { - _killPending.store(1); +void OperationContext::markKilled(ErrorCodes::Error killCode) { + invariant(killCode != ErrorCodes::OK); + _killCode.compareAndSwap(ErrorCodes::OK, killCode); } -bool OperationContext::isKillPending() const { - return _killPending.loadRelaxed(); +ErrorCodes::Error OperationContext::getKillStatus() const { + return _killCode.loadRelaxed(); } } // namespace mongo diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index a146a8f7f14..3a7e3165dda 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -77,7 +77,6 @@ public: */ virtual RecoveryUnit* recoveryUnit() const = 0; - /** * Returns the RecoveryUnit (same return value as recoveryUnit()) but the caller takes * ownership of the returned RecoveryUnit, and the OperationContext instance relinquishes @@ -183,23 +182,33 @@ public: virtual bool writesAreReplicated() const = 0; /** - * Marks this operation as killed. + * Marks this operation as killed so that subsequent calls to checkForInterrupt and + * checkForInterruptNoAssert by the thread executing the operation will start returning the + * specified error code. * - * Subsequent calls to checkForInterrupt and checkForInterruptNoAssert by the thread - * executing the operation will indicate that the operation has been killed. + * If multiple threads kill the same operation with different codes, only the first code will + * be preserved. * - * May be called by any thread that has locked the Client owning this operation context, - * or by the thread executing on behalf of this operation context. + * May be called by any thread that has locked the Client owning this operation context. */ - void markKilled(); + void markKilled(ErrorCodes::Error killCode = ErrorCodes::Interrupted); /** - * Returns true if markKilled has been called on this operation context. + * Returns the code passed to markKilled if this operation context has been killed previously + * or ErrorCodes::OK otherwise. * - * May be called by any thread that has locked the Client owning this operation context, - * or by the thread executing on behalf of this operation context. + * May be called by any thread that has locked the Client owning this operation context, or + * without lock by the thread executing on behalf of this operation context. + */ + ErrorCodes::Error getKillStatus() const; + + /** + * Shortcut method, which checks whether getKillStatus returns a non-OK value. Has the same + * concurrency rules as getKillStatus. */ - bool isKillPending() const; + bool isKillPending() const { + return getKillStatus() != ErrorCodes::OK; + } protected: OperationContext(Client* client, unsigned int opId, Locker* locker); @@ -211,11 +220,14 @@ private: Client* const _client; const unsigned int _opId; - // The lifetime of locker is managed by subclasses of OperationContext, so it is not - // safe to access _locker in the destructor of OperationContext. + // Not owned. Locker* const _locker; - AtomicInt32 _killPending{0}; + // Follows the values of ErrorCodes::Error. The default value is 0 (OK), which means the + // operation is not killed. If killed, it will contain a specific code. This value changes only + // once from OK to some kill code. + AtomicWord<ErrorCodes::Error> _killCode{ErrorCodes::OK}; + WriteConcernOptions _writeConcern; }; diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index f2c0166876f..b7733958bc2 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -140,6 +140,7 @@ uint64_t OperationContextImpl::getRemainingMaxTimeMicros() const { MONGO_FP_DECLARE(checkForInterruptFail); namespace { + // Helper function for checkForInterrupt fail point. Decides whether the operation currently // being run by the given Client meet the (probabilistic) conditions for interruption as // specified in the fail point info. @@ -194,8 +195,9 @@ Status OperationContextImpl::checkForInterruptNoAssert() { } } - if (isKillPending()) { - return Status(ErrorCodes::Interrupted, "operation was interrupted"); + const auto killStatus = getKillStatus(); + if (killStatus != ErrorCodes::OK) { + return Status(killStatus, "operation was interrupted"); } return Status::OK(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c42b80a605d..52b0fee436c 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -350,8 +350,8 @@ void ReplicationCoordinatorExternalStateImpl::closeConnections() { } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { - ServiceContext* environment = getGlobalServiceContext(); - environment->killAllUserOperations(txn); + ServiceContext* environment = txn->getServiceContext(); + environment->killAllUserOperations(txn, ErrorCodes::InterruptedDueToReplStateChange); } void ReplicationCoordinatorExternalStateImpl::clearShardingState() { diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index 6f9aa36a22f..4e63046726f 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -284,7 +284,7 @@ public: * Kills all operations that have a Client that is associated with an incoming user * connection, except for the one associated with txn. */ - virtual void killAllUserOperations(const OperationContext* txn) = 0; + virtual void killAllUserOperations(const OperationContext* txn, ErrorCodes::Error killCode) = 0; /** * Registers a listener to be notified each time an op is killed. diff --git a/src/mongo/db/service_context_d.cpp b/src/mongo/db/service_context_d.cpp index 3e5fd2d5be7..470dcb927a5 100644 --- a/src/mongo/db/service_context_d.cpp +++ b/src/mongo/db/service_context_d.cpp @@ -220,21 +220,9 @@ bool ServiceContextMongoD::getKillAllOperations() { return _globalKill; } -bool ServiceContextMongoD::_killOperationsAssociatedWithClientAndOpId_inlock(Client* client, - unsigned int opId) { - OperationContext* opCtx = client->getOperationContext(); - if (!opCtx) { - return false; - } - if (opCtx->getOpID() != opId) { - return false; - } - _killOperation_inlock(opCtx); - return true; -} - -void ServiceContextMongoD::_killOperation_inlock(OperationContext* opCtx) { - opCtx->markKilled(); +void ServiceContextMongoD::_killOperation_inlock(OperationContext* opCtx, + ErrorCodes::Error killCode) { + opCtx->markKilled(killCode); for (const auto listener : _killOpListeners) { try { @@ -248,8 +236,10 @@ void ServiceContextMongoD::_killOperation_inlock(OperationContext* opCtx) { bool ServiceContextMongoD::killOperation(unsigned int opId) { for (LockedClientsCursor cursor(this); Client* client = cursor.next();) { stdx::lock_guard<Client> lk(*client); - bool found = _killOperationsAssociatedWithClientAndOpId_inlock(client, opId); - if (found) { + + OperationContext* opCtx = client->getOperationContext(); + if (opCtx && opCtx->getOpID() == opId) { + _killOperation_inlock(opCtx, ErrorCodes::Interrupted); return true; } } @@ -257,7 +247,8 @@ bool ServiceContextMongoD::killOperation(unsigned int opId) { return false; } -void ServiceContextMongoD::killAllUserOperations(const OperationContext* txn) { +void ServiceContextMongoD::killAllUserOperations(const OperationContext* txn, + ErrorCodes::Error killCode) { for (LockedClientsCursor cursor(this); Client* client = cursor.next();) { if (!client->isFromUserConnection()) { // Don't kill system operations. @@ -266,16 +257,11 @@ void ServiceContextMongoD::killAllUserOperations(const OperationContext* txn) { stdx::lock_guard<Client> lk(*client); OperationContext* toKill = client->getOperationContext(); - if (!toKill) { - continue; - } - if (toKill->getOpID() == txn->getOpID()) { - // Don't kill ourself. - continue; + // Don't kill ourself. + if (toKill && toKill->getOpID() != txn->getOpID()) { + _killOperation_inlock(toKill, killCode); } - - _killOperation_inlock(toKill); } } diff --git a/src/mongo/db/service_context_d.h b/src/mongo/db/service_context_d.h index 0c560ff17f4..dc834197e9e 100644 --- a/src/mongo/db/service_context_d.h +++ b/src/mongo/db/service_context_d.h @@ -67,7 +67,7 @@ public: bool killOperation(unsigned int opId) override; - void killAllUserOperations(const OperationContext* txn) override; + void killAllUserOperations(const OperationContext* txn, ErrorCodes::Error killCode) override; void registerKillOpListener(KillOpListenerInterface* listener) override; @@ -79,22 +79,11 @@ private: std::unique_ptr<OperationContext> _newOpCtx(Client* client) override; /** - * Kills the active operation on "client" if that operation is associated with operation id - * "opId". - * - * Returns true if an operation was killed. - * - * Must only be called by a thread owning both this service context's mutex and the - * client's. - */ - bool _killOperationsAssociatedWithClientAndOpId_inlock(Client* client, unsigned int opId); - - /** * Kills the given operation. * * Caller must own the service context's _mutex. */ - void _killOperation_inlock(OperationContext* opCtx); + void _killOperation_inlock(OperationContext* opCtx, ErrorCodes::Error killCode); bool _globalKill; diff --git a/src/mongo/db/service_context_noop.cpp b/src/mongo/db/service_context_noop.cpp index 65184906442..fd94b35db89 100644 --- a/src/mongo/db/service_context_noop.cpp +++ b/src/mongo/db/service_context_noop.cpp @@ -79,7 +79,8 @@ bool ServiceContextNoop::killOperation(unsigned int opId) { return false; } -void ServiceContextNoop::killAllUserOperations(const OperationContext* txn) {} +void ServiceContextNoop::killAllUserOperations(const OperationContext* txn, + ErrorCodes::Error killCode) {} void ServiceContextNoop::registerKillOpListener(KillOpListenerInterface* listener) {} diff --git a/src/mongo/db/service_context_noop.h b/src/mongo/db/service_context_noop.h index 0e74b61b7a2..62e6a169896 100644 --- a/src/mongo/db/service_context_noop.h +++ b/src/mongo/db/service_context_noop.h @@ -49,7 +49,7 @@ public: bool killOperation(unsigned int opId) override; - void killAllUserOperations(const OperationContext* txn) override; + void killAllUserOperations(const OperationContext* txn, ErrorCodes::Error killCode) override; void setKillAllOperations() override; diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 5943cc6e255..5686476e1a3 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -389,8 +389,6 @@ StatusWith<OpTimePair<DatabaseType>> CatalogManagerReplicaSet::_fetchDatabaseMet StatusWith<OpTimePair<CollectionType>> CatalogManagerReplicaSet::getCollection( OperationContext* txn, const std::string& collNs) { - auto configShard = grid.shardRegistry()->getShard(txn, "config"); - auto statusFind = _exhaustiveFindOnConfig(txn, kConfigReadSelector, NamespaceString(CollectionType::ConfigNS), diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp index b21ff63961f..9bf44eaa09c 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp @@ -93,7 +93,7 @@ TEST_F(InsertRetryTest, RetryOnInterruptedAndNetworkErrorSuccess) { onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(request.target, kTestHosts[0]); configTargeter()->setFindHostReturnValue({kTestHosts[1]}); - return Status(ErrorCodes::Interrupted, "Interruption"); + return Status(ErrorCodes::InterruptedDueToReplStateChange, "Interruption"); }); onCommand([&](const RemoteCommandRequest& request) { @@ -271,7 +271,7 @@ TEST_F(UpdateRetryTest, OperationInterruptedDueToPrimaryStepDown) { auto writeErrDetail = stdx::make_unique<WriteErrorDetail>(); writeErrDetail->setIndex(0); - writeErrDetail->setErrCode(ErrorCodes::Interrupted); + writeErrDetail->setErrCode(ErrorCodes::InterruptedDueToReplStateChange); writeErrDetail->setErrMessage("Operation interrupted"); response.addToErrDetails(writeErrDetail.release()); diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 55121f42209..e5b91dda83a 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -126,6 +126,7 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kNotMasterErrors{ErrorCodes::N const ShardRegistry::ErrorCodesSet ShardRegistry::kAllRetriableErrors{ ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, + ErrorCodes::NotMasterOrSecondary, // If write concern failed to be satisfied on the remote server, this most probably means that // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe // to be retried if idempotency can be guaranteed. @@ -133,10 +134,7 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kAllRetriableErrors{ ErrorCodes::HostUnreachable, ErrorCodes::HostNotFound, ErrorCodes::NetworkTimeout, - // This set includes interrupted because replica set step down kills all server operations - // before it closes connections so it may happen that the caller actually receives the - // interruption. - ErrorCodes::Interrupted}; + ErrorCodes::InterruptedDueToReplStateChange}; ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory, std::unique_ptr<executor::TaskExecutorPool> executorPool, @@ -782,7 +780,8 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat void ShardRegistry::updateReplSetMonitor(const std::shared_ptr<RemoteCommandTargeter>& targeter, const HostAndPort& remoteHost, const Status& remoteCommandStatus) { - if (ErrorCodes::isNotMasterError(remoteCommandStatus.code())) { + if (ErrorCodes::isNotMasterError(remoteCommandStatus.code()) || + (remoteCommandStatus == ErrorCodes::InterruptedDueToReplStateChange)) { targeter->markHostNotMaster(remoteHost); } else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) { targeter->markHostUnreachable(remoteHost); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 8b23528a04a..c82de6a3bbe 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -51,15 +51,6 @@ namespace { // Maximum number of retries for network and replication notMaster errors (per host). const int kMaxNumFailedHostRetryAttempts = 3; -/** - * Returns whether a particular error code returned from the initial cursor establishment should - * be retried. - */ -bool isPerShardRetriableError(ErrorCodes::Error err) { - return (ShardRegistry::kAllRetriableErrors.count(err) || - err == ErrorCodes::NotMasterOrSecondary); -} - } // namespace AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor, @@ -438,8 +429,7 @@ void AsyncResultsMerger::handleBatchResponse( if (!cursorResponseStatus.isOK()) { // Notify the shard registry of the failure. if (remote.shardId) { - // TODO: Pass down an OperationContext* to use here. - auto shard = grid.shardRegistry()->getShard(nullptr, *remote.shardId); + auto shard = grid.shardRegistry()->getShardNoReload(*remote.shardId); if (!shard) { remote.status = Status(cursorResponseStatus.getStatus().code(), str::stream() << "Could not find shard " << *remote.shardId @@ -453,7 +443,10 @@ void AsyncResultsMerger::handleBatchResponse( // If the error is retriable, schedule another request. if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts && - isPerShardRetriableError(cursorResponseStatus.getStatus().code())) { + ShardRegistry::kAllRetriableErrors.count(cursorResponseStatus.getStatus().code())) { + LOG(1) << "Initial cursor establishment failed with retriable error and will be retried" + << causedBy(cursorResponseStatus.getStatus()); + ++remote.retryCount; // Since we potentially updated the targeter that the last host it chose might be @@ -641,13 +634,13 @@ Status AsyncResultsMerger::RemoteCursorData::resolveShardIdToHostAndPort( invariant(shardId); invariant(!cursorId); - // TODO: Pass down an OperationContext* to use here. - const auto shard = grid.shardRegistry()->getShard(nullptr, *shardId); + const auto shard = grid.shardRegistry()->getShardNoReload(*shardId); if (!shard) { return Status(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << *shardId); } + // TODO: Pass down an OperationContext* to use here. auto findHostStatus = shard->getTargeter()->findHost( readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(nullptr)); if (!findHostStatus.isOK()) { |