summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- src/mongo/s/client/shard_local.cpp 42
-rw-r--r-- src/mongo/s/client/shard_local.h 19
2 files changed, 53 insertions, 8 deletions
diff --git a/src/mongo/s/client/shard_local.cpp b/src/mongo/s/client/shard_local.cpp
index dcf6db33205..b6789ecb170 100644
--- a/src/mongo/s/client/shard_local.cpp
+++ b/src/mongo/s/client/shard_local.cpp
@@ -37,12 +37,14 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/read_concern_response.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/unique_message.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -94,10 +96,29 @@ bool ShardLocal::isRetriableError(ErrorCodes::Error code, RetryPolicy options) {
}
}
+/**
+ * Reads the last optime recorded on the Client that owns 'txn' and, if one is set, stores it in
+ * _lastOpTime so that it can later be passed to waitUntilOpTime() before a majority read.
+ *
+ * Returns without touching _lastOpTime when the Client has no optime recorded (null optime).
+ */
+void ShardLocal::_updateLastOpTimeFromClient(OperationContext* txn) {
+    const repl::OpTime lastOpTimeFromClient =
+        repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+    if (lastOpTimeFromClient.isNull()) {
+        return;
+    }
+
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    // Only ever move _lastOpTime forward. This function runs after the command has finished, so
+    // another thread using this same ShardLocal may have completed a later write and stored its
+    // newer optime between this operation's write and the acquisition of _mutex here. Seeing an
+    // older optime is therefore a benign race, not a broken invariant, and must not abort the
+    // process; the newer stored value already covers this operation's write.
+    if (lastOpTimeFromClient >= _lastOpTime) {
+        _lastOpTime = lastOpTimeFromClient;
+    }
+}
+
+/**
+ * Returns a copy of _lastOpTime. The value is read while holding _mutex, the same mutex that
+ * _updateLastOpTimeFromClient() holds when writing it.
+ */
+repl::OpTime ShardLocal::_getLastOpTime() {
+    stdx::lock_guard<stdx::mutex> guard(_mutex);
+    repl::OpTime snapshot = _lastOpTime;
+    return snapshot;
+}
+
StatusWith<Shard::CommandResponse> ShardLocal::_runCommand(OperationContext* txn,
const ReadPreferenceSetting& unused,
const std::string& dbName,
const BSONObj& cmdObj) {
+ ON_BLOCK_EXIT([this, &txn] { _updateLastOpTimeFromClient(txn); });
+
try {
DBDirectClient client(txn);
rpc::UniqueReply commandResponse = client.runCommandWithMetadata(
@@ -134,16 +155,21 @@ StatusWith<Shard::QueryResponse> ShardLocal::_exhaustiveFindOnConfig(
// Set up operation context with majority read snapshot so correct optime can be retrieved.
Status status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
- // Wait until a snapshot is available.
- while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) {
- LOG(1) << "Waiting for ReadFromMajorityCommittedSnapshot to become available";
- replCoord->waitUntilSnapshotCommitted(txn, SnapshotName::min());
- status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+ // Wait for any writes performed by this ShardLocal instance to be committed and visible.
+ auto readConcernResponse = replCoord->waitUntilOpTime(
+ txn, repl::ReadConcernArgs{_getLastOpTime(), readConcernLevel});
+ if (!readConcernResponse.getStatus().isOK()) {
+ if (readConcernResponse.getStatus() == ErrorCodes::ShutdownInProgress ||
+ ErrorCodes::isInterruption(readConcernResponse.getStatus().code())) {
+ return readConcernResponse.getStatus();
+ }
+ fassertStatusOK(40188, readConcernResponse.getStatus());
}
- if (!status.isOK()) {
- return status;
- }
+ // Inform the storage engine to read from the committed snapshot for the rest of this
+ // operation.
+ status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
+ fassertStatusOK(40189, status);
} else {
invariant(readConcernLevel == repl::ReadConcernLevel::kLocalReadConcern);
}
diff --git a/src/mongo/s/client/shard_local.h b/src/mongo/s/client/shard_local.h
index b39aba621d0..1c65015b78e 100644
--- a/src/mongo/s/client/shard_local.h
+++ b/src/mongo/s/client/shard_local.h
@@ -30,7 +30,9 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/s/client/shard.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -75,6 +77,23 @@ private:
const BSONObj& query,
const BSONObj& sort,
boost::optional<long long> limit) final;
+
+ /**
+ * Checks if an OpTime was set on the current Client (i.e. if the current operation performed a
+ * write) and if so updates _lastOpTime to the OpTime from the write that was just performed.
+ */
+ void _updateLastOpTimeFromClient(OperationContext* txn);
+
+ /**
+ * Returns the current value of _lastOpTime, acquiring _mutex for the duration of the read.
+ */
+ repl::OpTime _getLastOpTime();
+
+ // Guards _lastOpTime below.
+ stdx::mutex _mutex;
+
+ // Stores the optime that was generated by the last operation to perform a write that was run
+ // through _runCommand. Used in _exhaustiveFindOnConfig for waiting for that optime to be
+ // committed so that readConcern majority reads will read the writes that were performed without
+ // a w:majority write concern.
+ repl::OpTime _lastOpTime{};
};
} // namespace mongo