diff options
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/client/shard_local.cpp | 42 | ||||
-rw-r--r-- | src/mongo/s/client/shard_local.h | 19 |
2 files changed, 53 insertions, 8 deletions
diff --git a/src/mongo/s/client/shard_local.cpp b/src/mongo/s/client/shard_local.cpp index dcf6db33205..b6789ecb170 100644 --- a/src/mongo/s/client/shard_local.cpp +++ b/src/mongo/s/client/shard_local.cpp @@ -37,12 +37,14 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/read_concern_response.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/unique_message.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -94,10 +96,29 @@ bool ShardLocal::isRetriableError(ErrorCodes::Error code, RetryPolicy options) { } } +void ShardLocal::_updateLastOpTimeFromClient(OperationContext* txn) { + repl::OpTime lastOpTimeFromClient = + repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + if (lastOpTimeFromClient.isNull()) { + return; + } + + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(lastOpTimeFromClient >= _lastOpTime); + _lastOpTime = lastOpTimeFromClient; +} + +repl::OpTime ShardLocal::_getLastOpTime() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _lastOpTime; +} + StatusWith<Shard::CommandResponse> ShardLocal::_runCommand(OperationContext* txn, const ReadPreferenceSetting& unused, const std::string& dbName, const BSONObj& cmdObj) { + ON_BLOCK_EXIT([this, &txn] { _updateLastOpTimeFromClient(txn); }); + try { DBDirectClient client(txn); rpc::UniqueReply commandResponse = client.runCommandWithMetadata( @@ -134,16 +155,21 @@ StatusWith<Shard::QueryResponse> ShardLocal::_exhaustiveFindOnConfig( // Set up operation context with majority read snapshot so correct optime can be retrieved. Status status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); - // Wait until a snapshot is available. - while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) { - LOG(1) << "Waiting for ReadFromMajorityCommittedSnapshot to become available"; - replCoord->waitUntilSnapshotCommitted(txn, SnapshotName::min()); - status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + // Wait for any writes performed by this ShardLocal instance to be committed and visible. + auto readConcernResponse = replCoord->waitUntilOpTime( + txn, repl::ReadConcernArgs{_getLastOpTime(), readConcernLevel}); + if (!readConcernResponse.getStatus().isOK()) { + if (readConcernResponse.getStatus() == ErrorCodes::ShutdownInProgress || + ErrorCodes::isInterruption(readConcernResponse.getStatus().code())) { + return readConcernResponse.getStatus(); + } + fassertStatusOK(40188, readConcernResponse.getStatus()); } - if (!status.isOK()) { - return status; - } + // Inform the storage engine to read from the committed snapshot for the rest of this + // operation. + status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); + fassertStatusOK(40189, status); } else { invariant(readConcernLevel == repl::ReadConcernLevel::kLocalReadConcern); } diff --git a/src/mongo/s/client/shard_local.h b/src/mongo/s/client/shard_local.h index b39aba621d0..1c65015b78e 100644 --- a/src/mongo/s/client/shard_local.h +++ b/src/mongo/s/client/shard_local.h @@ -30,7 +30,9 @@ #include "mongo/base/disallow_copying.h" #include "mongo/client/dbclientinterface.h" +#include "mongo/db/repl/optime.h" #include "mongo/s/client/shard.h" +#include "mongo/stdx/mutex.h" namespace mongo { @@ -75,6 +77,23 @@ private: const BSONObj& query, const BSONObj& sort, boost::optional<long long> limit) final; + + /** + * Checks if an OpTime was set on the current Client (ie if the current operation performed a + * write) and if so updates _lastOpTime to the OpTime from the write that was just performed. + */ + void _updateLastOpTimeFromClient(OperationContext* txn); + + repl::OpTime _getLastOpTime(); + + // Guards _lastOpTime below. + stdx::mutex _mutex; + + // Stores the optime that was generated by the last operation to perform a write that was run + // through _runCommand. Used in _exhaustiveFindOnConfig for waiting for that optime to be + // committed so that readConcern majority reads will read the writes that were performed without + // a w:majority write concern. + repl::OpTime _lastOpTime{}; }; } // namespace mongo |