From bb4381df8195f86da18a9204d14d225e858b5118 Mon Sep 17 00:00:00 2001
From: Misha Tyulenev
Date: Mon, 23 Oct 2017 14:17:56 -0400
Subject: SERVER-31609 fix request synchronization for makeNoopWriteIfNeeded

---
 src/mongo/db/commands/oplog_note.cpp |   5 +-
 src/mongo/db/read_concern.cpp        | 110 ++++++++++++++++++++++++++---------
 2 files changed, 86 insertions(+), 29 deletions(-)

(limited to 'src')

diff --git a/src/mongo/db/commands/oplog_note.cpp b/src/mongo/db/commands/oplog_note.cpp
index b9b25078852..d3efa60ac50 100644
--- a/src/mongo/db/commands/oplog_note.cpp
+++ b/src/mongo/db/commands/oplog_note.cpp
@@ -148,8 +148,9 @@ public:
                 result, _performNoopWrite(opCtx, dataElement.Obj(), "appendOpLogNote"));
         } else {
             std::stringstream ss;
-            ss << "Requested maxClusterTime" << maxClusterTime.toString()
-               << " is less or equal to the last primary OpTime: " << lastAppliedOpTime.toString();
+            ss << "Requested maxClusterTime" << LogicalTime(maxClusterTime).toString()
+               << " is less or equal to the last primary OpTime: "
+               << LogicalTime(lastAppliedOpTime).toString();
             return appendCommandStatus(result, {ErrorCodes::StaleClusterTime, ss.str()});
         }
     }
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index 35d625ccf84..f79fd3541f4 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -54,6 +54,54 @@ namespace mongo {
 
 namespace {
 
+/**
+ * Synchronize writeRequests
+ */
+
+class WriteRequestSynchronizer;
+const auto getWriteRequestsSynchronizer =
+    ServiceContext::declareDecoration<WriteRequestSynchronizer>();
+
+class WriteRequestSynchronizer {
+public:
+    WriteRequestSynchronizer() = default;
+
+    /**
+     * Returns a tuple <false, existing write request> if it can find the one that happened after
+     * or at clusterTime.
+     * Returns a tuple <true, new write request> otherwise.
+     */
+    std::tuple<bool, std::shared_ptr<Notification<Status>>> getOrCreateWriteRequest(
+        LogicalTime clusterTime) {
+        stdx::unique_lock<stdx::mutex> lock(_mutex);
+        auto lastEl = _writeRequests.rbegin();
+        if (lastEl != _writeRequests.rend() && lastEl->first >= clusterTime.asTimestamp()) {
+            return std::make_tuple(false, lastEl->second);
+        } else {
+            auto newWriteRequest = std::make_shared<Notification<Status>>();
+            _writeRequests[clusterTime.asTimestamp()] = newWriteRequest;
+            return std::make_tuple(true, newWriteRequest);
+        }
+    }
+
+    /**
+     * Erases writeRequest that happened at clusterTime
+     */
+    void deleteWriteRequest(LogicalTime clusterTime) {
+        stdx::unique_lock<stdx::mutex> lock(_mutex);
+        auto el = _writeRequests.find(clusterTime.asTimestamp());
+        invariant(el != _writeRequests.end());
+        invariant(el->second);
+        el->second.reset();
+        _writeRequests.erase(el);
+    }
+
+private:
+    stdx::mutex _mutex;
+    std::map<Timestamp, std::shared_ptr<Notification<Status>>> _writeRequests;
+};
+
+
 /**
  * Schedule a write via appendOplogNote command to the primary of this replica set.
 */
@@ -61,8 +109,7 @@ Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) {
     repl::ReplicationCoordinator* const replCoord = repl::ReplicationCoordinator::get(opCtx);
     invariant(replCoord->isReplEnabled());
 
-    static stdx::mutex mutex;
-    static std::shared_ptr<Notification<Status>> writeRequest;
+    auto& writeRequests = getWriteRequestsSynchronizer(opCtx->getClient()->getServiceContext());
 
     auto lastAppliedOpTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp());
     auto status = Status::OK();
@@ -70,12 +117,10 @@ Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) {
     // this loop addresses the case when two or more threads need to advance the opLog time but the
     // one that waits for the notification gets the later clusterTime, so when the request finishes
     // it needs to be repeated with the later time.
-    while (clusterTime > lastAppliedOpTime && status.isOK()) {
+    while (clusterTime > lastAppliedOpTime) {
         auto shardingState = ShardingState::get(opCtx);
         // standalone replica set, so there is no need to advance the OpLog on the primary.
         if (!shardingState->enabled()) {
-            log() << "Attempting to make a write for clusterTIme: " << clusterTime
-                  << " but is in standalone RS";
             return Status::OK();
         }
 
@@ -93,35 +138,45 @@ Status makeNoopWriteIfNeeded(OperationContext* opCtx, LogicalTime clusterTime) {
             return Status(ErrorCodes::InternalError, ss.str());
         }
 
-        stdx::unique_lock<stdx::mutex> lock(mutex);
-        auto myWriteRequest = writeRequest;
-        if (!myWriteRequest) {
-            myWriteRequest = (writeRequest = std::make_shared<Notification<Status>>());
-            lock.unlock();
-            auto swRes = myShard.getValue()->runCommand(
-                opCtx,
-                ReadPreferenceSetting(ReadPreference::PrimaryOnly),
-                "admin",
-                BSON(
-                    "appendOplogNote" << 1 << "maxClusterTime" << clusterTime.asTimestamp()
-                                      << "data"
-                                      << BSON("noop write for afterClusterTime read concern" << 1)),
-                Shard::RetryPolicy::kIdempotent);
-            myWriteRequest->set(swRes.getStatus());
-            lock.lock();
-            writeRequest = nullptr;
-            lock.unlock();
-            status = swRes.getStatus();
+        auto myWriteRequest = writeRequests.getOrCreateWriteRequest(clusterTime);
+        if (std::get<0>(myWriteRequest)) {  // Its a new request
+            try {
+                LOG(2) << "New appendOplogNote request on clusterTime: " << clusterTime.toString()
+                       << " remaining attempts: " << remainingAttempts;
+                auto swRes = myShard.getValue()->runCommand(
+                    opCtx,
+                    ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+                    "admin",
+                    BSON("appendOplogNote" << 1 << "maxClusterTime" << clusterTime.asTimestamp()
+                                           << "data"
+                                           << BSON("noop write for afterClusterTime read concern"
+                                                   << 1)),
+                    Shard::RetryPolicy::kIdempotent);
+                status = swRes.getStatus();
+                std::get<1>(myWriteRequest)->set(status);
+                writeRequests.deleteWriteRequest(clusterTime);
+            } catch (const DBException& ex) {
+                status = ex.toStatus();
+                // signal the writeRequest to unblock waiters
+                std::get<1>(myWriteRequest)->set(status);
+                writeRequests.deleteWriteRequest(clusterTime);
+            }
         } else {
-            lock.unlock();
+            LOG(2) << "Join appendOplogNote request on clusterTime: " << clusterTime.toString()
+                   << " remaining attempts: " << remainingAttempts;
             try {
-                status = myWriteRequest->get(opCtx);
+                status = std::get<1>(myWriteRequest)->get(opCtx);
             } catch (const DBException& ex) {
                 return ex.toStatus();
             }
         }
+        // If the write status is ok need to wait for the oplog to replicate.
+        if (status.isOK()) {
+            return status;
+        }
         lastAppliedOpTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp());
     }
+    invariant(status.isOK());
     return status;
 }
 }  // namespace
@@ -182,7 +237,8 @@ Status waitForReadConcern(OperationContext* opCtx, const repl::ReadConcernArgs&
     if (replCoord->isReplEnabled() && afterClusterTime) {
         auto status = makeNoopWriteIfNeeded(opCtx, *afterClusterTime);
         if (!status.isOK()) {
-            LOG(1) << "failed noop write due to " << status.toString();
+            LOG(0) << "Failed noop write at clusterTime: " << afterClusterTime->toString()
+                   << " due to " << status.toString();
        }
     }
-- 
cgit v1.2.1
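
The core of this patch is a request-coalescing pattern: the first thread to need a noop write at a given clusterTime creates a shared notification and performs the write; every later thread whose clusterTime is covered by an in-flight request blocks on that same notification instead of issuing a duplicate write, and the creator always signals and erases the entry so waiters cannot hang. The following is a minimal standalone sketch of that pattern, not MongoDB code: it substitutes std::promise/std::shared_future for Notification<Status>, a plain uint64_t for LogicalTime, and a simulated write; every name in it is illustrative.

// Standalone sketch of the coalescing pattern behind WriteRequestSynchronizer.
// Assumptions: std::shared_future<bool> stands in for Notification<Status>,
// uint64_t for LogicalTime; none of these names are MongoDB APIs.
#include <cassert>
#include <cstdint>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class RequestCoalescer {
public:
    using Result = std::shared_future<bool>;  // true == simulated write succeeded

    // First caller past every in-flight time creates an entry and must perform
    // the write; a caller at an equal-or-earlier time joins the existing one.
    std::pair<bool, Result> getOrCreate(uint64_t clusterTime) {
        std::lock_guard<std::mutex> lk(_mutex);
        auto last = _requests.rbegin();
        if (last != _requests.rend() && last->first >= clusterTime) {
            return {false, last->second.second};  // join the in-flight request
        }
        std::promise<bool> p;
        Result f = p.get_future().share();
        _requests[clusterTime] = std::make_pair(std::move(p), f);
        return {true, f};
    }

    // Publishes the outcome, waking every joined waiter, then erases the entry
    // (the analogue of set() followed by deleteWriteRequest() in the patch).
    void complete(uint64_t clusterTime, bool ok) {
        std::lock_guard<std::mutex> lk(_mutex);
        auto it = _requests.find(clusterTime);
        assert(it != _requests.end());
        it->second.first.set_value(ok);
        _requests.erase(it);
    }

private:
    std::mutex _mutex;
    std::map<uint64_t, std::pair<std::promise<bool>, Result>> _requests;
};

int main() {
    RequestCoalescer coalescer;
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back([&coalescer, i] {
            auto req = coalescer.getOrCreate(/*clusterTime=*/42);
            if (req.first) {
                // The creating thread issues the (simulated) expensive write.
                std::cout << "thread " << i << " performs the write\n";
                coalescer.complete(42, true);
            } else {
                // Everyone else blocks on the shared result instead of issuing
                // a duplicate write -- the point of the fix.
                std::cout << "thread " << i << " joined, ok=" << req.second.get() << "\n";
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

Note the design choice the sketch mirrors: the map is keyed by timestamp and joiners attach to any request at an equal-or-later time, so a thread whose clusterTime is already covered never issues its own write. If a thread arrives after the entry is erased, it simply creates a fresh request, which matches the retry loop in makeNoopWriteIfNeeded.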