author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2015-12-29 12:04:55 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2015-12-30 15:35:28 -0500
commit     5d2d6e209acd862324612c7f9c41d65940f8dcba
tree       8ccfea2ba5cc6b118d5d82d70ebe06ba88b135aa
parent     5f4c54029d47229533b54c7683df71809cc26ff0
SERVER-22027 Sharding should not retry killed operations
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/base/error_codes.err                                                |  6
-rw-r--r--  src/mongo/db/operation_context.cpp                                            |  9
-rw-r--r--  src/mongo/db/operation_context.h                                              | 40
-rw-r--r--  src/mongo/db/operation_context_impl.cpp                                       |  6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp             |  4
-rw-r--r--  src/mongo/db/service_context.h                                                |  2
-rw-r--r--  src/mongo/db/service_context_d.cpp                                            | 38
-rw-r--r--  src/mongo/db/service_context_d.h                                              | 15
-rw-r--r--  src/mongo/db/service_context_noop.cpp                                         |  3
-rw-r--r--  src/mongo/db/service_context_noop.h                                           |  2
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp                   |  2
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp  |  4
-rw-r--r--  src/mongo/s/client/shard_registry.cpp                                         |  9
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp                                    | 21
14 files changed, 73 insertions(+), 88 deletions(-)
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index faed0dbb14a..43fdefa1c50 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -160,6 +160,7 @@ error_code("CannotGrowDocumentInCappedNamespace", 10003)
error_code("DuplicateKey", 11000)
error_code("InterruptedAtShutdown", 11600)
error_code("Interrupted", 11601)
+error_code("InterruptedDueToReplStateChange", 11602)
error_code("OutOfDiskSpace", 14031 )
error_code("KeyTooLong", 17280);
error_code("BackgroundOperationInProgressForDatabase", 12586);
@@ -173,7 +174,10 @@ error_code("PrepareConfigsFailed", 13104);
# SERVER-21428 explains the extra changes needed if error_class components change.
error_class("NetworkError", ["HostUnreachable", "HostNotFound", "NetworkTimeout"])
-error_class("Interruption", ["Interrupted", "InterruptedAtShutdown", "ExceededTimeLimit"])
+error_class("Interruption", ["Interrupted",
+ "InterruptedAtShutdown",
+ "InterruptedDueToReplStateChange",
+ "ExceededTimeLimit"])
error_class("NotMasterError", ["NotMaster", "NotMasterNoSlaveOk"])
error_class("StaleShardingError",
["RecvStaleConfig", "SendStaleConfig", "StaleShardVersion", "StaleEpoch"])
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 1b79dadcb50..1434ee5ddc7 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -46,12 +46,13 @@ Client* OperationContext::getClient() const {
return _client;
}
-void OperationContext::markKilled() {
- _killPending.store(1);
+void OperationContext::markKilled(ErrorCodes::Error killCode) {
+ invariant(killCode != ErrorCodes::OK);
+ _killCode.compareAndSwap(ErrorCodes::OK, killCode);
}
-bool OperationContext::isKillPending() const {
- return _killPending.loadRelaxed();
+ErrorCodes::Error OperationContext::getKillStatus() const {
+ return _killCode.loadRelaxed();
}
} // namespace mongo
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index a146a8f7f14..3a7e3165dda 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -77,7 +77,6 @@ public:
*/
virtual RecoveryUnit* recoveryUnit() const = 0;
-
/**
* Returns the RecoveryUnit (same return value as recoveryUnit()) but the caller takes
* ownership of the returned RecoveryUnit, and the OperationContext instance relinquishes
@@ -183,23 +182,33 @@ public:
virtual bool writesAreReplicated() const = 0;
/**
- * Marks this operation as killed.
+ * Marks this operation as killed so that subsequent calls to checkForInterrupt and
+ * checkForInterruptNoAssert by the thread executing the operation will start returning the
+ * specified error code.
*
- * Subsequent calls to checkForInterrupt and checkForInterruptNoAssert by the thread
- * executing the operation will indicate that the operation has been killed.
+ * If multiple threads kill the same operation with different codes, only the first code will
+ * be preserved.
*
- * May be called by any thread that has locked the Client owning this operation context,
- * or by the thread executing on behalf of this operation context.
+ * May be called by any thread that has locked the Client owning this operation context.
*/
- void markKilled();
+ void markKilled(ErrorCodes::Error killCode = ErrorCodes::Interrupted);
/**
- * Returns true if markKilled has been called on this operation context.
+ * Returns the code passed to markKilled if this operation context has been killed previously
+ * or ErrorCodes::OK otherwise.
*
- * May be called by any thread that has locked the Client owning this operation context,
- * or by the thread executing on behalf of this operation context.
+ * May be called by any thread that has locked the Client owning this operation context, or
+ * without lock by the thread executing on behalf of this operation context.
+ */
+ ErrorCodes::Error getKillStatus() const;
+
+ /**
+ * Shortcut method, which checks whether getKillStatus returns a non-OK value. Has the same
+ * concurrency rules as getKillStatus.
*/
- bool isKillPending() const;
+ bool isKillPending() const {
+ return getKillStatus() != ErrorCodes::OK;
+ }
protected:
OperationContext(Client* client, unsigned int opId, Locker* locker);
@@ -211,11 +220,14 @@ private:
Client* const _client;
const unsigned int _opId;
- // The lifetime of locker is managed by subclasses of OperationContext, so it is not
- // safe to access _locker in the destructor of OperationContext.
+ // Not owned.
Locker* const _locker;
- AtomicInt32 _killPending{0};
+ // Follows the values of ErrorCodes::Error. The default value is 0 (OK), which means the
+ // operation is not killed. If killed, it will contain a specific code. This value changes only
+ // once from OK to some kill code.
+ AtomicWord<ErrorCodes::Error> _killCode{ErrorCodes::OK};
+
WriteConcernOptions _writeConcern;
};
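A minimal sketch of the kill-code lifecycle the header above documents, using std::atomic in place of mongo's AtomicWord; the mock below is illustrative, not the real OperationContext. The key property is first-writer-wins: the code transitions OK (0) to a kill code exactly once, so if a stepdown and a user killOp race, only the first code is preserved.

    #include <atomic>
    #include <cassert>

    // Mock of the interface documented above, illustration only.
    class MockOperationContext {
    public:
        void markKilled(int killCode = 11601 /* Interrupted */) {
            assert(killCode != 0);
            int expected = 0;
            // Only the first caller transitions OK -> killCode; later calls no-op.
            _killCode.compare_exchange_strong(expected, killCode);
        }
        int getKillStatus() const {
            return _killCode.load(std::memory_order_relaxed);
        }
        bool isKillPending() const {
            return getKillStatus() != 0;
        }

    private:
        std::atomic<int> _killCode{0};
    };

    int main() {
        MockOperationContext txn;
        txn.markKilled(11602);  // e.g. stepdown: InterruptedDueToReplStateChange
        txn.markKilled(11601);  // a later killOp loses the race and is ignored
        assert(txn.getKillStatus() == 11602);
        assert(txn.isKillPending());
    }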
diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp
index f2c0166876f..b7733958bc2 100644
--- a/src/mongo/db/operation_context_impl.cpp
+++ b/src/mongo/db/operation_context_impl.cpp
@@ -140,6 +140,7 @@ uint64_t OperationContextImpl::getRemainingMaxTimeMicros() const {
MONGO_FP_DECLARE(checkForInterruptFail);
namespace {
+
// Helper function for checkForInterrupt fail point. Decides whether the operation currently
// being run by the given Client meet the (probabilistic) conditions for interruption as
// specified in the fail point info.
@@ -194,8 +195,9 @@ Status OperationContextImpl::checkForInterruptNoAssert() {
}
}
- if (isKillPending()) {
- return Status(ErrorCodes::Interrupted, "operation was interrupted");
+ const auto killStatus = getKillStatus();
+ if (killStatus != ErrorCodes::OK) {
+ return Status(killStatus, "operation was interrupted");
}
return Status::OK();
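The observable effect of the hunk above, sketched with a bare-bones Status stand-in: whichever code markKilled stored is what the interrupted operation reports to its caller, instead of every kill collapsing to Interrupted. The types below are mocks, not mongo's.

    #include <string>

    // Stand-in for mongo::Status, for illustration only.
    struct Status {
        int code;
        std::string reason;
        bool isOK() const { return code == 0; }
    };

    // Sketch of the new tail of checkForInterruptNoAssert: the stored kill
    // code (e.g. 11602 on stepdown) propagates to the caller verbatim.
    Status checkKill(int killStatus) {
        if (killStatus != 0)
            return Status{killStatus, "operation was interrupted"};
        return Status{0, ""};
    }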
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index c42b80a605d..52b0fee436c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -350,8 +350,8 @@ void ReplicationCoordinatorExternalStateImpl::closeConnections() {
}
void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) {
- ServiceContext* environment = getGlobalServiceContext();
- environment->killAllUserOperations(txn);
+ ServiceContext* environment = txn->getServiceContext();
+ environment->killAllUserOperations(txn, ErrorCodes::InterruptedDueToReplStateChange);
}
void ReplicationCoordinatorExternalStateImpl::clearShardingState() {
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 6f9aa36a22f..4e63046726f 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -284,7 +284,7 @@ public:
* Kills all operations that have a Client that is associated with an incoming user
* connection, except for the one associated with txn.
*/
- virtual void killAllUserOperations(const OperationContext* txn) = 0;
+ virtual void killAllUserOperations(const OperationContext* txn, ErrorCodes::Error killCode) = 0;
/**
* Registers a listener to be notified each time an op is killed.
diff --git a/src/mongo/db/service_context_d.cpp b/src/mongo/db/service_context_d.cpp
index 3e5fd2d5be7..470dcb927a5 100644
--- a/src/mongo/db/service_context_d.cpp
+++ b/src/mongo/db/service_context_d.cpp
@@ -220,21 +220,9 @@ bool ServiceContextMongoD::getKillAllOperations() {
return _globalKill;
}
-bool ServiceContextMongoD::_killOperationsAssociatedWithClientAndOpId_inlock(Client* client,
- unsigned int opId) {
- OperationContext* opCtx = client->getOperationContext();
- if (!opCtx) {
- return false;
- }
- if (opCtx->getOpID() != opId) {
- return false;
- }
- _killOperation_inlock(opCtx);
- return true;
-}
-
-void ServiceContextMongoD::_killOperation_inlock(OperationContext* opCtx) {
- opCtx->markKilled();
+void ServiceContextMongoD::_killOperation_inlock(OperationContext* opCtx,
+ ErrorCodes::Error killCode) {
+ opCtx->markKilled(killCode);
for (const auto listener : _killOpListeners) {
try {
@@ -248,8 +236,10 @@ void ServiceContextMongoD::_killOperation_inlock(OperationContext* opCtx) {
bool ServiceContextMongoD::killOperation(unsigned int opId) {
for (LockedClientsCursor cursor(this); Client* client = cursor.next();) {
stdx::lock_guard<Client> lk(*client);
- bool found = _killOperationsAssociatedWithClientAndOpId_inlock(client, opId);
- if (found) {
+
+ OperationContext* opCtx = client->getOperationContext();
+ if (opCtx && opCtx->getOpID() == opId) {
+ _killOperation_inlock(opCtx, ErrorCodes::Interrupted);
return true;
}
}
@@ -257,7 +247,8 @@ bool ServiceContextMongoD::killOperation(unsigned int opId) {
return false;
}
-void ServiceContextMongoD::killAllUserOperations(const OperationContext* txn) {
+void ServiceContextMongoD::killAllUserOperations(const OperationContext* txn,
+ ErrorCodes::Error killCode) {
for (LockedClientsCursor cursor(this); Client* client = cursor.next();) {
if (!client->isFromUserConnection()) {
// Don't kill system operations.
@@ -266,16 +257,11 @@ void ServiceContextMongoD::killAllUserOperations(const OperationContext* txn) {
stdx::lock_guard<Client> lk(*client);
OperationContext* toKill = client->getOperationContext();
- if (!toKill) {
- continue;
- }
- if (toKill->getOpID() == txn->getOpID()) {
- // Don't kill ourself.
- continue;
+ // Don't kill ourself.
+ if (toKill && toKill->getOpID() != txn->getOpID()) {
+ _killOperation_inlock(toKill, killCode);
}
-
- _killOperation_inlock(toKill);
}
}
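A simplified model of the loop above, with the Client registry and per-client locking mocked away. On stepdown (see the replication coordinator hunk earlier) killCode is InterruptedDueToReplStateChange; the MockOp type is hypothetical.

    #include <vector>

    struct MockOp {
        unsigned opId;
        bool fromUserConnection;
        int killCode = 0;  // 0 == OK, i.e. not killed
    };

    // Sketch: kill every active user operation except the caller's own,
    // tagging each with killCode. Mirrors the rewritten loop above.
    void killAllUserOperations(std::vector<MockOp>& ops,
                               unsigned selfOpId,
                               int killCode) {
        for (auto& op : ops) {
            if (!op.fromUserConnection)
                continue;  // don't kill system operations
            // Don't kill ourself.
            if (op.opId != selfOpId && op.killCode == 0)
                op.killCode = killCode;  // first kill code wins
        }
    }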
diff --git a/src/mongo/db/service_context_d.h b/src/mongo/db/service_context_d.h
index 0c560ff17f4..dc834197e9e 100644
--- a/src/mongo/db/service_context_d.h
+++ b/src/mongo/db/service_context_d.h
@@ -67,7 +67,7 @@ public:
bool killOperation(unsigned int opId) override;
- void killAllUserOperations(const OperationContext* txn) override;
+ void killAllUserOperations(const OperationContext* txn, ErrorCodes::Error killCode) override;
void registerKillOpListener(KillOpListenerInterface* listener) override;
@@ -79,22 +79,11 @@ private:
std::unique_ptr<OperationContext> _newOpCtx(Client* client) override;
/**
- * Kills the active operation on "client" if that operation is associated with operation id
- * "opId".
- *
- * Returns true if an operation was killed.
- *
- * Must only be called by a thread owning both this service context's mutex and the
- * client's.
- */
- bool _killOperationsAssociatedWithClientAndOpId_inlock(Client* client, unsigned int opId);
-
- /**
* Kills the given operation.
*
* Caller must own the service context's _mutex.
*/
- void _killOperation_inlock(OperationContext* opCtx);
+ void _killOperation_inlock(OperationContext* opCtx, ErrorCodes::Error killCode);
bool _globalKill;
diff --git a/src/mongo/db/service_context_noop.cpp b/src/mongo/db/service_context_noop.cpp
index 65184906442..fd94b35db89 100644
--- a/src/mongo/db/service_context_noop.cpp
+++ b/src/mongo/db/service_context_noop.cpp
@@ -79,7 +79,8 @@ bool ServiceContextNoop::killOperation(unsigned int opId) {
return false;
}
-void ServiceContextNoop::killAllUserOperations(const OperationContext* txn) {}
+void ServiceContextNoop::killAllUserOperations(const OperationContext* txn,
+ ErrorCodes::Error killCode) {}
void ServiceContextNoop::registerKillOpListener(KillOpListenerInterface* listener) {}
diff --git a/src/mongo/db/service_context_noop.h b/src/mongo/db/service_context_noop.h
index 0e74b61b7a2..62e6a169896 100644
--- a/src/mongo/db/service_context_noop.h
+++ b/src/mongo/db/service_context_noop.h
@@ -49,7 +49,7 @@ public:
bool killOperation(unsigned int opId) override;
- void killAllUserOperations(const OperationContext* txn) override;
+ void killAllUserOperations(const OperationContext* txn, ErrorCodes::Error killCode) override;
void setKillAllOperations() override;
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 5943cc6e255..5686476e1a3 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -389,8 +389,6 @@ StatusWith<OpTimePair<DatabaseType>> CatalogManagerReplicaSet::_fetchDatabaseMet
StatusWith<OpTimePair<CollectionType>> CatalogManagerReplicaSet::getCollection(
OperationContext* txn, const std::string& collNs) {
- auto configShard = grid.shardRegistry()->getShard(txn, "config");
-
auto statusFind = _exhaustiveFindOnConfig(txn,
kConfigReadSelector,
NamespaceString(CollectionType::ConfigNS),
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp
index b21ff63961f..9bf44eaa09c 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_write_retry_test.cpp
@@ -93,7 +93,7 @@ TEST_F(InsertRetryTest, RetryOnInterruptedAndNetworkErrorSuccess) {
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(request.target, kTestHosts[0]);
configTargeter()->setFindHostReturnValue({kTestHosts[1]});
- return Status(ErrorCodes::Interrupted, "Interruption");
+ return Status(ErrorCodes::InterruptedDueToReplStateChange, "Interruption");
});
onCommand([&](const RemoteCommandRequest& request) {
@@ -271,7 +271,7 @@ TEST_F(UpdateRetryTest, OperationInterruptedDueToPrimaryStepDown) {
auto writeErrDetail = stdx::make_unique<WriteErrorDetail>();
writeErrDetail->setIndex(0);
- writeErrDetail->setErrCode(ErrorCodes::Interrupted);
+ writeErrDetail->setErrCode(ErrorCodes::InterruptedDueToReplStateChange);
writeErrDetail->setErrMessage("Operation interrupted");
response.addToErrDetails(writeErrDetail.release());
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 55121f42209..e5b91dda83a 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -126,6 +126,7 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kNotMasterErrors{ErrorCodes::N
const ShardRegistry::ErrorCodesSet ShardRegistry::kAllRetriableErrors{
ErrorCodes::NotMaster,
ErrorCodes::NotMasterNoSlaveOk,
+ ErrorCodes::NotMasterOrSecondary,
// If write concern failed to be satisfied on the remote server, this most probably means that
// some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe
// to be retried if idempotency can be guaranteed.
@@ -133,10 +134,7 @@ const ShardRegistry::ErrorCodesSet ShardRegistry::kAllRetriableErrors{
ErrorCodes::HostUnreachable,
ErrorCodes::HostNotFound,
ErrorCodes::NetworkTimeout,
- // This set includes interrupted because replica set step down kills all server operations
- // before it closes connections so it may happen that the caller actually receives the
- // interruption.
- ErrorCodes::Interrupted};
+ ErrorCodes::InterruptedDueToReplStateChange};
ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory,
std::unique_ptr<executor::TaskExecutorPool> executorPool,
@@ -782,7 +780,8 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat
void ShardRegistry::updateReplSetMonitor(const std::shared_ptr<RemoteCommandTargeter>& targeter,
const HostAndPort& remoteHost,
const Status& remoteCommandStatus) {
- if (ErrorCodes::isNotMasterError(remoteCommandStatus.code())) {
+ if (ErrorCodes::isNotMasterError(remoteCommandStatus.code()) ||
+ (remoteCommandStatus == ErrorCodes::InterruptedDueToReplStateChange)) {
targeter->markHostNotMaster(remoteHost);
} else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) {
targeter->markHostUnreachable(remoteHost);
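The targeting consequence of the two hunks above, sketched: a remote InterruptedDueToReplStateChange is now treated like a notMaster response (the host is stepping down), while a plain Interrupted, which can come from a user's killOp, no longer marks the host or counts as retriable. The numeric codes are from error_codes.err; the enum and function are illustrative stand-ins for updateReplSetMonitor.

    enum class HostEvent { kNone, kNotMaster, kUnreachable };

    // Sketch of updateReplSetMonitor's classification after this change.
    HostEvent classifyRemoteError(int code) {
        const bool isNotMaster =
            code == 10107 ||  // NotMaster
            code == 13435;    // NotMasterNoSlaveOk
        const bool isNetwork =
            code == 6 ||      // HostUnreachable
            code == 7 ||      // HostNotFound
            code == 89;       // NetworkTimeout
        if (isNotMaster || code == 11602)  // InterruptedDueToReplStateChange
            return HostEvent::kNotMaster;
        if (isNetwork)
            return HostEvent::kUnreachable;
        return HostEvent::kNone;
    }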
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 8b23528a04a..c82de6a3bbe 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -51,15 +51,6 @@ namespace {
// Maximum number of retries for network and replication notMaster errors (per host).
const int kMaxNumFailedHostRetryAttempts = 3;
-/**
- * Returns whether a particular error code returned from the initial cursor establishment should
- * be retried.
- */
-bool isPerShardRetriableError(ErrorCodes::Error err) {
- return (ShardRegistry::kAllRetriableErrors.count(err) ||
- err == ErrorCodes::NotMasterOrSecondary);
-}
-
} // namespace
AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor,
@@ -438,8 +429,7 @@ void AsyncResultsMerger::handleBatchResponse(
if (!cursorResponseStatus.isOK()) {
// Notify the shard registry of the failure.
if (remote.shardId) {
- // TODO: Pass down an OperationContext* to use here.
- auto shard = grid.shardRegistry()->getShard(nullptr, *remote.shardId);
+ auto shard = grid.shardRegistry()->getShardNoReload(*remote.shardId);
if (!shard) {
remote.status = Status(cursorResponseStatus.getStatus().code(),
str::stream() << "Could not find shard " << *remote.shardId
@@ -453,7 +443,10 @@ void AsyncResultsMerger::handleBatchResponse(
// If the error is retriable, schedule another request.
if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts &&
- isPerShardRetriableError(cursorResponseStatus.getStatus().code())) {
+ ShardRegistry::kAllRetriableErrors.count(cursorResponseStatus.getStatus().code())) {
+ LOG(1) << "Initial cursor establishment failed with retriable error and will be retried"
+ << causedBy(cursorResponseStatus.getStatus());
+
++remote.retryCount;
// Since we potentially updated the targeter that the last host it chose might be
@@ -641,13 +634,13 @@ Status AsyncResultsMerger::RemoteCursorData::resolveShardIdToHostAndPort(
invariant(shardId);
invariant(!cursorId);
- // TODO: Pass down an OperationContext* to use here.
- const auto shard = grid.shardRegistry()->getShard(nullptr, *shardId);
+ const auto shard = grid.shardRegistry()->getShardNoReload(*shardId);
if (!shard) {
return Status(ErrorCodes::ShardNotFound,
str::stream() << "Could not find shard " << *shardId);
}
+ // TODO: Pass down an OperationContext* to use here.
auto findHostStatus = shard->getTargeter()->findHost(
readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(nullptr));
if (!findHostStatus.isOK()) {
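Finally, a sketch of the retry gate the merger applies above. With NotMasterOrSecondary folded into ShardRegistry::kAllRetriableErrors by this commit, the local isPerShardRetriableError helper became redundant and was deleted. The set contents mirror the shard_registry.cpp hunks; the integer-keyed set is a stand-in for the real ErrorCodesSet.

    #include <set>

    // Stand-in for ShardRegistry::kAllRetriableErrors after this commit.
    const std::set<int> kAllRetriableErrors{
        10107,  // NotMaster
        13435,  // NotMasterNoSlaveOk
        13436,  // NotMasterOrSecondary (folded in by this commit)
        64,     // WriteConcernFailed
        6,      // HostUnreachable
        7,      // HostNotFound
        89,     // NetworkTimeout
        11602,  // InterruptedDueToReplStateChange
    };

    const int kMaxNumFailedHostRetryAttempts = 3;

    // Sketch of the per-host retry decision in handleBatchResponse: retry only
    // while no cursor is established yet, under the attempt cap, and only for
    // codes the shard registry considers retriable.
    bool shouldRetry(bool hasCursorId, int retryCount, int errorCode) {
        return !hasCursorId && retryCount < kMaxNumFailedHostRetryAttempts &&
            kAllRetriableErrors.count(errorCode) > 0;
    }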