diff options
author | Eric Milkie <milkie@10gen.com> | 2017-01-25 10:10:52 -0500 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2017-01-25 15:07:26 -0500 |
commit | 8253fab192fad307a07846878e368e970990d7b3 (patch) | |
tree | 6862b80ed947243e199ea280b994d855d6c0f56c | |
parent | 3cef6afea83b252613be458a0e0bf94ecea28f96 (diff) | |
download | mongo-8253fab192fad307a07846878e368e970990d7b3.tar.gz |
SERVER-27807 synchronize creating a snapshot with its registration in replcoord
This commit prevents a race between creating a snapshot in the storage engine and
registering it in the replication coordinator. The replication coordinator maintains
a vector of outstanding snapshots, and it needs to stay in sync with the actual snapshots
in the storage engine. The replication coordinator mutex is used to ensure this synchronization.
12 files changed, 73 insertions, 40 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 3e75d04e94f..9c8f03d7c2f 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1257,8 +1257,7 @@ void SnapshotThread::run() { invariant(!opTimeOfSnapshot.isNull()); } - _manager->createSnapshot(txn.get(), name); - replCoord->onSnapshotCreate(opTimeOfSnapshot, name); + replCoord->createSnapshot(txn.get(), opTimeOfSnapshot, name); } catch (const WriteConflictException& wce) { log() << "skipping storage snapshot pass due to write conflict"; continue; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 79deadb3bf9..b60b63ac880 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -752,9 +752,12 @@ public: virtual void forceSnapshotCreation() = 0; /** - * Called when a new snapshot is created. + * Creates a new snapshot in the storage engine and registers it for use in the replication + * coordinator. */ - virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) = 0; + virtual void createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name) = 0; /** * Blocks until either the current committed snapshot is at least as high as 'untilSnapshot', diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 4a3445b137e..67fb9f1c509 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -272,6 +272,11 @@ public: virtual void updateCommittedSnapshot(SnapshotName newCommitPoint) = 0; /** + * Creates a new snapshot. + */ + virtual void createSnapshot(OperationContext* txn, SnapshotName name) = 0; + + /** * Signals the SnapshotThread, if running, to take a forced snapshot even if the global * timestamp hasn't changed. * diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c1267979332..9a0dea2381a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -841,6 +841,13 @@ void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotNa manager->setCommittedSnapshot(newCommitPoint); } +void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* txn, + SnapshotName name) { + auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager(); + invariant(manager); // This should never be called if there is no SnapshotManager. + manager->createSnapshot(txn, name); +} + void ReplicationCoordinatorExternalStateImpl::forceSnapshotCreation() { if (_snapshotThread) _snapshotThread->forceSnapshot(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 6777bddb765..0cb5ecca6e5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -97,6 +97,7 @@ public: virtual void signalApplierToCancelFetcher(); void dropAllSnapshots() final; void updateCommittedSnapshot(SnapshotName newCommitPoint) final; + void createSnapshot(OperationContext* txn, SnapshotName name) final; void forceSnapshotCreation() final; virtual bool snapshotsEnabled() const; virtual void notifyOplogMetadataWaiters(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 88258d9e499..614136431fd 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -241,6 +241,9 @@ void ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {} void ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot(SnapshotName newCommitPoint) { } +void ReplicationCoordinatorExternalStateMock::createSnapshot(OperationContext* txn, + SnapshotName name) {} + void ReplicationCoordinatorExternalStateMock::forceSnapshotCreation() {} bool ReplicationCoordinatorExternalStateMock::snapshotsEnabled() const { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 1a4ad071fad..0c75d8df7fc 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -90,6 +90,7 @@ public: virtual void signalApplierToCancelFetcher(); virtual void dropAllSnapshots(); virtual void updateCommittedSnapshot(SnapshotName newCommitPoint); + virtual void createSnapshot(OperationContext* txn, SnapshotName name); virtual void forceSnapshotCreation(); virtual bool snapshotsEnabled() const; virtual void notifyOplogMetadataWaiters(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index c275ad246e8..1cc1c949227 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -3481,9 +3481,11 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { return _uncommittedSnapshotsSize.load(); } -void ReplicationCoordinatorImpl::onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) { +void ReplicationCoordinatorImpl::createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name) { stdx::lock_guard<stdx::mutex> lock(_mutex); - + _externalState->createSnapshot(txn, name); auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name}; if (timeOfSnapshot <= _lastCommittedOpTime) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index e31f218bbd0..7bd6fd872b2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -295,7 +295,9 @@ public: virtual void forceSnapshotCreation() override; - virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override; + virtual void createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name) override; virtual OpTime getCurrentCommittedSnapshotOpTime() const override; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index ceaae23fbea..66b1a56fc1a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -1002,7 +1002,7 @@ TEST_F( getReplCoord()->setLastDurableOptime_forTest(2, 1, time1); getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1); getReplCoord()->setLastDurableOptime_forTest(2, 2, time1); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); statusAndDur = getReplCoord()->awaitReplication(txn.get(), time1, majorityWriteConcern); ASSERT_OK(statusAndDur.status); @@ -1042,7 +1042,7 @@ TEST_F( ASSERT_OK(statusAndDur.status); // All modes satisfied - getReplCoord()->onSnapshotCreate(time1, getReplCoord()->reserveSnapshotName(nullptr)); + getReplCoord()->createSnapshot(txn.get(), time1, getReplCoord()->reserveSnapshotName(nullptr)); statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(txn.get(), majorityWriteConcern); @@ -2677,9 +2677,11 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) { time_t majorityWriteDate = 100; OpTime majorityOpTime = OpTime(Timestamp(majorityWriteDate, 1), 1); + auto txn = makeOperationContext(); + getReplCoord()->setMyLastAppliedOpTime(opTime); getReplCoord()->setMyLastDurableOpTime(opTime); - getReplCoord()->onSnapshotCreate(majorityOpTime, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), majorityOpTime, SnapshotName(1)); ASSERT_EQUALS(majorityOpTime, getReplCoord()->getCurrentCommittedSnapshotOpTime()); IsMasterResponse response; @@ -3323,10 +3325,11 @@ TEST_F(ReplCoordTest, simulateSuccessfulV1Election(); OpTime time(Timestamp(100, 2), 1); + auto txn = makeOperationContext(); getReplCoord()->setMyLastAppliedOpTime(time); getReplCoord()->setMyLastDurableOpTime(time); - getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time)); @@ -3336,8 +3339,6 @@ TEST_F(ReplCoordTest, writeConcern.wMode = WriteConcernOptions::kMajority; writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE; - auto txn = makeOperationContext(); - ReplicationAwaiter awaiter(getReplCoord(), getServiceContext()); awaiter.setOpTime(time); @@ -3436,7 +3437,7 @@ TEST_F(ReplCoordTest, ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); - getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); ASSERT_OK(getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); } @@ -3708,11 +3709,10 @@ TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) { HostAndPort("node1", 12345)); runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + auto txn = makeOperationContext(); getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 1)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 1)); - getReplCoord()->onSnapshotCreate(OpTime(Timestamp(100, 0), 1), SnapshotName(1)); - - auto txn = makeOperationContext(); + getReplCoord()->createSnapshot(txn.get(), OpTime(Timestamp(100, 0), 1), SnapshotName(1)); ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( txn.get(), @@ -3731,12 +3731,12 @@ TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) { << 0))), HostAndPort("node1", 12345)); runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + auto txn = makeOperationContext(); + OpTime time(Timestamp(100, 0), 1); getReplCoord()->setMyLastAppliedOpTime(time); getReplCoord()->setMyLastDurableOpTime(time); - getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); - - auto txn = makeOperationContext(); + getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( txn.get(), ReadConcernArgs(time, ReadConcernLevel::kMajorityReadConcern))); @@ -3761,7 +3761,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) { // Not guaranteed to be scheduled after waitUntil blocks... getReplCoord()->setMyLastAppliedOpTime(committedOpTime); getReplCoord()->setMyLastDurableOpTime(committedOpTime); - getReplCoord()->onSnapshotCreate(committedOpTime, SnapshotName(1)); + getReplCoord()->createSnapshot(nullptr, committedOpTime, SnapshotName(1)); }); auto txn = makeOperationContext(); @@ -3792,7 +3792,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) { // Not guaranteed to be scheduled after waitUntil blocks... getReplCoord()->setMyLastAppliedOpTime(opTimeToWait); getReplCoord()->setMyLastDurableOpTime(opTimeToWait); - getReplCoord()->onSnapshotCreate(opTimeToWait, SnapshotName(1)); + getReplCoord()->createSnapshot(nullptr, opTimeToWait, SnapshotName(1)); }); auto txn = makeOperationContext(); @@ -3887,7 +3887,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet ASSERT_EQUALS(1, getReplCoord()->getTerm()); OpTime time(Timestamp(10, 0), 1); - getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); // higher OpTime, should change StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( @@ -4593,10 +4593,11 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); - getReplCoord()->onSnapshotCreate(time2, SnapshotName(2)); - getReplCoord()->onSnapshotCreate(time5, SnapshotName(3)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time2, SnapshotName(2)); + getReplCoord()->createSnapshot(txn.get(), time5, SnapshotName(3)); // ensure current snapshot follows price is right rules (closest but not greater than) getReplCoord()->setMyLastAppliedOpTime(time3); @@ -4626,10 +4627,11 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLat OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); - getReplCoord()->onSnapshotCreate(time2, SnapshotName(2)); - getReplCoord()->onSnapshotCreate(time5, SnapshotName(3)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time2, SnapshotName(2)); + getReplCoord()->createSnapshot(txn.get(), time5, SnapshotName(3)); // ensure current snapshot will not advance beyond existing snapshots getReplCoord()->setMyLastAppliedOpTime(time6); @@ -4657,17 +4659,18 @@ TEST_F(ReplCoordTest, OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); - getReplCoord()->onSnapshotCreate(time2, SnapshotName(2)); - getReplCoord()->onSnapshotCreate(time5, SnapshotName(3)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time2, SnapshotName(2)); + getReplCoord()->createSnapshot(txn.get(), time5, SnapshotName(3)); getReplCoord()->setMyLastAppliedOpTime(time6); getReplCoord()->setMyLastDurableOpTime(time6); ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime()); // ensure current snapshot updates on new snapshot if we are that far - getReplCoord()->onSnapshotCreate(time6, SnapshotName(4)); + getReplCoord()->createSnapshot(txn.get(), time6, SnapshotName(4)); ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshotOpTime()); } @@ -4690,10 +4693,11 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) { OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); - getReplCoord()->onSnapshotCreate(time2, SnapshotName(2)); - getReplCoord()->onSnapshotCreate(time5, SnapshotName(3)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time2, SnapshotName(2)); + getReplCoord()->createSnapshot(txn.get(), time5, SnapshotName(3)); // ensure dropping all snapshots should reset the current committed snapshot getReplCoord()->dropAllSnapshots(); @@ -4715,8 +4719,9 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) { OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); getReplCoord()->setMyLastAppliedOpTime(time1); ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime()); @@ -4875,11 +4880,12 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { OpTime optime(Timestamp(100, 2), 0); getReplCoord()->setMyLastAppliedOpTime(optime); getReplCoord()->setMyLastDurableOpTime(optime); + auto txn = makeOperationContext(); // Set last committed optime via metadata. rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); getReplCoord()->processReplSetMetadata(syncSourceMetadata, true); - getReplCoord()->onSnapshotCreate(optime, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), optime, SnapshotName(1)); BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle)); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 9663600063c..9fddd74ddab 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -430,7 +430,9 @@ SnapshotName ReplicationCoordinatorMock::reserveSnapshotName(OperationContext* t void ReplicationCoordinatorMock::forceSnapshotCreation() {} -void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) {} +void ReplicationCoordinatorMock::createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name){}; void ReplicationCoordinatorMock::dropAllSnapshots() {} diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index d2b00cc206e..6aacf7929d9 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -242,7 +242,9 @@ public: virtual void forceSnapshotCreation() override; - virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name); + virtual void createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name) override; virtual void dropAllSnapshots() override; |