diff options
Diffstat (limited to 'src/mongo/db')
33 files changed, 111 insertions, 77 deletions
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 194684bdc2b..87127a06dd7 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -759,7 +759,7 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) { b.appendElements(o); BSONObj bo = b.obj(); - StatusWith<BSONObj> res = fixDocumentForInsert(bo); + StatusWith<BSONObj> res = fixDocumentForInsert(_txn->getServiceContext(), bo); uassertStatusOK(res.getStatus()); if (!res.getValue().isEmpty()) { bo = res.getValue(); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 3dc0a9ec791..89b4360de91 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -893,6 +893,7 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, topoCoordOptions.clusterRole = serverGlobalParams.clusterRole; auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorImpl>( + serviceContext, getGlobalReplSettings(), stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(storageInterface), executor::makeNetworkInterface("NetworkInterfaceASIO-Replication"), diff --git a/src/mongo/db/global_timestamp.cpp b/src/mongo/db/global_timestamp.cpp index 95f26618779..0dbb6c2cf6b 100644 --- a/src/mongo/db/global_timestamp.cpp +++ b/src/mongo/db/global_timestamp.cpp @@ -43,7 +43,8 @@ namespace { AtomicUInt64 globalTimestamp(0); } // namespace -void setGlobalTimestamp(const Timestamp& newTime) { +void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) { + // TODO: SERVER-27746 replace with LogicalTime globalTimestamp.store(newTime.asULL() + 1); } @@ -51,7 +52,8 @@ Timestamp getLastSetTimestamp() { return Timestamp(globalTimestamp.load() - 1); } -Timestamp getNextGlobalTimestamp(unsigned count) { +Timestamp getNextGlobalTimestamp(ServiceContext* service, unsigned count) { + // TODO: SERVER-27746 replace with LogicalTime const unsigned now = durationCount<Seconds>( getGlobalServiceContext()->getFastClockSource()->now().toDurationSinceEpoch()); invariant(now != 0); // This is a sentinel value for null Timestamps. diff --git a/src/mongo/db/global_timestamp.h b/src/mongo/db/global_timestamp.h index 9c106147b6a..408d38a6aae 100644 --- a/src/mongo/db/global_timestamp.h +++ b/src/mongo/db/global_timestamp.h @@ -31,7 +31,9 @@ #include "mongo/bson/timestamp.h" namespace mongo { -void setGlobalTimestamp(const Timestamp& newTime); +class ServiceContext; + +void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); /** * Returns the value of the global Timestamp generated last time or set. @@ -42,5 +44,5 @@ Timestamp getLastSetTimestamp(); * Generates a new and unique Timestamp. * If count > 1 that many unique Timestamps are reserved starting with the returned value. */ -Timestamp getNextGlobalTimestamp(unsigned count = 1); +Timestamp getNextGlobalTimestamp(ServiceContext* service, unsigned count = 1); } diff --git a/src/mongo/db/logical_clock.cpp b/src/mongo/db/logical_clock.cpp index c712941dee5..ad61792045b 100644 --- a/src/mongo/db/logical_clock.cpp +++ b/src/mongo/db/logical_clock.cpp @@ -53,12 +53,10 @@ void LogicalClock::set(ServiceContext* service, std::unique_ptr<LogicalClock> cl clock = std::move(clockArg); } -LogicalClock::LogicalClock(ServiceContext* serviceContext, +LogicalClock::LogicalClock(ServiceContext* service, std::unique_ptr<TimeProofService> tps, bool validateProof) - : _serviceContext(serviceContext), - _timeProofService(std::move(tps)), - _validateProof(validateProof) {} + : _service(service), _timeProofService(std::move(tps)), _validateProof(validateProof) {} SignedLogicalTime LogicalClock::getClusterTime() { stdx::lock_guard<stdx::mutex> lock(_mutex); @@ -94,7 +92,7 @@ LogicalTime LogicalClock::reserveTicks(uint64_t ticks) { stdx::lock_guard<stdx::mutex> lock(_mutex); const unsigned wallClockSecs = - durationCount<Seconds>(_serviceContext->getFastClockSource()->now().toDurationSinceEpoch()); + durationCount<Seconds>(_service->getFastClockSource()->now().toDurationSinceEpoch()); unsigned currentSecs = _clusterTime.getTime().asTimestamp().getSecs(); LogicalTime clusterTimestamp = _clusterTime.getTime(); diff --git a/src/mongo/db/logical_clock.h b/src/mongo/db/logical_clock.h index 3119b75f225..57d8dc4b80b 100644 --- a/src/mongo/db/logical_clock.h +++ b/src/mongo/db/logical_clock.h @@ -88,7 +88,7 @@ private: */ SignedLogicalTime _makeSignedLogicalTime(LogicalTime); - ServiceContext* _serviceContext; + ServiceContext* const _service; std::unique_ptr<TimeProofService> _timeProofService; // the mutex protects _clusterTime diff --git a/src/mongo/db/ops/insert.cpp b/src/mongo/db/ops/insert.cpp index cfc9b10f35f..3cddceac1d5 100644 --- a/src/mongo/db/ops/insert.cpp +++ b/src/mongo/db/ops/insert.cpp @@ -39,7 +39,7 @@ using std::string; using namespace mongoutils; -StatusWith<BSONObj> fixDocumentForInsert(const BSONObj& doc) { +StatusWith<BSONObj> fixDocumentForInsert(ServiceContext* service, const BSONObj& doc) { if (doc.objsize() > BSONObjMaxUserSize) return StatusWith<BSONObj>(ErrorCodes::BadValue, str::stream() << "object to insert too large" @@ -124,7 +124,7 @@ StatusWith<BSONObj> fixDocumentForInsert(const BSONObj& doc) { if (hadId && e.fieldNameStringData() == "_id") { // no-op } else if (e.type() == bsonTimestamp && e.timestampValue() == 0) { - b.append(e.fieldName(), getNextGlobalTimestamp()); + b.append(e.fieldName(), getNextGlobalTimestamp(service)); } else { b.append(e); } diff --git a/src/mongo/db/ops/insert.h b/src/mongo/db/ops/insert.h index e0204c290c7..52233313571 100644 --- a/src/mongo/db/ops/insert.h +++ b/src/mongo/db/ops/insert.h @@ -33,11 +33,13 @@ namespace mongo { +class ServiceContext; + /** * if doc is ok, then return is BSONObj() * otherwise, BSONObj is what should be inserted instead */ -StatusWith<BSONObj> fixDocumentForInsert(const BSONObj& doc); +StatusWith<BSONObj> fixDocumentForInsert(ServiceContext* service, const BSONObj& doc); /** diff --git a/src/mongo/db/ops/modifier_current_date.cpp b/src/mongo/db/ops/modifier_current_date.cpp index cd328f5fe94..f693292781d 100644 --- a/src/mongo/db/ops/modifier_current_date.cpp +++ b/src/mongo/db/ops/modifier_current_date.cpp @@ -34,6 +34,7 @@ #include "mongo/db/ops/field_checker.h" #include "mongo/db/ops/log_builder.h" #include "mongo/db/ops/path_support.h" +#include "mongo/db/service_context.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -224,7 +225,8 @@ Status ModifierCurrentDate::apply() const { if (!s.isOK()) return s; } else { - Status s = elemToSet.setValueTimestamp(getNextGlobalTimestamp()); + ServiceContext* service = getGlobalServiceContext(); + Status s = elemToSet.setValueTimestamp(getNextGlobalTimestamp(service)); if (!s.isOK()) return s; } diff --git a/src/mongo/db/ops/modifier_object_replace.cpp b/src/mongo/db/ops/modifier_object_replace.cpp index b33b1d76615..4dd1565dbdb 100644 --- a/src/mongo/db/ops/modifier_object_replace.cpp +++ b/src/mongo/db/ops/modifier_object_replace.cpp @@ -33,6 +33,7 @@ #include "mongo/bson/mutable/document.h" #include "mongo/db/global_timestamp.h" #include "mongo/db/ops/log_builder.h" +#include "mongo/db/service_context.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -55,7 +56,8 @@ Status fixupTimestamps(const BSONObj& obj) { unsigned long long timestamp = timestampView.read<unsigned long long>(); if (timestamp == 0) { // performance note, this locks a mutex: - Timestamp ts(getNextGlobalTimestamp()); + ServiceContext* service = getGlobalServiceContext(); + Timestamp ts(getNextGlobalTimestamp(service)); timestampView.write(tagLittleEndian(ts.asULL())); } } diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index a2a7e74167b..98f32c928ac 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -445,7 +445,7 @@ WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) { for (auto&& doc : wholeOp.documents) { const bool isLastDoc = (&doc == &wholeOp.documents.back()); - auto fixedDoc = fixDocumentForInsert(doc); + auto fixedDoc = fixDocumentForInsert(txn->getServiceContext(), doc); if (!fixedDoc.isOK()) { // Handled after we insert anything in the batch to be sure we report errors in the // correct order. In an ordered insert, if one of the docs ahead of us fails, we should diff --git a/src/mongo/db/range_deleter_test.cpp b/src/mongo/db/range_deleter_test.cpp index 4e73f16d65b..501994e3495 100644 --- a/src/mongo/db/range_deleter_test.cpp +++ b/src/mongo/db/range_deleter_test.cpp @@ -79,7 +79,8 @@ private: setGlobalServiceContext(stdx::make_unique<ServiceContextNoop>()); mongo::repl::ReplicationCoordinator::set( getServiceContext(), - stdx::make_unique<mongo::repl::ReplicationCoordinatorMock>(replSettings)); + stdx::make_unique<mongo::repl::ReplicationCoordinatorMock>(getServiceContext(), + replSettings)); _client = getServiceContext()->makeClient("RangeDeleterTest"); _txn = _client->makeOperationContext(); deleter.startWorkers(); diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index 7c138a6f96c..3c010185e64 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -111,7 +111,7 @@ void InitialSync::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTim const OpTime lastOpTime = multiApply(txn, ops.releaseBatch()); replCoord->setMyLastAppliedOpTime(lastOpTime); - setNewTimestamp(lastOpTime.getTimestamp()); + setNewTimestamp(txn->getServiceContext(), lastOpTime.getTimestamp()); if (globalInShutdownDeprecated()) { return; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 9c8f03d7c2f..0e9b8c0366c 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -161,7 +161,7 @@ void getNextOpTime(OperationContext* txn, } stdx::lock_guard<stdx::mutex> lk(newOpMutex); - Timestamp ts = getNextGlobalTimestamp(count); + Timestamp ts = getNextGlobalTimestamp(txn->getServiceContext(), count); newTimestampNotifier.notify_all(); fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts)); @@ -1107,9 +1107,9 @@ Status applyCommand_inlock(OperationContext* txn, return Status::OK(); } -void setNewTimestamp(const Timestamp& newTime) { +void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { stdx::lock_guard<stdx::mutex> lk(newOpMutex); - setGlobalTimestamp(newTime); + setGlobalTimestamp(service, newTime); newTimestampNotifier.notify_all(); } @@ -1120,7 +1120,7 @@ void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) { if (!lastOp.isEmpty()) { LOG(1) << "replSet setting last Timestamp"; const OpTime opTime = fassertStatusOK(28696, OpTime::parseFromOplogEntry(lastOp)); - setNewTimestamp(opTime.getTimestamp()); + setNewTimestamp(txn->getServiceContext(), opTime.getTimestamp()); } } @@ -1172,8 +1172,8 @@ bool SnapshotThread::shouldSleepMore(int numSleepsDone, size_t numUncommittedSna void SnapshotThread::run() { Client::initThread("SnapshotThread"); auto& client = cc(); - auto serviceContext = client.getServiceContext(); - auto replCoord = ReplicationCoordinator::get(serviceContext); + auto service = client.getServiceContext(); + auto replCoord = ReplicationCoordinator::get(service); Timestamp lastTimestamp = {}; while (true) { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index b2078e93d28..dc61805e789 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -133,7 +133,7 @@ void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS); /** * Sets the global Timestamp to be 'newTime'. */ -void setNewTimestamp(const Timestamp& newTime); +void setNewTimestamp(ServiceContext* txn, const Timestamp& newTime); /** * Detects the current replication mode and sets the "_oplogCollectionName" accordingly. diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index e3f335603d9..ced9ab1f495 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -69,18 +69,18 @@ private: void OplogBufferCollectionTest::setUp() { ServiceContextMongoDTest::setUp(); - auto serviceContext = getServiceContext(); + auto service = getServiceContext(); // AutoGetCollectionForRead requires a valid replication coordinator in order to check the shard // version. ReplSettings replSettings; replSettings.setOplogSizeBytes(5 * 1024 * 1024); - ReplicationCoordinator::set(serviceContext, - stdx::make_unique<ReplicationCoordinatorMock>(replSettings)); + ReplicationCoordinator::set( + service, stdx::make_unique<ReplicationCoordinatorMock>(service, replSettings)); auto storageInterface = stdx::make_unique<StorageInterfaceImpl>(); _storageInterface = storageInterface.get(); - StorageInterface::set(serviceContext, std::move(storageInterface)); + StorageInterface::set(service, std::move(storageInterface)); _txn = makeOperationContext(); } diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index b60b63ac880..a9d55d42259 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -800,6 +800,8 @@ public: virtual Status stepUpIfEligible() = 0; + virtual ServiceContext* getServiceContext() = 0; + protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 67fb9f1c509..c995f30a468 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -183,7 +183,7 @@ public: /** * Returns true if "host" is one of the network identities of this node. */ - virtual bool isSelf(const HostAndPort& host, ServiceContext* ctx) = 0; + virtual bool isSelf(const HostAndPort& host, ServiceContext* service) = 0; /** * Gets the replica set config document from local storage, or returns an error. @@ -208,7 +208,7 @@ public: /** * Sets the global opTime to be 'newTime'. */ - virtual void setGlobalTimestamp(const Timestamp& newTime) = 0; + virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) = 0; /** * Gets the last optime of an operation performed on this host, from stable diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 9a0dea2381a..660bd9407d2 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -573,8 +573,9 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( } } -void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) { - setNewTimestamp(newTime); +void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext* ctx, + const Timestamp& newTime) { + setNewTimestamp(ctx, newTime); } void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 0cb5ecca6e5..2bee57d3419 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -81,12 +81,12 @@ public: OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) override; virtual void forwardSlaveProgress(); virtual OID ensureMe(OperationContext* txn); - virtual bool isSelf(const HostAndPort& host, ServiceContext* ctx); + virtual bool isSelf(const HostAndPort& host, ServiceContext* service); virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn); virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config); virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* txn); virtual Status storeLocalLastVoteDocument(OperationContext* txn, const LastVote& lastVote); - virtual void setGlobalTimestamp(const Timestamp& newTime); + virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn); virtual void cleanUpLastApplyBatch(OperationContext* txn); virtual HostAndPort getClientHostAndPort(const OperationContext* txn); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 614136431fd..5f305c067d6 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -110,7 +110,7 @@ OID ReplicationCoordinatorExternalStateMock::ensureMe(OperationContext*) { } bool ReplicationCoordinatorExternalStateMock::isSelf(const HostAndPort& host, - ServiceContext* const ctx) { + ServiceContext* const service) { return sequenceContains(_selfHosts, host); } @@ -178,7 +178,8 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument( _localRsLastVoteDocument = localLastVoteDocument; } -void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(const Timestamp& newTime) {} +void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service, + const Timestamp& newTime) {} void ReplicationCoordinatorExternalStateMock::cleanUpLastApplyBatch(OperationContext* txn) {} diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 0c75d8df7fc..d35c269a332 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -74,13 +74,13 @@ public: OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) override; virtual void forwardSlaveProgress(); virtual OID ensureMe(OperationContext*); - virtual bool isSelf(const HostAndPort& host, ServiceContext* ctx); + virtual bool isSelf(const HostAndPort& host, ServiceContext* service); virtual HostAndPort getClientHostAndPort(const OperationContext* txn); virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn); virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config); virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* txn); virtual Status storeLocalLastVoteDocument(OperationContext* txn, const LastVote& lastVote); - virtual void setGlobalTimestamp(const Timestamp& newTime); + virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn); virtual void cleanUpLastApplyBatch(OperationContext* txn); virtual void closeConnections(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 38c403853ce..0716601d6e7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -288,7 +288,7 @@ DataReplicatorOptions createDataReplicatorOptions( options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); }; options.setMyLastOptime = [replCoord, externalState](const OpTime& opTime) { replCoord->setMyLastAppliedOpTime(opTime); - externalState->setGlobalTimestamp(opTime.getTimestamp()); + externalState->setGlobalTimestamp(replCoord->getServiceContext(), opTime.getTimestamp()); }; options.getSlaveDelay = [replCoord]() { return replCoord->getSlaveDelaySecs(); }; options.syncSourceSelector = replCoord; @@ -306,13 +306,15 @@ std::string ReplicationCoordinatorImpl::SnapshotInfo::toString() const { } ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( + ServiceContext* service, const ReplSettings& settings, std::unique_ptr<ReplicationCoordinatorExternalState> externalState, std::unique_ptr<NetworkInterface> network, std::unique_ptr<TopologyCoordinator> topCoord, StorageInterface* storage, int64_t prngSeed) - : _settings(settings), + : _service(service), + _settings(settings), _replMode(getReplicationModeFromSettings(settings)), _topCoord(std::move(topCoord)), _replExecutor(std::move(network), prngSeed), @@ -326,6 +328,9 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.isSlave())), _canServeNonLocalReads(0U), _storage(storage) { + + invariant(_service); + if (!isReplEnabled()) { return; } @@ -466,8 +471,8 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( LockGuard topoLock(_topoMutex); - StatusWith<int> myIndex = validateConfigForStartUp( - _externalState.get(), _rsConfig, localConfig, getGlobalServiceContext()); + StatusWith<int> myIndex = + validateConfigForStartUp(_externalState.get(), _rsConfig, localConfig, getServiceContext()); if (!myIndex.isOK()) { if (myIndex.getStatus() == ErrorCodes::NodeNotFound || myIndex.getStatus() == ErrorCodes::DuplicateKey) { @@ -530,7 +535,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( lock.unlock(); } - _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); + _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp()); // Step down is impossible, so we don't need to wait for the returned event. _updateTerm_incallback(term); LOG(1) << "Current term is now " << term; @@ -2676,7 +2681,7 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( } else { _electionId = OID::gen(); } - _topCoord->processWinElection(_electionId, getNextGlobalTimestamp()); + _topCoord->processWinElection(_electionId, getNextGlobalTimestamp(getServiceContext())); invariant(!_isCatchingUp); invariant(!_isWaitingForDrainToComplete); _isCatchingUp = true; @@ -2871,13 +2876,13 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplicaSetConfig& n // Downgrade invariant(newConfig.getProtocolVersion() == 0); _electionId = OID::gen(); - _topCoord->setElectionInfo(_electionId, getNextGlobalTimestamp()); + _topCoord->setElectionInfo(_electionId, getNextGlobalTimestamp(getServiceContext())); } else if (oldConfig.getProtocolVersion() < newConfig.getProtocolVersion()) { // Upgrade invariant(newConfig.getProtocolVersion() == 1); invariant(_topCoord->getTerm() != OpTime::kUninitializedTerm); _electionId = OID::fromTerm(_topCoord->getTerm()); - _topCoord->setElectionInfo(_electionId, getNextGlobalTimestamp()); + _topCoord->setElectionInfo(_electionId, getNextGlobalTimestamp(getServiceContext())); } } @@ -3125,7 +3130,7 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn _reportUpstream_inlock(std::move(lock)); // Unlocked below. - _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); + _externalState->setGlobalTimestamp(txn->getServiceContext(), lastOpTime.getTimestamp()); } bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 7bd6fd872b2..c481b30cdc6 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -89,7 +89,8 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator { MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl); public: - ReplicationCoordinatorImpl(const ReplSettings& settings, + ReplicationCoordinatorImpl(ServiceContext* serviceContext, + const ReplSettings& settings, std::unique_ptr<ReplicationCoordinatorExternalState> externalState, std::unique_ptr<executor::NetworkInterface> network, std::unique_ptr<TopologyCoordinator> topoCoord, @@ -289,6 +290,11 @@ public: */ virtual long long getTerm() override; + // Returns the ServiceContext where this instance runs. + virtual ServiceContext* getServiceContext() override { + return _service; + } + virtual Status updateTerm(OperationContext* txn, long long term) override; virtual SnapshotName reserveSnapshotName(OperationContext* txn) override; @@ -1172,6 +1178,9 @@ private: // a node discovers that it is a member of a config. unordered_set<HostAndPort> _seedList; // (X) + // Back pointer to the ServiceContext that has started the instance. + ServiceContext* const _service; // (S) + // Parsed command line arguments related to replication. const ReplSettings _settings; // (R) diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 9fddd74ddab..2a4432c2590 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -44,8 +44,9 @@ namespace repl { using std::vector; -ReplicationCoordinatorMock::ReplicationCoordinatorMock(const ReplSettings& settings) - : _settings(settings) {} +ReplicationCoordinatorMock::ReplicationCoordinatorMock(ServiceContext* service, + const ReplSettings& settings) + : _service(service), _settings(settings) {} ReplicationCoordinatorMock::~ReplicationCoordinatorMock() {} diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 6aacf7929d9..9825883ecbf 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -50,7 +50,7 @@ class ReplicationCoordinatorMock : public ReplicationCoordinator { MONGO_DISALLOW_COPYING(ReplicationCoordinatorMock); public: - ReplicationCoordinatorMock(const ReplSettings& settings); + ReplicationCoordinatorMock(ServiceContext* service, const ReplSettings& settings); virtual ~ReplicationCoordinatorMock(); virtual void startup(OperationContext* txn); @@ -273,8 +273,13 @@ public: */ void alwaysAllowWrites(bool allowWrites); + virtual ServiceContext* getServiceContext() override { + return _service; + } + private: AtomicUInt64 _snapshotNameGenerator; + ServiceContext* const _service; const ReplSettings _settings; MemberState _memberState; OpTime _myLastDurableOpTime; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index b77836029b4..61fd52855de 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -113,10 +113,10 @@ void ReplCoordTest::init() { invariant(!_repl); invariant(!_callShutdown); - auto serviceContext = getGlobalServiceContext(); + auto service = getGlobalServiceContext(); StorageInterface* storageInterface = new StorageInterfaceMock(); - StorageInterface::set(serviceContext, std::unique_ptr<StorageInterface>(storageInterface)); - ASSERT_TRUE(storageInterface == StorageInterface::get(serviceContext)); + StorageInterface::set(service, std::unique_ptr<StorageInterface>(storageInterface)); + ASSERT_TRUE(storageInterface == StorageInterface::get(service)); // PRNG seed for tests. const int64_t seed = 0; @@ -127,13 +127,13 @@ void ReplCoordTest::init() { _net = net.get(); auto externalState = stdx::make_unique<ReplicationCoordinatorExternalStateMock>(); _externalState = externalState.get(); - _repl = stdx::make_unique<ReplicationCoordinatorImpl>(_settings, + _repl = stdx::make_unique<ReplicationCoordinatorImpl>(service, + _settings, std::move(externalState), std::move(net), std::move(topo), storageInterface, seed); - auto service = getGlobalServiceContext(); service->setFastClockSource(stdx::make_unique<executor::NetworkInterfaceMockClockSource>(_net)); service->setPreciseClockSource( stdx::make_unique<executor::NetworkInterfaceMockClockSource>(_net)); diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 508edcb1ade..5c3ec06be96 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -388,7 +388,7 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) { OpTime lastOptime = OplogEntry(lastOp).getOpTime(); ReplClientInfo::forClient(txn->getClient()).setLastOp(lastOptime); replCoord->setMyLastAppliedOpTime(lastOptime); - setNewTimestamp(lastOptime.getTimestamp()); + setNewTimestamp(replCoord->getServiceContext(), lastOptime.getTimestamp()); std::string msg = "oplog sync 1 of 3"; log() << msg; diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 51da192da60..988fe98face 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -77,14 +77,11 @@ ReplSettings createReplSettings() { class ReplicationCoordinatorRollbackMock : public ReplicationCoordinatorMock { public: - ReplicationCoordinatorRollbackMock(); - void resetLastOpTimesFromOplog(OperationContext* txn) override; + ReplicationCoordinatorRollbackMock(ServiceContext* service) + : ReplicationCoordinatorMock(service, createReplSettings()) {} + void resetLastOpTimesFromOplog(OperationContext* txn) override {} }; -ReplicationCoordinatorRollbackMock::ReplicationCoordinatorRollbackMock() - : ReplicationCoordinatorMock(createReplSettings()) {} - -void ReplicationCoordinatorRollbackMock::resetLastOpTimesFromOplog(OperationContext* txn) {} class RollbackSourceMock : public RollbackSource { public: @@ -144,7 +141,7 @@ private: void RSRollbackTest::setUp() { ServiceContextMongoDTest::setUp(); _txn = cc().makeOperationContext(); - _coordinator = new ReplicationCoordinatorRollbackMock(); + _coordinator = new ReplicationCoordinatorRollbackMock(_txn->getServiceContext()); auto serviceContext = getServiceContext(); ReplicationCoordinator::set(serviceContext, diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index afff51fd10d..c44e281f012 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -179,8 +179,9 @@ private: ReplSettings settings; settings.setOplogSizeBytes(5 * 1024 * 1024); settings.setReplSetString("mySet/node1:12345"); - ReplicationCoordinator::set(getServiceContext(), - stdx::make_unique<ReplicationCoordinatorMock>(settings)); + ReplicationCoordinator::set( + getServiceContext(), + stdx::make_unique<ReplicationCoordinatorMock>(getServiceContext(), settings)); } }; @@ -189,7 +190,8 @@ protected: void setUp() override { ServiceContextMongoDTest::setUp(); createOptCtx(); - _coordinator = new ReplicationCoordinatorMock(createReplSettings()); + _coordinator = + new ReplicationCoordinatorMock(_txn->getServiceContext(), createReplSettings()); setGlobalReplicationCoordinator(_coordinator); } void tearDown() override { diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 3bba514f24b..a0302e3addb 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -822,9 +822,9 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { // Update various things that care about our last applied optime. Tests rely on 2 happening // before 3 even though it isn't strictly necessary. The order of 1 doesn't matter. - setNewTimestamp(lastOpTimeInBatch.getTimestamp()); // 1 - StorageInterface::get(&txn)->setAppliedThrough(&txn, lastOpTimeInBatch); // 2 - finalizer->record(lastOpTimeInBatch); // 3 + setNewTimestamp(txn.getServiceContext(), lastOpTimeInBatch.getTimestamp()); // 1 + StorageInterface::get(&txn)->setAppliedThrough(&txn, lastOpTimeInBatch); // 2 + finalizer->record(lastOpTimeInBatch); // 3 } } diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 90fc9ee21b6..634682cb401 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -112,15 +112,15 @@ void SyncTailTest::setUp() { replSettings.setOplogSizeBytes(5 * 1024 * 1024); replSettings.setReplSetString("repl"); - auto serviceContext = getServiceContext(); - ReplicationCoordinator::set(serviceContext, - stdx::make_unique<ReplicationCoordinatorMock>(replSettings)); + auto service = getServiceContext(); + ReplicationCoordinator::set( + service, stdx::make_unique<ReplicationCoordinatorMock>(service, replSettings)); auto storageInterface = stdx::make_unique<StorageInterfaceMock>(); _storageInterface = storageInterface.get(); storageInterface->insertDocumentsFn = [](OperationContext*, const NamespaceString&, const std::vector<BSONObj>&) { return Status::OK(); }; - StorageInterface::set(serviceContext, std::move(storageInterface)); + StorageInterface::set(service, std::move(storageInterface)); _txn = cc().makeOperationContext(); _opsApplied = 0; diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp index 0df5477127e..f00a345caca 100644 --- a/src/mongo/db/s/collection_range_deleter_test.cpp +++ b/src/mongo/db/s/collection_range_deleter_test.cpp @@ -67,7 +67,8 @@ private: void CollectionRangeDeleterTest::setUp() { ServiceContextMongoDTest::setUp(); const repl::ReplSettings replSettings = {}; - repl::setGlobalReplicationCoordinator(new repl::ReplicationCoordinatorMock(replSettings)); + repl::setGlobalReplicationCoordinator( + new repl::ReplicationCoordinatorMock(getServiceContext(), replSettings)); repl::getGlobalReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY); _opCtx = getServiceContext()->makeOperationContext(&cc()); _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext()); |