author     Misha Tyulenev <misha@mongodb.com>   2017-10-23 14:17:56 -0400
committer  Misha Tyulenev <misha@mongodb.com>   2017-10-23 14:19:03 -0400
commit     bb4381df8195f86da18a9204d14d225e858b5118 (patch)
tree       9cd36743a1ba3b749be5c959059ce9248e616a2f /src/mongo/db
parent     43ce8f6cf255a88c62acef47df0694d764844d3c (diff)
download   mongo-bb4381df8195f86da18a9204d14d225e858b5118.tar.gz
SERVER-31609 fix request synchronization for makeNoopWriteIfNeeded
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/commands/oplog_note.cpp    5
-rw-r--r--  src/mongo/db/read_concern.cpp          110
2 files changed, 86 insertions(+), 29 deletions(-)
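
Editor's note: the core of this fix replaces a function-local static writeRequest with a per-ServiceContext map keyed by timestamp, so concurrent readers that need the oplog advanced share a single in-flight noop write. As a rough standalone illustration of that single-flight pattern (not MongoDB's actual types: std::promise/std::shared_future stand in for Notification<Status>, an integer alias stands in for Timestamp, and all names here are invented):

```cpp
#include <future>
#include <map>
#include <mutex>
#include <string>
#include <utility>

using Timestamp = unsigned long long;            // stand-in for mongo's Timestamp
using Result = std::shared_future<std::string>;  // stand-in for Notification<Status>

class WriteRequestDeduplicator {
public:
    // Returns {true, result} when the caller must perform the write itself, or
    // {false, result} when a request at or beyond ts is in flight and can be joined.
    std::pair<bool, Result> getOrCreate(Timestamp ts) {
        std::lock_guard<std::mutex> lk(_mutex);
        auto last = _requests.rbegin();
        if (last != _requests.rend() && last->first >= ts) {
            return {false, last->second.second};
        }
        std::promise<std::string> p;
        Result r = p.get_future().share();
        _requests[ts] = std::make_pair(std::move(p), r);
        return {true, r};
    }

    // Publishes the outcome to every joiner and drops the entry.
    void complete(Timestamp ts, const std::string& status) {
        std::lock_guard<std::mutex> lk(_mutex);
        auto it = _requests.find(ts);
        if (it != _requests.end()) {
            it->second.first.set_value(status);
            _requests.erase(it);
        }
    }

private:
    std::mutex _mutex;
    std::map<Timestamp, std::pair<std::promise<std::string>, Result>> _requests;
};
```

Keying the map by timestamp is what lets a late arrival join any request made at an equal or later clusterTime instead of issuing a duplicate appendOplogNote; the old single static pointer could not express that.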
diff --git a/src/mongo/db/commands/oplog_note.cpp b/src/mongo/db/commands/oplog_note.cpp
index b9b25078852..d3efa60ac50 100644
--- a/src/mongo/db/commands/oplog_note.cpp
+++ b/src/mongo/db/commands/oplog_note.cpp
@@ -148,8 +148,9 @@ public:
result, _performNoopWrite(opCtx, dataElement.Obj(), "appendOpLogNote"));
} else {
std::stringstream ss;
- ss << "Requested maxClusterTime" << maxClusterTime.toString()
- << " is less or equal to the last primary OpTime: " << lastAppliedOpTime.toString();
+ ss << "Requested maxClusterTime " << LogicalTime(maxClusterTime).toString()
+ << " is less than or equal to the last primary OpTime: "
+ << LogicalTime(lastAppliedOpTime).toString();
return appendCommandStatus(result, {ErrorCodes::StaleClusterTime, ss.str()});
}
}
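
Editor's note on this hunk: appendOplogNote rejects a request whose maxClusterTime is not ahead of the primary's last applied optime, and the change above only wraps the raw timestamps in LogicalTime so the error text renders consistently. A minimal sketch of that guard path, with a simplified stand-in for LogicalTime (illustrative only, not the real mongo API):

```cpp
#include <sstream>
#include <string>

// Simplified stand-in for mongo::LogicalTime; only what this sketch needs.
struct LogicalTime {
    unsigned long long t;
    std::string toString() const { return std::to_string(t); }
};

// The noop write is performed only when the requested maxClusterTime is ahead
// of the primary's last applied optime; otherwise the command fails with
// StaleClusterTime and a message along these lines.
std::string staleClusterTimeMessage(LogicalTime maxClusterTime,
                                    LogicalTime lastAppliedOpTime) {
    std::stringstream ss;
    ss << "Requested maxClusterTime " << maxClusterTime.toString()
       << " is less than or equal to the last primary OpTime: "
       << lastAppliedOpTime.toString();
    return ss.str();
}
```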
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index 35d625ccf84..f79fd3541f4 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -55,14 +55,61 @@ namespace mongo {
namespace {
/**
+ * Synchronizes noop write requests: threads targeting the same or an earlier clusterTime share one in-flight request.
+ */
+
+class WriteRequestSynchronizer;
+const auto getWriteRequestsSynchronizer =
+ ServiceContext::declareDecoration<WriteRequestSynchronizer>();
+
+class WriteRequestSynchronizer {
+public:
+ WriteRequestSynchronizer() = default;
+
+ /**
+ * Returns a tuple <false, existingWriteRequest> if a write request at or after
+ * clusterTime is already in flight.
+ * Otherwise registers a new request and returns a tuple <true, newWriteRequest>.
+ */
+ std::tuple<bool, std::shared_ptr<Notification<Status>>> getOrCreateWriteRequest(
+ LogicalTime clusterTime) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ auto lastEl = _writeRequests.rbegin();
+ if (lastEl != _writeRequests.rend() && lastEl->first >= clusterTime.asTimestamp()) {
+ return std::make_tuple(false, lastEl->second);
+ } else {
+ auto newWriteRequest = std::make_shared<Notification<Status>>();
+ _writeRequests[clusterTime.asTimestamp()] = newWriteRequest;
+ return std::make_tuple(true, newWriteRequest);
+ }
+ }
+
+ /**
+ * Erases the writeRequest that was created at clusterTime.
+ */
+ void deleteWriteRequest(LogicalTime clusterTime) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ auto el = _writeRequests.find(clusterTime.asTimestamp());
+ invariant(el != _writeRequests.end());
+ invariant(el->second);
+ el->second.reset();
+ _writeRequests.erase(el);
+ }
+
+private:
+ stdx::mutex _mutex;
+ std::map<Timestamp, std::shared_ptr<Notification<Status>>> _writeRequests;
+};
+
+
+/**
* Schedule a write via appendOplogNote command to the primary of this replica set.
*/
Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) {
repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx);
invariant(replCoord->isReplEnabled());
- static stdx::mutex mutex;
- static std::shared_ptr<Notification<Status>> writeRequest;
+ auto& writeRequests = getWriteRequestsSynchronizer(opCtx->getClient()->getServiceContext());
auto lastAppliedOpTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp());
auto status = Status::OK();
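
One more detail in the hunk above: the synchronizer is attached to the ServiceContext via declareDecoration rather than living in function-local statics, which scopes the shared state to the server instance (for example, a fresh test fixture gets fresh state). In spirit (a toy analogue building on the sketch above, not the actual decoration API):

```cpp
// Toy analogue of hanging shared state off a long-lived service object
// instead of a function-local static. Names are illustrative.
struct ServiceState {
    WriteRequestDeduplicator writeRequests;
};

WriteRequestDeduplicator& getWriteRequests(ServiceState& svc) {
    return svc.writeRequests;
}
```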
@@ -70,12 +117,10 @@ Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) {
// This loop addresses the case when two or more threads need to advance the opLog time, but
// the one that waits for the notification gets the later clusterTime, so when the request
// finishes it needs to be repeated with the later time.
- while (clusterTime > lastAppliedOpTime && status.isOK()) {
+ while (clusterTime > lastAppliedOpTime) {
auto shardingState = ShardingState::get(opCtx);
// standalone replica set, so there is no need to advance the OpLog on the primary.
if (!shardingState->enabled()) {
- log() << "Attempting to make a write for clusterTIme: " << clusterTime
- << " but is in standalone RS";
return Status::OK();
}
@@ -93,35 +138,45 @@ Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) {
return Status(ErrorCodes::InternalError, ss.str());
}
- stdx::unique_lock<stdx::mutex> lock(mutex);
- auto myWriteRequest = writeRequest;
- if (!myWriteRequest) {
- myWriteRequest = (writeRequest = std::make_shared<Notification<Status>>());
- lock.unlock();
- auto swRes = myShard.getValue()->runCommand(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- BSON(
- "appendOplogNote" << 1 << "maxClusterTime" << clusterTime.asTimestamp()
- << "data"
- << BSON("noop write for afterClusterTime read concern" << 1)),
- Shard::RetryPolicy::kIdempotent);
- myWriteRequest->set(swRes.getStatus());
- lock.lock();
- writeRequest = nullptr;
- lock.unlock();
- status = swRes.getStatus();
+ auto myWriteRequest = writeRequests.getOrCreateWriteRequest(clusterTime);
+ if (std::get<0>(myWriteRequest)) { // It's a new request
+ try {
+ LOG(2) << "New appendOplogNote request on clusterTime: " << clusterTime.toString()
+ << " remaining attempts: " << remainingAttempts;
+ auto swRes = myShard.getValue()->runCommand(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ BSON("appendOplogNote" << 1 << "maxClusterTime" << clusterTime.asTimestamp()
+ << "data"
+ << BSON("noop write for afterClusterTime read concern"
+ << 1)),
+ Shard::RetryPolicy::kIdempotent);
+ status = swRes.getStatus();
+ std::get<1>(myWriteRequest)->set(status);
+ writeRequests.deleteWriteRequest(clusterTime);
+ } catch (const DBException& ex) {
+ status = ex.toStatus();
+ // Signal the writeRequest to unblock any waiters
+ std::get<1>(myWriteRequest)->set(status);
+ writeRequests.deleteWriteRequest(clusterTime);
+ }
} else {
- lock.unlock();
+ LOG(2) << "Join appendOplogNote request on clusterTime: " << clusterTime.toString()
+ << " remaining attempts: " << remainingAttempts;
try {
- status = myWriteRequest->get(opCtx);
+ status = std::get<1>(myWriteRequest)->get(opCtx);
} catch (const DBException& ex) {
return ex.toStatus();
}
}
+ // If the write status is OK, return; the caller then waits for the oplog to replicate.
+ if (status.isOK()) {
+ return status;
+ }
lastAppliedOpTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp());
}
+ invariant(status.isOK());
return status;
}
} // namespace
@@ -182,7 +237,8 @@ Status waitForReadConcern(OperationContext* opCtx, const repl::ReadConcernArgs&
if (replCoord->isReplEnabled() && afterClusterTime) {
auto status = makeNoopWriteIfNeeded(opCtx, *afterClusterTime);
if (!status.isOK()) {
- LOG(1) << "failed noop write due to " << status.toString();
+ LOG(0) << "Failed noop write at clusterTime: " << afterClusterTime->toString()
+ << " due to " << status.toString();
}
}
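
Putting the pieces together: the thread that registers a new request runs the command and publishes the status, joiners block on the shared result, and a failed status sends everyone back around the loop in case replication has caught up or a later request can be joined. A hypothetical condensed version of that control flow, built on the WriteRequestDeduplicator sketch above (std::function hooks stand in for the shard command and the replication coordinator; exception handling from the real patch is omitted):

```cpp
#include <functional>
#include <string>
#include <utility>

std::string advanceTo(WriteRequestDeduplicator& dedup,
                      Timestamp target,
                      const std::function<Timestamp()>& lastApplied,
                      const std::function<std::string(Timestamp)>& doNoopWrite) {
    std::string status = "OK";
    while (target > lastApplied()) {
        std::pair<bool, Result> req = dedup.getOrCreate(target);
        if (req.first) {
            status = doNoopWrite(target);    // e.g. appendOplogNote against the primary
            dedup.complete(target, status);  // wake every thread that joined this request
        } else {
            status = req.second.get();       // block until the in-flight write resolves
        }
        if (status == "OK") {
            return status;  // the caller then waits for the oplog entry to replicate
        }
        // On failure, loop: replication may have caught up anyway, or a request
        // at a later clusterTime can now be issued or joined.
    }
    return status;
}
```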