author     Spencer T Brody <spencer@mongodb.com>    2016-05-03 11:28:57 -0400
committer  Spencer T Brody <spencer@mongodb.com>    2016-05-03 18:57:26 -0400
commit     595d7d795f443cf4e848d8044d4102363b4870d2 (patch)
tree       85477547e7d6270b87ab68d5381bac1fcb94aa43
parent     9fab79e546604a74d577bbc1672e78d4bf849e82 (diff)
SERVER-23213 Remove all users of ShardRegistry::kAllRetriableErrors
This reverts commit f294d1dcd4884857d693b0f8a33cf1a6be434409.
-rw-r--r--  jstests/aggregation/extras/utils.js                            |  5
-rw-r--r--  src/mongo/s/catalog/replset/SConscript                         |  1
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp   | 14
-rw-r--r--  src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp     |  4
-rw-r--r--  src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp| 45
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp                    | 75
-rw-r--r--  src/mongo/s/query/async_results_merger.h                       |  6

7 files changed, 101 insertions(+), 49 deletions(-)
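
The common thread across these diffs is that call sites stop consulting the static ShardRegistry::kAllRetriableErrors set and instead ask a Shard itself, via Shard::isRetriableError(code, RetryPolicy), whether a failed operation is worth retrying. The self-contained C++ sketch below is illustrative only and is not MongoDB source: the sketch::Shard, sketch::Status, RetryPolicy, and runWithRetries() names are simplified stand-ins, and the numeric error codes are examples. A real caller obtains the shard via Grid::get(txn)->shardRegistry()->getConfigShard(), as the diffs below do.

// Illustrative sketch only -- simplified stand-ins, not MongoDB source.
#include <functional>
#include <iostream>
#include <set>

namespace sketch {

enum class RetryPolicy { kIdempotent, kNotIdempotent, kNoRetry };

struct Status {
    int code;
    bool isOK() const { return code == 0; }
};

class Shard {
public:
    // Simplified stand-in: treat a small fixed set of codes as retriable unless the caller
    // asked for no retries at all. The numeric codes are examples, not an authoritative list.
    bool isRetriableError(int code, RetryPolicy policy) const {
        if (policy == RetryPolicy::kNoRetry) {
            return false;
        }
        static const std::set<int> kRetriable{6 /* HostUnreachable */, 89 /* NetworkTimeout */};
        return kRetriable.count(code) > 0;
    }
};

constexpr int kMaxWriteRetry = 3;

// Shape of the retry loop used after this change: ask the shard, per attempt, whether the
// error is retriable for an idempotent operation, instead of checking a global error set.
Status runWithRetries(const Shard& configShard, const std::function<Status()>& attempt) {
    Status status{0};
    for (int retry = 1; retry <= kMaxWriteRetry; ++retry) {
        status = attempt();
        if (retry < kMaxWriteRetry &&
            configShard.isRetriableError(status.code, RetryPolicy::kIdempotent)) {
            continue;  // transient error, try the operation again
        }
        break;
    }
    return status;
}

}  // namespace sketch

int main() {
    sketch::Shard configShard;
    int attempts = 0;
    // Simulate an operation whose first attempt fails with a retriable (network-ish) error.
    sketch::Status result = sketch::runWithRetries(configShard, [&] {
        ++attempts;
        return sketch::Status{attempts == 1 ? 89 : 0};
    });
    std::cout << "attempts=" << attempts << " ok=" << result.isOK() << std::endl;
    return 0;
}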
diff --git a/jstests/aggregation/extras/utils.js b/jstests/aggregation/extras/utils.js
index d8203f56b94..33cfe9b2b2b 100644
--- a/jstests/aggregation/extras/utils.js
+++ b/jstests/aggregation/extras/utils.js
@@ -275,9 +275,8 @@ function assertErrorCode(coll, pipe, code, errmsg) {
         var error = assert.throws(function() {
             cursor.itcount();
         }, [], "expected error: " + code);
-        if (!error.message.search(code)) {
-            assert(false, "expected error: " + code + " got: " + error);
-        }
+
+        assert.eq(error.code, code);
     } else {
         assert.eq(cursorRes.code, code);
     }
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index 9d05a072dca..21a973a201a 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -13,6 +13,7 @@ env.Library(
         '$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_interface',
         '$BUILD_DIR/mongo/s/catalog/dist_lock_manager',
         '$BUILD_DIR/mongo/s/client/sharding_client',
+        '$BUILD_DIR/mongo/s/coreshard',
         '$BUILD_DIR/mongo/util/fail_point'
     ],
 )
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 18c06b9139c..d29a40fc749 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -1651,12 +1651,20 @@ Status CatalogManagerReplicaSet::insertConfigDocument(OperationContext* txn,
     request.setNS(nss);
     request.setWriteConcern(kMajorityWriteConcern.toBSON());
 
+    auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
     for (int retry = 1; retry <= kMaxWriteRetry; retry++) {
         BatchedCommandResponse response;
-        _runBatchWriteCommand(txn, request, &response, Shard::RetryPolicy::kNotIdempotent);
+        _runBatchWriteCommand(txn, request, &response, Shard::RetryPolicy::kNoRetry);
 
         Status status = response.toStatus();
 
+        if (retry < kMaxWriteRetry &&
+            configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent)) {
+            // Pretend like the operation is idempotent because we're handling DuplicateKey errors
+            // specially
+            continue;
+        }
+
         // If we get DuplicateKey error on the first attempt to insert, this definitively means that
         // we are trying to insert the same entry a second time, so error out. If it happens on a
         // retry attempt though, it is not clear whether we are actually inserting a duplicate key
@@ -1694,10 +1702,6 @@ Status CatalogManagerReplicaSet::insertConfigDocument(OperationContext* txn,
             }
         }
 
-        if (ShardRegistry::kAllRetriableErrors.count(status.code()) && (retry < kMaxWriteRetry)) {
-            continue;
-        }
-
         return status;
     }
diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp b/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp
index 4f4457c793b..31fb999cdbf 100644
--- a/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp
+++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp
@@ -40,6 +40,7 @@
 #include "mongo/s/catalog/type_lockpings.h"
 #include "mongo/s/catalog/type_locks.h"
 #include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
 #include "mongo/stdx/chrono.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/concurrency/thread_name.h"
@@ -285,6 +286,7 @@ StatusWith<DistLockManager::ScopedDistLock> ReplSetDistLockManager::lockWithSess
     // independent write operations.
     int networkErrorRetries = 0;
 
+    auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
     // Distributed lock acquisition works by tring to update the state of the lock to 'taken'. If
     // the lock is currently taken, we will back off and try the acquisition again, repeating this
     // until the lockTryInterval has been reached. If a network error occurs at each lock
@@ -318,7 +320,7 @@ StatusWith<DistLockManager::ScopedDistLock> ReplSetDistLockManager::lockWithSess
         }
 
         // If a network error occurred, unlock the lock synchronously and try again
-        if (ShardRegistry::kAllRetriableErrors.count(status.code()) &&
+        if (configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) &&
             networkErrorRetries < kMaxNumLockAcquireRetries) {
             LOG(1) << "Failed to acquire distributed lock because of retriable error. Retrying "
                       "acquisition by first unlocking the stale entry, which possibly exists now"
diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
index 999386de0ff..b1ff0c3490f 100644
--- a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
+++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
@@ -41,12 +41,21 @@
 #include "mongo/base/status_with.h"
 #include "mongo/bson/json.h"
 #include "mongo/bson/util/builder.h"
+#include "mongo/client/remote_command_targeter_factory_mock.h"
 #include "mongo/db/jsobj.h"
 #include "mongo/db/operation_context_noop.h"
 #include "mongo/db/service_context_noop.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/balancer/balancer_configuration.h"
+#include "mongo/s/catalog/catalog_cache.h"
 #include "mongo/s/catalog/dist_lock_catalog_mock.h"
 #include "mongo/s/catalog/type_lockpings.h"
 #include "mongo/s/catalog/type_locks.h"
+#include "mongo/s/client/shard_factory.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/stdx/mutex.h"
@@ -117,8 +126,28 @@ public:
     }
 
 protected:
+    virtual std::unique_ptr<TickSource> makeTickSource() {
+        return stdx::make_unique<SystemTickSource>();
+    }
+
     void setUp() override {
-        getGlobalServiceContext()->setTickSource(stdx::make_unique<SystemTickSource>());
+        getGlobalServiceContext()->setTickSource(makeTickSource());
+
+        // Set up grid just so that the config shard is accessible via the ShardRegistry.
+        auto targeterFactory = stdx::make_unique<RemoteCommandTargeterFactoryMock>();
+        auto shardFactory = stdx::make_unique<ShardFactory>(std::move(targeterFactory));
+        ConnectionString configCS = ConnectionString::forReplicaSet(
+            "configReplSet", std::vector<HostAndPort>{HostAndPort{"config"}});
+        auto shardRegistry = stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS);
+        grid.init(nullptr,
+                  nullptr,
+                  std::move(shardRegistry),
+                  nullptr,
+                  stdx::make_unique<BalancerConfiguration>(
+                      ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes),
+                  nullptr,
+                  nullptr);
+
         _mgr->startUp();
     }
 
@@ -126,6 +155,7 @@ protected:
         // Don't care about what shutDown passes to stopPing here.
         _mockCatalog->expectStopPing([](StringData) {}, Status::OK());
         _mgr->shutDown(txn());
+        grid.clearForUnitTests();
     }
 
     std::unique_ptr<DistLockCatalogMock> _dummyDoNotUse;  // dummy placeholder
@@ -138,17 +168,18 @@ protected:
 class RSDistLockMgrWithMockTickSource : public ReplSetDistLockManagerFixture {
 public:
     /**
+     * Override the way the fixture gets the tick source to install to use a mock tick source.
+     */
+    std::unique_ptr<TickSource> makeTickSource() override {
+        return stdx::make_unique<TickSourceMock>();
+    }
+
+    /**
      * Returns the mock tick source.
      */
     TickSourceMock* getMockTickSource() {
         return dynamic_cast<TickSourceMock*>(getGlobalServiceContext()->getTickSource());
     }
-
-protected:
-    void setUp() override {
-        getGlobalServiceContext()->setTickSource(stdx::make_unique<TickSourceMock>());
-        _mgr->startUp();
-    }
 };
 
 std::string mapToString(const std::map<OID, int>& map) {
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 0167ac5683d..40ab9d8e427 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -427,42 +427,42 @@ void AsyncResultsMerger::handleBatchResponse(
                                : cbData.response.getStatus());
 
     if (!cursorResponseStatus.isOK()) {
-        // Notify the shard registry of the failure.
-        if (remote.shardId) {
-            auto shard = grid.shardRegistry()->getShardNoReload(*remote.shardId);
-            if (!shard) {
-                remote.status = Status(cursorResponseStatus.getStatus().code(),
-                                       str::stream() << "Could not find shard " << *remote.shardId
-                                                     << " containing host "
-                                                     << remote.getTargetHost().toString());
+        auto shard = remote.getShard();
+        if (!shard) {
+            remote.status =
+                Status(cursorResponseStatus.getStatus().code(),
+                       str::stream() << "Could not find shard " << *remote.shardId
+                                     << " containing host " << remote.getTargetHost().toString());
+        } else {
+            shard->updateReplSetMonitor(remote.getTargetHost(), cursorResponseStatus.getStatus());
+
+            // Retry initial cursor establishment if possible. Never retry getMores to avoid
+            // accidentally skipping results.
+            if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts &&
+                shard->isRetriableError(cursorResponseStatus.getStatus().code(),
+                                        Shard::RetryPolicy::kIdempotent)) {
+                invariant(remote.shardId);
+                LOG(1) << "Initial cursor establishment failed with retriable error and will be "
+                          "retried" << causedBy(cursorResponseStatus.getStatus());
+
+                ++remote.retryCount;
+
+                // Since we potentially updated the targeter that the last host it chose might be
+                // faulty, the call below may end up getting a different host.
+                remote.status = askForNextBatch_inlock(remoteIndex);
+                if (remote.status.isOK()) {
+                    return;
+                }
+
+                // If we end up here, it means we failed to schedule the retry request, which is a
+                // more
+                // severe error that should not be retried. Just pass through to the error handling
+                // logic below.
             } else {
-                shard->updateReplSetMonitor(remote.getTargetHost(),
-                                            cursorResponseStatus.getStatus());
+                remote.status = cursorResponseStatus.getStatus();
             }
         }
 
-        // If the error is retriable, schedule another request.
-        if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts &&
-            ShardRegistry::kAllRetriableErrors.count(cursorResponseStatus.getStatus().code())) {
-            LOG(1) << "Initial cursor establishment failed with retriable error and will be retried"
-                   << causedBy(cursorResponseStatus.getStatus());
-
-            ++remote.retryCount;
-
-            // Since we potentially updated the targeter that the last host it chose might be
-            // faulty, the call below may end up getting a different host.
-            remote.status = askForNextBatch_inlock(remoteIndex);
-            if (remote.status.isOK()) {
-                return;
-            }
-
-            // If we end up here, it means we failed to schedule the retry request, which is a more
-            // severe error that should not be retried. Just pass through to the error handling
-            // logic below.
-        } else {
-            remote.status = cursorResponseStatus.getStatus();
-        }
-
         // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
         // remove the unreachable host entirely from consideration by marking it as exhausted.
         if (_params.isAllowPartialResults) {
@@ -634,7 +634,7 @@ Status AsyncResultsMerger::RemoteCursorData::resolveShardIdToHostAndPort(
     invariant(shardId);
    invariant(!cursorId);
 
-    const auto shard = grid.shardRegistry()->getShardNoReload(*shardId);
+    const auto shard = getShard();
     if (!shard) {
         return Status(ErrorCodes::ShardNotFound,
                       str::stream() << "Could not find shard " << *shardId);
@@ -652,6 +652,15 @@ Status AsyncResultsMerger::RemoteCursorData::resolveShardIdToHostAndPort(
     return Status::OK();
 }
 
+std::shared_ptr<Shard> AsyncResultsMerger::RemoteCursorData::getShard() {
+    invariant(shardId || _shardHostAndPort);
+    if (shardId) {
+        return grid.shardRegistry()->getShardNoReload(*shardId);
+    } else {
+        return grid.shardRegistry()->getShardNoReload(_shardHostAndPort->toString());
+    }
+}
+
 //
 // AsyncResultsMerger::MergingComparator
 //
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index d30281aa8f6..b4d04a9c7ad 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -207,7 +207,13 @@ private:
          */
         Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
 
+        /**
+         * Returns the Shard object associated with this remote cursor.
+         */
+        std::shared_ptr<Shard> getShard();
+
         // ShardId on which a cursor will be created.
+        // TODO: This should always be set.
         const boost::optional<ShardId> shardId;
 
         // The command object for sending to the remote to establish the cursor. If a remote cursor