diff options
author | Randolph Tan <randolph@10gen.com> | 2017-02-23 14:57:14 -0500 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-03-07 17:12:01 -0500 |
commit | 43be7d92a003f075a5130b2eb38175a062d0729d (patch) | |
tree | bb751ca50470551a85fd2f36b85d21f22d7820e8 | |
parent | 07c73edf683342d91ba941dd2733ff9e9e6fa6c0 (diff) | |
download | mongo-43be7d92a003f075a5130b2eb38175a062d0729d.tar.gz |
SERVER-27749 Integrate LogicalTimeMetadata
Attach logical time metadata to all outgoing messages and process incoming logical time metadata in mongod
30 files changed, 308 insertions, 73 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 24da3df6e83..2adcef690ee 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -354,6 +354,7 @@ env.Install( LIBDEPS=[ 'db/commands/core', 'db/conn_pool_options', + 'db/logical_time_metadata_hook', 'db/mongodandmongos', 'db/server_options', 'db/stats/counters', diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index ab81deafa1f..d4f42533e31 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -829,6 +829,7 @@ env.Library( ], LIBDEPS=[ "$BUILD_DIR/mongo/db/bson/dotted_path_support", + '$BUILD_DIR/mongo/db/logical_time_metadata_hook', "$BUILD_DIR/mongo/executor/network_interface_factory", "$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl", "$BUILD_DIR/mongo/s/catalog/sharding_catalog_manager_impl", @@ -992,6 +993,17 @@ env.CppUnitTest( ) env.Library( + target= 'logical_time_metadata_hook', + source= [ + 'logical_time_metadata_hook.cpp', + ], + LIBDEPS= [ + 'signed_logical_time', + '$BUILD_DIR/mongo/rpc/metadata', + ], +) + +env.Library( target= 'logical_clock_test_fixture', source= [ 'logical_clock_test_fixture.cpp', diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt index 3c560ccf9b8..fc1de1b8f9f 100644 --- a/src/mongo/db/auth/action_types.txt +++ b/src/mongo/db/auth/action_types.txt @@ -5,6 +5,7 @@ # This means that the integer value assigned to each ActionType and used internally in ActionSet # also may change between versions. ["addShard", +"advanceLogicalTime", "anyAction", # Special ActionType that represents *all* actions "appendOplogNote", "applicationMessage", diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index 7b4879425a3..0b83bdf4ad6 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -96,6 +96,7 @@ #include "mongo/db/write_concern.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/config_server_metadata.h" +#include "mongo/rpc/metadata/logical_time_metadata.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" @@ -1300,9 +1301,9 @@ const std::array<StringData, 4> neededFieldNames{QueryRequest::cmdOptionMaxTimeM QueryRequest::queryOptionMaxTimeMS}; } // namespace -void appendOpTimeMetadata(OperationContext* opCtx, - const rpc::RequestInterface& request, - BSONObjBuilder* metadataBob) { +void appendReplyMetadata(OperationContext* opCtx, + const rpc::RequestInterface& request, + BSONObjBuilder* metadataBob) { const bool isShardingAware = ShardingState::get(opCtx)->enabled(); const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer; repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); @@ -1320,6 +1321,9 @@ void appendOpTimeMetadata(OperationContext* opCtx, rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId()) .writeToMetadata(metadataBob); } + + rpc::LogicalTimeMetadata logicalTimeMetadata(LogicalClock::get(opCtx)->getClusterTime()); + logicalTimeMetadata.writeToMetadata(metadataBob); } // If we're a shard other than the config shard, attach the last configOpTime we know about. @@ -1479,7 +1483,7 @@ bool Command::run(OperationContext* opCtx, inPlaceReplyBob.doneFast(); BSONObjBuilder metadataBob; - appendOpTimeMetadata(opCtx, request, &metadataBob); + appendReplyMetadata(opCtx, request, &metadataBob); replyBuilder->setMetadata(metadataBob.done()); return result; @@ -1651,7 +1655,7 @@ void mongo::execCommandDatabase(OperationContext* opCtx, } BSONObjBuilder metadataBob; - appendOpTimeMetadata(opCtx, request, &metadataBob); + appendReplyMetadata(opCtx, request, &metadataBob); auto operationTime = _getClientOperationTime(opCtx); Command::generateErrorResponse( diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 1e63c1b55bc..987a4c38679 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -75,6 +75,7 @@ #include "mongo/db/json.h" #include "mongo/db/log_process_details.h" #include "mongo/db/logical_clock.h" +#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/mongod_options.h" #include "mongo/db/op_observer_impl.h" #include "mongo/db/operation_context.h" @@ -898,16 +899,17 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, TimeProofService::Key key(std::move(tempKey)); auto timeProofService = stdx::make_unique<TimeProofService>(std::move(key)); auto logicalClock = - stdx::make_unique<LogicalClock>(serviceContext, std::move(timeProofService), false); + stdx::make_unique<LogicalClock>(serviceContext, std::move(timeProofService)); LogicalClock::set(serviceContext, std::move(logicalClock)); auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); - // TODO SERVER-27750: add LogicalTimeMetadataHook + hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext)); auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorImpl>( serviceContext, getGlobalReplSettings(), - stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(storageInterface), + stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(serviceContext, + storageInterface), executor::makeNetworkInterface( "NetworkInterfaceASIO-Replication", nullptr, std::move(hookList)), stdx::make_unique<repl::TopologyCoordinatorImpl>(topoCoordOptions), diff --git a/src/mongo/db/logical_clock.cpp b/src/mongo/db/logical_clock.cpp index 6aeef259528..5e4150467a5 100644 --- a/src/mongo/db/logical_clock.cpp +++ b/src/mongo/db/logical_clock.cpp @@ -57,10 +57,8 @@ void LogicalClock::set(ServiceContext* service, std::unique_ptr<LogicalClock> cl clock = std::move(clockArg); } -LogicalClock::LogicalClock(ServiceContext* service, - std::unique_ptr<TimeProofService> tps, - bool validateProof) - : _service(service), _timeProofService(std::move(tps)), _validateProof(validateProof) {} +LogicalClock::LogicalClock(ServiceContext* service, std::unique_ptr<TimeProofService> tps) + : _service(service), _timeProofService(std::move(tps)) {} SignedLogicalTime LogicalClock::getClusterTime() { stdx::lock_guard<stdx::mutex> lock(_mutex); @@ -72,16 +70,24 @@ SignedLogicalTime LogicalClock::_makeSignedLogicalTime(LogicalTime logicalTime) } Status LogicalClock::advanceClusterTime(const SignedLogicalTime& newTime) { - if (_validateProof) { - invariant(_timeProofService); - auto res = _timeProofService->checkProof(newTime.getTime(), newTime.getProof()); - if (res != Status::OK()) { - return res; - } + const auto& newLogicalTime = newTime.getTime(); + + // No need to check proof if no time was given. + if (newLogicalTime == LogicalTime::kUninitialized) { + return Status::OK(); + } + + invariant(_timeProofService); + auto res = _timeProofService->checkProof(newLogicalTime, newTime.getProof()); + if (res != Status::OK()) { + return res; } stdx::lock_guard<stdx::mutex> lock(_mutex); - // TODO: rate check per SERVER-27721 + return _advanceClusterTime_inlock(newTime); +} + +Status LogicalClock::_advanceClusterTime_inlock(SignedLogicalTime newTime) { if (newTime.getTime() > _clusterTime.getTime()) { _clusterTime = newTime; } @@ -89,14 +95,16 @@ Status LogicalClock::advanceClusterTime(const SignedLogicalTime& newTime) { return Status::OK(); } -Status LogicalClock::advanceClusterTimeFromTrustedSource(LogicalTime newTime) { +Status LogicalClock::advanceClusterTimeFromTrustedSource(SignedLogicalTime newTime) { stdx::lock_guard<stdx::mutex> lock(_mutex); - // TODO: rate check per SERVER-27721 - if (newTime > _clusterTime.getTime()) { - _clusterTime = _makeSignedLogicalTime(newTime); - } + return _advanceClusterTime_inlock(std::move(newTime)); +} - return Status::OK(); +Status LogicalClock::signAndAdvanceClusterTime(LogicalTime newTime) { + auto newSignedTime = _makeSignedLogicalTime(newTime); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _advanceClusterTime_inlock(std::move(newSignedTime)); } LogicalTime LogicalClock::reserveTicks(uint64_t ticks) { diff --git a/src/mongo/db/logical_clock.h b/src/mongo/db/logical_clock.h index 5c04207e324..0369eb9a641 100644 --- a/src/mongo/db/logical_clock.h +++ b/src/mongo/db/logical_clock.h @@ -52,11 +52,8 @@ public: /** * Creates an instance of LogicalClock. The TimeProofService must already be fully initialized. - * The validateProof indicates if the advanceClusterTime validates newTime. It should do so - * only when LogicalClock installed on mongos and the auth is on. When the auth is off we - * assume that the DBA uses other ways to validate authenticity of user messages. */ - LogicalClock(ServiceContext*, std::unique_ptr<TimeProofService>, bool validateProof); + LogicalClock(ServiceContext*, std::unique_ptr<TimeProofService>); /** * The method sets clusterTime to the newTime if the newTime > _clusterTime and the newTime @@ -67,9 +64,16 @@ public: Status advanceClusterTime(const SignedLogicalTime&); /** - * Simliar to advaneClusterTime, but only does rate checking and not proof validation. + * Similar to advaneClusterTime, but only does rate checking and not proof validation. */ - Status advanceClusterTimeFromTrustedSource(LogicalTime); + Status advanceClusterTimeFromTrustedSource(SignedLogicalTime newTime); + + /** + * Similar to advanceClusterTimeFromTrustedSource, but also signs the new time. Note that this + * should only be used on trusted LogicalTime (for example, LogicalTime extracted from local + * oplog entry). + */ + Status signAndAdvanceClusterTime(LogicalTime newTime); /** * Returns the current clusterTime. @@ -94,13 +98,14 @@ private: */ SignedLogicalTime _makeSignedLogicalTime(LogicalTime); + Status _advanceClusterTime_inlock(SignedLogicalTime newTime); + ServiceContext* const _service; std::unique_ptr<TimeProofService> _timeProofService; // the mutex protects _clusterTime stdx::mutex _mutex; SignedLogicalTime _clusterTime; - const bool _validateProof; }; } // namespace mongo diff --git a/src/mongo/db/logical_clock_test.cpp b/src/mongo/db/logical_clock_test.cpp index e3e75acd9dc..bb8bdba9afc 100644 --- a/src/mongo/db/logical_clock_test.cpp +++ b/src/mongo/db/logical_clock_test.cpp @@ -50,7 +50,7 @@ protected: TimeProofService::Key key(std::move(tempKey)); auto pTps = stdx::make_unique<TimeProofService>(std::move(key)); _timeProofService = pTps.get(); - _clock = stdx::make_unique<LogicalClock>(_serviceContext.get(), std::move(pTps), true); + _clock = stdx::make_unique<LogicalClock>(_serviceContext.get(), std::move(pTps)); } void tearDown() { @@ -82,7 +82,7 @@ TEST_F(LogicalClockTestBase, roundtrip) { auto pTps = stdx::make_unique<TimeProofService>(std::move(key)); auto time = LogicalTime(tX); - LogicalClock logicalClock(&serviceContext, std::move(pTps), true); + LogicalClock logicalClock(&serviceContext, std::move(pTps)); logicalClock.initClusterTimeFromTrustedSource(time); auto storedTime(logicalClock.getClusterTime()); @@ -113,7 +113,7 @@ TEST_F(LogicalClockTestBase, advanceClusterTime) { auto t1 = getClock()->reserveTicks(1); t1.addTicks(100); SignedLogicalTime l1 = makeSignedLogicalTime(t1); - ASSERT_OK(getClock()->advanceClusterTime(l1)); + ASSERT_OK(getClock()->advanceClusterTimeFromTrustedSource(l1)); auto l2(getClock()->getClusterTime()); ASSERT_TRUE(l1.getTime() == l2.getTime()); } diff --git a/src/mongo/db/logical_clock_test_fixture.cpp b/src/mongo/db/logical_clock_test_fixture.cpp index b0001ebab90..618da867ee1 100644 --- a/src/mongo/db/logical_clock_test_fixture.cpp +++ b/src/mongo/db/logical_clock_test_fixture.cpp @@ -43,8 +43,7 @@ void LogicalClockTest::setUp() { std::array<std::uint8_t, 20> tempKey = {}; TimeProofService::Key key(std::move(tempKey)); auto timeProofService = stdx::make_unique<TimeProofService>(std::move(key)); - auto logicalClock = - stdx::make_unique<LogicalClock>(service, std::move(timeProofService), false); + auto logicalClock = stdx::make_unique<LogicalClock>(service, std::move(timeProofService)); LogicalClock::set(service, std::move(logicalClock)); } diff --git a/src/mongo/db/logical_time_metadata_hook.cpp b/src/mongo/db/logical_time_metadata_hook.cpp new file mode 100644 index 00000000000..70acba1de5c --- /dev/null +++ b/src/mongo/db/logical_time_metadata_hook.cpp @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/logical_time_metadata_hook.h" + +#include "mongo/db/logical_clock.h" +#include "mongo/rpc/metadata/logical_time_metadata.h" +#include "mongo/stdx/memory.h" + +namespace mongo { + +namespace rpc { + +LogicalTimeMetadataHook::LogicalTimeMetadataHook(ServiceContext* service) : _service(service) {} + +Status LogicalTimeMetadataHook::writeRequestMetadata(OperationContext* opCtx, + const HostAndPort& requestDestination, + BSONObjBuilder* metadataBob) { + LogicalTimeMetadata metadata(LogicalClock::get(_service)->getClusterTime()); + metadata.writeToMetadata(metadataBob); + return Status::OK(); +} + +Status LogicalTimeMetadataHook::readReplyMetadata(const HostAndPort& replySource, + const BSONObj& metadataObj) { + auto parseStatus = LogicalTimeMetadata::readFromMetadata(metadataObj); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + auto& signedTime = parseStatus.getValue().getSignedTime(); + return LogicalClock::get(_service)->advanceClusterTimeFromTrustedSource(signedTime); +} + +} // namespace rpc +} // namespace mongo diff --git a/src/mongo/db/logical_time_metadata_hook.h b/src/mongo/db/logical_time_metadata_hook.h new file mode 100644 index 00000000000..3fbd2260dd1 --- /dev/null +++ b/src/mongo/db/logical_time_metadata_hook.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/rpc/metadata/metadata_hook.h" +#include "mongo/stdx/memory.h" + +namespace mongo { + +class BSONObj; +class BSONObjBuilder; +struct HostAndPort; +class OperationContext; +class ServiceContext; +class Status; + +namespace rpc { + +class LogicalTimeMetadataHook : public EgressMetadataHook { +public: + explicit LogicalTimeMetadataHook(ServiceContext* service); + + Status writeRequestMetadata(OperationContext* opCtx, + const HostAndPort& requestDestination, + BSONObjBuilder* metadataBob) override; + + Status readReplyMetadata(const HostAndPort& replySource, const BSONObj& metadataObj) override; + +private: + ServiceContext* _service; +}; + +} // namespace rpc +} // namespace mongo diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index e686f13e471..3d5547ed526 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1117,7 +1117,7 @@ Status applyCommand_inlock(OperationContext* opCtx, void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { stdx::lock_guard<stdx::mutex> lk(newOpMutex); - LogicalClock::get(service)->advanceClusterTimeFromTrustedSource(LogicalTime(newTime)); + LogicalClock::get(service)->signAndAdvanceClusterTime(LogicalTime(newTime)); lastSetTimestamp = newTime; newTimestampNotifier.notify_all(); } diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index f3a0cd748e2..2001f60a3a8 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -356,7 +356,8 @@ public: result.append("info2", noConfigMessage); log() << "initiate : " << noConfigMessage; - ReplicationCoordinatorExternalStateImpl externalState(StorageInterface::get(opCtx)); + ReplicationCoordinatorExternalStateImpl externalState(opCtx->getServiceContext(), + StorageInterface::get(opCtx)); std::string name; std::vector<HostAndPort> seeds; parseReplSetSeedList(&externalState, replSetString, &name, &seeds); // may throw... diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 36d8a84fd06..04a2fcf88f8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -46,6 +46,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/op_observer.h" #include "mongo/db/repair_database.h" #include "mongo/db/repl/bgsync.h" @@ -187,8 +188,9 @@ std::unique_ptr<ThreadPool> makeThreadPool() { } // namespace ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl( - StorageInterface* storageInterface) - : _storageInterface(storageInterface), + ServiceContext* service, StorageInterface* storageInterface) + : _service(service), + _storageInterface(storageInterface), _initialSyncThreadPool(OldThreadPool::DoNotStartThreadsTag(), 1, "initial sync-"), _initialSyncRunner(&_initialSyncThreadPool) { uassert(ErrorCodes::BadValue, "A StorageInterface is required.", _storageInterface); @@ -299,14 +301,14 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s if (settings.isMajorityReadConcernEnabled() || enableReplSnapshotThread) { log() << "Starting replication snapshot thread"; - _snapshotThread = SnapshotThread::start(getGlobalServiceContext()); + _snapshotThread = SnapshotThread::start(_service); } log() << "Starting replication storage threads"; - getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this); + _service->getGlobalStorageEngine()->setJournalListener(this); auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); - // TODO SERVER-27750: add LogicalTimeMetadataHook + hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(_service)); _taskExecutor = stdx::make_unique<executor::ThreadPoolTaskExecutor>( makeThreadPool(), executor::makeNetworkInterface("NetworkInterfaceASIO-RS", nullptr, std::move(hookList))); @@ -365,7 +367,7 @@ Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationCont try { ScopedTransaction scopedXact(opCtx, MODE_X); Lock::GlobalWrite globalWrite(opCtx->lockState()); - StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine(); + StorageEngine* engine = _service->getGlobalStorageEngine(); if (!engine->isMmapV1()) { return Status::OK(); @@ -395,7 +397,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati Helpers::putSingleton(opCtx, configCollectionName, config); const auto msgObj = BSON("msg" << "initiating set"); - getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, msgObj); + _service->getOpObserver()->onOpMessage(opCtx, msgObj); wuow.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "initiate oplog entry", "local.oplog.rs"); @@ -717,7 +719,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); + _service->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* opCtx) { @@ -727,10 +729,10 @@ void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationCon void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - Balancer::get(getGlobalServiceContext())->interruptBalancer(); + Balancer::get(_service)->interruptBalancer(); } - ShardingState::get(getGlobalServiceContext())->markCollectionsNotShardedAtStepdown(); + ShardingState::get(_service)->markCollectionsNotShardedAtStepdown(); } void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook( @@ -829,7 +831,7 @@ void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() { void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* opCtx) { std::vector<std::string> dbNames; - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); + StorageEngine* storageEngine = _service->getGlobalStorageEngine(); storageEngine->listDatabases(&dbNames); for (std::vector<std::string>::iterator it = dbNames.begin(); it != dbNames.end(); ++it) { @@ -848,19 +850,19 @@ void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationC } void ReplicationCoordinatorExternalStateImpl::dropAllSnapshots() { - if (auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager()) + if (auto manager = _service->getGlobalStorageEngine()->getSnapshotManager()) manager->dropAllSnapshots(); } void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotName newCommitPoint) { - auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager(); + auto manager = _service->getGlobalStorageEngine()->getSnapshotManager(); invariant(manager); // This should never be called if there is no SnapshotManager. manager->setCommittedSnapshot(newCommitPoint); } void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* opCtx, SnapshotName name) { - auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager(); + auto manager = _service->getGlobalStorageEngine()->getSnapshotManager(); invariant(manager); // This should never be called if there is no SnapshotManager. manager->createSnapshot(opCtx, name); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 16c5adf0be6..debadbb02c0 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -60,7 +60,8 @@ class ReplicationCoordinatorExternalStateImpl final : public ReplicationCoordina MONGO_DISALLOW_COPYING(ReplicationCoordinatorExternalStateImpl); public: - ReplicationCoordinatorExternalStateImpl(StorageInterface* storageInterface); + ReplicationCoordinatorExternalStateImpl(ServiceContext* service, + StorageInterface* storageInterface); virtual ~ReplicationCoordinatorExternalStateImpl(); virtual void startThreads(const ReplSettings& settings) override; virtual void startInitialSync(OnInitialSyncFinishedFn finished) override; @@ -148,6 +149,8 @@ private: */ void _dropAllTempCollections(OperationContext* opCtx); + ServiceContext* _service; + // Guards starting threads and setting _startedThreads stdx::mutex _threadMutex; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 945455d4512..93c3a564a93 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -125,8 +125,7 @@ void ReplCoordTest::init() { std::array<std::uint8_t, 20> tempKey = {}; TimeProofService::Key key(std::move(tempKey)); auto timeProofService = stdx::make_unique<TimeProofService>(std::move(key)); - auto logicalClock = - stdx::make_unique<LogicalClock>(service, std::move(timeProofService), false); + auto logicalClock = stdx::make_unique<LogicalClock>(service, std::move(timeProofService)); LogicalClock::set(service, std::move(logicalClock)); TopologyCoordinatorImpl::Options settings; diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index b81af5a051a..b329a325423 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -36,6 +36,8 @@ #include "mongo/client/connection_string.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory_impl.h" +#include "mongo/db/logical_time_metadata_hook.h" +#include "mongo/db/operation_context.h" #include "mongo/db/s/sharding_egress_metadata_hook_for_mongod.h" #include "mongo/db/server_options.h" #include "mongo/executor/task_executor.h" @@ -86,9 +88,10 @@ Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, configCS, distLockProcessId, std::move(shardFactory), - [] { + [opCtx] { auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); - // TODO SERVER-27750: add LogicalTimeMetadataHook + hookList->addHook( + stdx::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext())); hookList->addHook(stdx::make_unique<rpc::ShardingEgressMetadataHookForMongod>()); return hookList; }, diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp index 194011842ee..7c24ce66ff5 100644 --- a/src/mongo/db/service_context_d_test_fixture.cpp +++ b/src/mongo/db/service_context_d_test_fixture.cpp @@ -58,7 +58,7 @@ void ServiceContextMongoDTest::setUp() { TimeProofService::Key key(std::move(tempKey)); auto timeProofService = stdx::make_unique<TimeProofService>(std::move(key)); auto logicalClock = - stdx::make_unique<LogicalClock>(serviceContext, std::move(timeProofService), false); + stdx::make_unique<LogicalClock>(serviceContext, std::move(timeProofService)); LogicalClock::set(serviceContext, std::move(logicalClock)); if (!serviceContext->getGlobalStorageEngine()) { diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index bad5265acce..38dcf788f09 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -123,6 +123,7 @@ dbtest = env.Program( "$BUILD_DIR/mongo/db/repl/replmocks", "$BUILD_DIR/mongo/db/serveronly", "$BUILD_DIR/mongo/db/logical_clock", + "$BUILD_DIR/mongo/db/logical_time_metadata_hook", "$BUILD_DIR/mongo/db/storage/mmap_v1/paths", "$BUILD_DIR/mongo/util/net/network", "$BUILD_DIR/mongo/util/progress_meter", diff --git a/src/mongo/dbtests/dbtests.cpp b/src/mongo/dbtests/dbtests.cpp index 586ecb3ea13..1ad9c0f8d2d 100644 --- a/src/mongo/dbtests/dbtests.cpp +++ b/src/mongo/dbtests/dbtests.cpp @@ -135,8 +135,7 @@ int dbtestsMain(int argc, char** argv, char** envp) { std::array<std::uint8_t, 20> tempKey = {}; TimeProofService::Key key(std::move(tempKey)); auto timeProofService = stdx::make_unique<TimeProofService>(std::move(key)); - auto logicalClock = - stdx::make_unique<LogicalClock>(service, std::move(timeProofService), false); + auto logicalClock = stdx::make_unique<LogicalClock>(service, std::move(timeProofService)); LogicalClock::set(service, std::move(logicalClock)); repl::setGlobalReplicationCoordinator( diff --git a/src/mongo/dbtests/replica_set_tests.cpp b/src/mongo/dbtests/replica_set_tests.cpp index b11f0111be1..c808305cb30 100644 --- a/src/mongo/dbtests/replica_set_tests.cpp +++ b/src/mongo/dbtests/replica_set_tests.cpp @@ -48,8 +48,8 @@ protected: void setUp() { auto opCtx = makeOpCtx(); _storageInterface = stdx::make_unique<repl::StorageInterfaceMock>(); - _replCoordExternalState.reset( - new repl::ReplicationCoordinatorExternalStateImpl(_storageInterface.get())); + _replCoordExternalState.reset(new repl::ReplicationCoordinatorExternalStateImpl( + opCtx->getServiceContext(), _storageInterface.get())); } void tearDown() { diff --git a/src/mongo/rpc/SConscript b/src/mongo/rpc/SConscript index a4945dcf9c6..558458be062 100644 --- a/src/mongo/rpc/SConscript +++ b/src/mongo/rpc/SConscript @@ -162,7 +162,9 @@ env.Clone().InjectModule("enterprise").Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/bson/util/bson_extract', '$BUILD_DIR/mongo/client/read_preference', + '$BUILD_DIR/mongo/db/auth/authcore', '$BUILD_DIR/mongo/db/repl/optime', + '$BUILD_DIR/mongo/db/logical_clock', '$BUILD_DIR/mongo/db/signed_logical_time', '$BUILD_DIR/mongo/util/decorable', ], diff --git a/src/mongo/rpc/metadata.cpp b/src/mongo/rpc/metadata.cpp index 00d3c01ae78..5957d7a55b7 100644 --- a/src/mongo/rpc/metadata.cpp +++ b/src/mongo/rpc/metadata.cpp @@ -30,11 +30,18 @@ #include "mongo/rpc/metadata.h" +#include "mongo/base/init.h" #include "mongo/client/dbclientinterface.h" +#include "mongo/db/auth/action_set.h" +#include "mongo/db/auth/action_type.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/privilege.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_clock.h" #include "mongo/rpc/metadata/audit_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/rpc/metadata/config_server_metadata.h" +#include "mongo/rpc/metadata/logical_time_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/rpc/metadata/tracking_metadata.h" @@ -42,6 +49,27 @@ namespace mongo { namespace rpc { +namespace { + +std::vector<Privilege> advanceLogicalClockPrivilege; + +MONGO_INITIALIZER(InitializeAdvanceLogicalClockPrivilegeVector)(InitializerContext* const) { + ActionSet actions; + actions.addAction(ActionType::internal); + advanceLogicalClockPrivilege.emplace_back(ResourcePattern::forClusterResource(), actions); + return Status::OK(); +} + +bool isAuthorizedToAdvanceClock(OperationContext* opCtx) { + auto client = opCtx->getClient(); + // Note: returns true if auth is off, courtesy of + // AuthzSessionExternalStateServerCommon::shouldIgnoreAuthChecks. + return AuthorizationSession::get(client)->isAuthorizedForPrivileges( + advanceLogicalClockPrivilege); +} + +} // unnamed namespace + BSONObj makeEmptyMetadata() { return BSONObj(); } @@ -52,6 +80,7 @@ Status readRequestMetadata(OperationContext* opCtx, const BSONObj& metadataObj) BSONElement configSvrElem; BSONElement trackingElem; BSONElement clientElem; + BSONElement logicalTimeElem; for (const auto& metadataElem : metadataObj) { auto fieldName = metadataElem.fieldNameStringData(); @@ -65,6 +94,8 @@ Status readRequestMetadata(OperationContext* opCtx, const BSONObj& metadataObj) clientElem = metadataElem; } else if (fieldName == TrackingMetadata::fieldName()) { trackingElem = metadataElem; + } else if (fieldName == LogicalTimeMetadata::fieldName()) { + logicalTimeElem = metadataElem; } } @@ -98,6 +129,30 @@ Status readRequestMetadata(OperationContext* opCtx, const BSONObj& metadataObj) } TrackingMetadata::get(opCtx) = std::move(trackingMetadata.getValue()); + auto logicalClock = LogicalClock::get(opCtx); + if (logicalClock) { + auto logicalTimeMetadata = LogicalTimeMetadata::readFromMetadata(logicalTimeElem); + if (!logicalTimeMetadata.isOK()) { + return logicalTimeMetadata.getStatus(); + } + + if (isAuthorizedToAdvanceClock(opCtx)) { + auto advanceClockStatus = logicalClock->advanceClusterTimeFromTrustedSource( + logicalTimeMetadata.getValue().getSignedTime()); + + if (!advanceClockStatus.isOK()) { + return advanceClockStatus; + } + } else { + auto advanceClockStatus = + logicalClock->advanceClusterTime(logicalTimeMetadata.getValue().getSignedTime()); + + if (!advanceClockStatus.isOK()) { + return advanceClockStatus; + } + } + } + return Status::OK(); } diff --git a/src/mongo/rpc/metadata/logical_time_metadata.cpp b/src/mongo/rpc/metadata/logical_time_metadata.cpp index ef056a2fd46..9d79a892e3a 100644 --- a/src/mongo/rpc/metadata/logical_time_metadata.cpp +++ b/src/mongo/rpc/metadata/logical_time_metadata.cpp @@ -45,12 +45,17 @@ const char kSignatureFieldName[] = "signature"; LogicalTimeMetadata::LogicalTimeMetadata(SignedLogicalTime time) : _clusterTime(std::move(time)) {} + StatusWith<LogicalTimeMetadata> LogicalTimeMetadata::readFromMetadata(const BSONObj& metadata) { return readFromMetadata(metadata.getField(fieldName())); } StatusWith<LogicalTimeMetadata> LogicalTimeMetadata::readFromMetadata( const BSONElement& metadataElem) { + if (metadataElem.eoo()) { + return LogicalTimeMetadata(); + } + const auto& obj = metadataElem.Obj(); Timestamp ts; diff --git a/src/mongo/rpc/metadata/logical_time_metadata.h b/src/mongo/rpc/metadata/logical_time_metadata.h index aafd4640231..2ff792b58b1 100644 --- a/src/mongo/rpc/metadata/logical_time_metadata.h +++ b/src/mongo/rpc/metadata/logical_time_metadata.h @@ -47,8 +47,13 @@ namespace rpc { */ class LogicalTimeMetadata { public: + LogicalTimeMetadata() = default; explicit LogicalTimeMetadata(SignedLogicalTime time); + /** + * Parses the metadata from BSON. Returns an empty LogicalTimeMetadata If the metadata is not + * present. + */ static StatusWith<LogicalTimeMetadata> readFromMetadata(const BSONObj& metadata); static StatusWith<LogicalTimeMetadata> readFromMetadata(const BSONElement& metadataElem); diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript index 158afab4e44..8fddfcbb6db 100644 --- a/src/mongo/s/client/SConscript +++ b/src/mongo/s/client/SConscript @@ -20,6 +20,7 @@ env.Library( '$BUILD_DIR/mongo/db/auth/authcore', '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/lasterror', + '$BUILD_DIR/mongo/db/logical_time_metadata_hook', '$BUILD_DIR/mongo/executor/connection_pool_stats', '$BUILD_DIR/mongo/executor/task_executor_pool', '$BUILD_DIR/mongo/rpc/metadata', diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 7cadf2ed8d0..623801dd756 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -39,6 +39,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/client.h" +#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" #include "mongo/executor/network_connection_hook.h" @@ -188,12 +189,12 @@ void ShardRegistry::init() { _initConfigServerCS = ConnectionString(); } -void ShardRegistry::startup() { +void ShardRegistry::startup(OperationContext* opCtx) { // startup() must be called only once invariant(!_executor); auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); - // TODO SERVER-27750: add LogicalTimeMetadataHook + hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext())); // construct task executor auto net = executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hookList)); diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 92e31156e88..37ad27ba5b5 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -47,6 +47,7 @@ class BSONObjBuilder; struct HostAndPort; class NamespaceString; class OperationContext; +class ServiceContext; class ShardFactory; class Shard; class ShardType; @@ -152,7 +153,7 @@ public: /** * Starts ReplicaSetMonitor by adding a config shard. */ - void startup(); + void startup(OperationContext* opCtx); /** * This is invalid to use on the config server and will hit an invariant if it is done. diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 41ad5b8ce7b..bd177748fcf 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -55,6 +55,7 @@ #include "mongo/db/lasterror.h" #include "mongo/db/log_process_details.h" #include "mongo/db/logical_clock.h" +#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" @@ -202,9 +203,10 @@ static Status initializeSharding(OperationContext* opCtx) { mongosGlobalParams.configdbs, generateDistLockProcessId(opCtx), std::move(shardFactory), - []() { + [opCtx]() { auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); - // TODO SERVER-27750: add LogicalTimeMetadataHook + hookList->addHook( + stdx::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext())); hookList->addHook(stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>()); return hookList; }, @@ -280,10 +282,8 @@ static ExitCode runMongosServer() { std::array<std::uint8_t, 20> tempKey = {}; TimeProofService::Key key(std::move(tempKey)); auto timeProofService = stdx::make_unique<TimeProofService>(std::move(key)); - auto logicalClock = stdx::make_unique<LogicalClock>( - opCtx->getServiceContext(), - std::move(timeProofService), - serverGlobalParams.authState == ServerGlobalParams::AuthState::kEnabled); + auto logicalClock = + stdx::make_unique<LogicalClock>(opCtx->getServiceContext(), std::move(timeProofService)); LogicalClock::set(opCtx->getServiceContext(), std::move(logicalClock)); { diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index db41e736fc6..d6fe68b2b73 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -209,7 +209,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, networkPtr); // must be started once the grid is initialized - grid.shardRegistry()->startup(); + grid.shardRegistry()->startup(opCtx); auto status = rawCatalogClient->startup(); if (!status.isOK()) { |