diff options
| | | |
|---|---|---|
| author | Eric Milkie <milkie@10gen.com> | 2017-01-25 10:10:52 -0500 |
| committer | Benety Goh <benety@mongodb.com> | 2017-02-08 11:45:10 -0500 |
| commit | c29ab24ab04e44fca58e9987645ad45166b14bee (patch) | |
| tree | d3b5c3ef1ba6130b187c9ba4ad702f2b9245a3f3 | |
| parent | 831b20f952945f8d75929b72b2834fef716cdbb1 (diff) | |
| download | mongo-c29ab24ab04e44fca58e9987645ad45166b14bee.tar.gz | |
SERVER-27807 synchronize creating a snapshot with its registration in replcoord

This commit prevents a race between creating a snapshot in the storage engine and
registering it in the replication coordinator. The replication coordinator maintains
a vector of outstanding snapshots, and it needs to stay in sync with the actual snapshots
in the storage engine. The replication coordinator mutex is used to ensure this synchronization.

(cherry picked from commit 8253fab192fad307a07846878e368e970990d7b3)
12 files changed, 73 insertions, 40 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index a9c29c2791d..833da2a2e86 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1243,8 +1243,7 @@ void SnapshotThread::run() { invariant(!opTimeOfSnapshot.isNull()); } - _manager->createSnapshot(txn.get(), name); - replCoord->onSnapshotCreate(opTimeOfSnapshot, name); + replCoord->createSnapshot(txn.get(), opTimeOfSnapshot, name); } catch (const WriteConflictException& wce) { log() << "skipping storage snapshot pass due to write conflict"; continue; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index c33582a0aba..f379ab97b95 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -750,9 +750,12 @@ public: virtual void forceSnapshotCreation() = 0; /** - * Called when a new snapshot is created. + * Creates a new snapshot in the storage engine and registers it for use in the replication + * coordinator. */ - virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) = 0; + virtual void createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name) = 0; /** * Blocks until either the current committed snapshot is at least as high as 'untilSnapshot', diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 4a3445b137e..67fb9f1c509 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -272,6 +272,11 @@ public: virtual void updateCommittedSnapshot(SnapshotName newCommitPoint) = 0; /** + * Creates a new snapshot. + */ + virtual void createSnapshot(OperationContext* txn, SnapshotName name) = 0; + + /** * Signals the SnapshotThread, if running, to take a forced snapshot even if the global * timestamp hasn't changed. 
* diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c439f0d4a80..088bafe2652 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -851,6 +851,13 @@ void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotNa manager->setCommittedSnapshot(newCommitPoint); } +void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* txn, + SnapshotName name) { + auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager(); + invariant(manager); // This should never be called if there is no SnapshotManager. + manager->createSnapshot(txn, name); +} + void ReplicationCoordinatorExternalStateImpl::forceSnapshotCreation() { if (_snapshotThread) _snapshotThread->forceSnapshot(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 6777bddb765..0cb5ecca6e5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -97,6 +97,7 @@ public: virtual void signalApplierToCancelFetcher(); void dropAllSnapshots() final; void updateCommittedSnapshot(SnapshotName newCommitPoint) final; + void createSnapshot(OperationContext* txn, SnapshotName name) final; void forceSnapshotCreation() final; virtual bool snapshotsEnabled() const; virtual void notifyOplogMetadataWaiters(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 88258d9e499..614136431fd 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -241,6 +241,9 @@ void 
ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {} void ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot(SnapshotName newCommitPoint) { } +void ReplicationCoordinatorExternalStateMock::createSnapshot(OperationContext* txn, + SnapshotName name) {} + void ReplicationCoordinatorExternalStateMock::forceSnapshotCreation() {} bool ReplicationCoordinatorExternalStateMock::snapshotsEnabled() const { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 1a4ad071fad..0c75d8df7fc 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -90,6 +90,7 @@ public: virtual void signalApplierToCancelFetcher(); virtual void dropAllSnapshots(); virtual void updateCommittedSnapshot(SnapshotName newCommitPoint); + virtual void createSnapshot(OperationContext* txn, SnapshotName name); virtual void forceSnapshotCreation(); virtual bool snapshotsEnabled() const; virtual void notifyOplogMetadataWaiters(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 7e1b66afb87..6933f0fd7ad 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -3479,9 +3479,11 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { return _uncommittedSnapshotsSize.load(); } -void ReplicationCoordinatorImpl::onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) { +void ReplicationCoordinatorImpl::createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name) { stdx::lock_guard<stdx::mutex> lock(_mutex); - + _externalState->createSnapshot(txn, name); auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name}; if (timeOfSnapshot <= _lastCommittedOpTime) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h 
b/src/mongo/db/repl/replication_coordinator_impl.h index 0478624183f..0b0acef8bb4 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -301,7 +301,9 @@ public: virtual void forceSnapshotCreation() override; - virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override; + virtual void createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name) override; virtual OpTime getCurrentCommittedSnapshotOpTime() const override; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 757a422bf70..7db948156fc 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -1001,7 +1001,7 @@ TEST_F( getReplCoord()->setLastDurableOptime_forTest(2, 1, time1); getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1); getReplCoord()->setLastDurableOptime_forTest(2, 2, time1); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); statusAndDur = getReplCoord()->awaitReplication(txn.get(), time1, majorityWriteConcern); ASSERT_OK(statusAndDur.status); @@ -1041,7 +1041,7 @@ TEST_F( ASSERT_OK(statusAndDur.status); // All modes satisfied - getReplCoord()->onSnapshotCreate(time1, getReplCoord()->reserveSnapshotName(nullptr)); + getReplCoord()->createSnapshot(txn.get(), time1, getReplCoord()->reserveSnapshotName(nullptr)); statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(txn.get(), majorityWriteConcern); @@ -2676,9 +2676,11 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) { time_t majorityWriteDate = 100; OpTime majorityOpTime = OpTime(Timestamp(majorityWriteDate, 1), 1); + auto txn = makeOperationContext(); + getReplCoord()->setMyLastAppliedOpTime(opTime); getReplCoord()->setMyLastDurableOpTime(opTime); - 
getReplCoord()->onSnapshotCreate(majorityOpTime, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), majorityOpTime, SnapshotName(1)); ASSERT_EQUALS(majorityOpTime, getReplCoord()->getCurrentCommittedSnapshotOpTime()); IsMasterResponse response; @@ -3322,10 +3324,11 @@ TEST_F(ReplCoordTest, simulateSuccessfulV1Election(); OpTime time(Timestamp(100, 2), 1); + auto txn = makeOperationContext(); getReplCoord()->setMyLastAppliedOpTime(time); getReplCoord()->setMyLastDurableOpTime(time); - getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time)); @@ -3335,8 +3338,6 @@ TEST_F(ReplCoordTest, writeConcern.wMode = WriteConcernOptions::kMajority; writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE; - auto txn = makeOperationContext(); - ReplicationAwaiter awaiter(getReplCoord(), getServiceContext()); awaiter.setOpTime(time); @@ -3435,7 +3436,7 @@ TEST_F(ReplCoordTest, ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); - getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); ASSERT_OK(getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); } @@ -3707,11 +3708,10 @@ TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) { HostAndPort("node1", 12345)); runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + auto txn = makeOperationContext(); getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 1)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 1)); - getReplCoord()->onSnapshotCreate(OpTime(Timestamp(100, 0), 1), SnapshotName(1)); - - auto txn = makeOperationContext(); + getReplCoord()->createSnapshot(txn.get(), OpTime(Timestamp(100, 0), 1), SnapshotName(1)); ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( 
txn.get(), @@ -3730,12 +3730,12 @@ TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) { << 0))), HostAndPort("node1", 12345)); runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + auto txn = makeOperationContext(); + OpTime time(Timestamp(100, 0), 1); getReplCoord()->setMyLastAppliedOpTime(time); getReplCoord()->setMyLastDurableOpTime(time); - getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); - - auto txn = makeOperationContext(); + getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( txn.get(), ReadConcernArgs(time, ReadConcernLevel::kMajorityReadConcern))); @@ -3760,7 +3760,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) { // Not guaranteed to be scheduled after waitUntil blocks... getReplCoord()->setMyLastAppliedOpTime(committedOpTime); getReplCoord()->setMyLastDurableOpTime(committedOpTime); - getReplCoord()->onSnapshotCreate(committedOpTime, SnapshotName(1)); + getReplCoord()->createSnapshot(nullptr, committedOpTime, SnapshotName(1)); }); auto txn = makeOperationContext(); @@ -3791,7 +3791,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) { // Not guaranteed to be scheduled after waitUntil blocks... 
getReplCoord()->setMyLastAppliedOpTime(opTimeToWait); getReplCoord()->setMyLastDurableOpTime(opTimeToWait); - getReplCoord()->onSnapshotCreate(opTimeToWait, SnapshotName(1)); + getReplCoord()->createSnapshot(nullptr, opTimeToWait, SnapshotName(1)); }); auto txn = makeOperationContext(); @@ -3886,7 +3886,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet ASSERT_EQUALS(1, getReplCoord()->getTerm()); OpTime time(Timestamp(10, 0), 1); - getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); // higher OpTime, should change StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( @@ -4518,10 +4518,11 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); - getReplCoord()->onSnapshotCreate(time2, SnapshotName(2)); - getReplCoord()->onSnapshotCreate(time5, SnapshotName(3)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time2, SnapshotName(2)); + getReplCoord()->createSnapshot(txn.get(), time5, SnapshotName(3)); // ensure current snapshot follows price is right rules (closest but not greater than) getReplCoord()->setMyLastAppliedOpTime(time3); @@ -4551,10 +4552,11 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLat OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); - getReplCoord()->onSnapshotCreate(time2, SnapshotName(2)); - getReplCoord()->onSnapshotCreate(time5, SnapshotName(3)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); + 
getReplCoord()->createSnapshot(txn.get(), time2, SnapshotName(2)); + getReplCoord()->createSnapshot(txn.get(), time5, SnapshotName(3)); // ensure current snapshot will not advance beyond existing snapshots getReplCoord()->setMyLastAppliedOpTime(time6); @@ -4582,17 +4584,18 @@ TEST_F(ReplCoordTest, OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); - getReplCoord()->onSnapshotCreate(time2, SnapshotName(2)); - getReplCoord()->onSnapshotCreate(time5, SnapshotName(3)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time2, SnapshotName(2)); + getReplCoord()->createSnapshot(txn.get(), time5, SnapshotName(3)); getReplCoord()->setMyLastAppliedOpTime(time6); getReplCoord()->setMyLastDurableOpTime(time6); ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime()); // ensure current snapshot updates on new snapshot if we are that far - getReplCoord()->onSnapshotCreate(time6, SnapshotName(4)); + getReplCoord()->createSnapshot(txn.get(), time6, SnapshotName(4)); ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshotOpTime()); } @@ -4615,10 +4618,11 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) { OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); - getReplCoord()->onSnapshotCreate(time2, SnapshotName(2)); - getReplCoord()->onSnapshotCreate(time5, SnapshotName(3)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time2, SnapshotName(2)); + getReplCoord()->createSnapshot(txn.get(), time5, SnapshotName(3)); // ensure dropping all snapshots should reset the current committed snapshot 
getReplCoord()->dropAllSnapshots(); @@ -4640,8 +4644,9 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) { OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); + auto txn = makeOperationContext(); - getReplCoord()->onSnapshotCreate(time1, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), time1, SnapshotName(1)); getReplCoord()->setMyLastAppliedOpTime(time1); ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime()); @@ -4800,11 +4805,12 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { OpTime optime(Timestamp(100, 2), 0); getReplCoord()->setMyLastAppliedOpTime(optime); getReplCoord()->setMyLastDurableOpTime(optime); + auto txn = makeOperationContext(); // Set last committed optime via metadata. rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); getReplCoord()->processReplSetMetadata(syncSourceMetadata, true); - getReplCoord()->onSnapshotCreate(optime, SnapshotName(1)); + getReplCoord()->createSnapshot(txn.get(), optime, SnapshotName(1)); BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle)); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index ee2c819bc4d..9f80859a4b4 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -429,7 +429,9 @@ SnapshotName ReplicationCoordinatorMock::reserveSnapshotName(OperationContext* t void ReplicationCoordinatorMock::forceSnapshotCreation() {} -void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) {} +void ReplicationCoordinatorMock::createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name){}; void ReplicationCoordinatorMock::dropAllSnapshots() {} diff --git 
a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 67d6a555274..99a6a51fc81 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -241,7 +241,9 @@ public: virtual void forceSnapshotCreation() override; - virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name); + virtual void createSnapshot(OperationContext* txn, + OpTime timeOfSnapshot, + SnapshotName name) override; virtual void dropAllSnapshots() override; |