diff options
author | Misha Tyulenev <misha@mongodb.com> | 2017-03-21 16:23:35 -0400 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2017-03-22 12:03:06 -0400 |
commit | 4d364a4c951bb05639335d5989c1f85e79af78fa (patch) | |
tree | 487535da7f765f7d37bae861049fe96e48c89320 /src | |
parent | b38e0199d04a6b4516566be71c819e612801c365 (diff) | |
download | mongo-4d364a4c951bb05639335d5989c1f85e79af78fa.tar.gz |
SERVER-28107 augment command result with operationTime in mongos
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/operation_time_tracker.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/operation_time_tracker.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.cpp | 152 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.h | 79 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 18 | ||||
-rw-r--r-- | src/mongo/shell/assert.js | 2 |
9 files changed, 277 insertions, 14 deletions
diff --git a/src/mongo/db/operation_time_tracker.cpp b/src/mongo/db/operation_time_tracker.cpp index 57a17f2ccae..71397a3e757 100644 --- a/src/mongo/db/operation_time_tracker.cpp +++ b/src/mongo/db/operation_time_tracker.cpp @@ -32,9 +32,21 @@ #include "mongo/stdx/mutex.h" namespace mongo { +namespace { +auto getOperationTimeTracker = + OperationContext::declareDecoration<std::shared_ptr<OperationTimeTracker>>(); +} + -const OperationContext::Decoration<OperationTimeTracker> OperationTimeTracker::get = - OperationContext::declareDecoration<OperationTimeTracker>(); +std::shared_ptr<OperationTimeTracker> OperationTimeTracker::get(OperationContext* opCtx) { + return getOperationTimeTracker(opCtx); +} + +void OperationTimeTracker::set(OperationContext* opCtx, + std::shared_ptr<OperationTimeTracker> trackerArg) { + auto& tracker = getOperationTimeTracker(opCtx); + tracker = std::move(trackerArg); +} LogicalTime OperationTimeTracker::getMaxOperationTime() const { stdx::lock_guard<stdx::mutex> lock(_mutex); diff --git a/src/mongo/db/operation_time_tracker.h b/src/mongo/db/operation_time_tracker.h index 812368b4a58..39157cc1a6e 100644 --- a/src/mongo/db/operation_time_tracker.h +++ b/src/mongo/db/operation_time_tracker.h @@ -42,7 +42,8 @@ namespace mongo { class OperationTimeTracker { public: // Decorate OperationContext with OperationTimeTracker instance. - static const OperationContext::Decoration<OperationTimeTracker> get; + static std::shared_ptr<OperationTimeTracker> get(OperationContext* ctx); + static void set(OperationContext* opCtx, std::shared_ptr<OperationTimeTracker> tracker); /* * Return the latest operationTime. diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index e4ec6233845..f2d6de5347a 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -32,6 +32,17 @@ env.Library( ) env.Library( + target='sharding_task_executor', + source=[ + 'sharding_task_executor.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + ], +) + +env.Library( target='migration_types', source=[ 'migration_session_id.cpp', @@ -95,6 +106,7 @@ env.Library( '$BUILD_DIR/mongo/util/elapsed_tracker', 'metadata', 'migration_types', + 'sharding_task_executor', 'type_shard_identity', #'$BUILD_DIR/mongo/db/ops/write_ops', # CYCLE #'$BUILD_DIR/mongo/s/catalog/sharding_catalog_manager_impl', # CYCLE diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp new file mode 100644 index 00000000000..3c02c467e61 --- /dev/null +++ b/src/mongo/db/s/sharding_task_executor.cpp @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_task_executor.h" + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/logical_time.h" +#include "mongo/db/operation_time_tracker.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace executor { + +namespace { +const std::string kOperationTimeField = "operationTime"; +} + +ShardingTaskExecutor::ShardingTaskExecutor(std::unique_ptr<ThreadPoolTaskExecutor> executor) + : _executor(std::move(executor)) {} + +void ShardingTaskExecutor::startup() { + _executor->startup(); +} + +void ShardingTaskExecutor::shutdown() { + _executor->shutdown(); +} + +void ShardingTaskExecutor::join() { + _executor->join(); +} + +std::string ShardingTaskExecutor::getDiagnosticString() const { + return _executor->getDiagnosticString(); +} + +Date_t ShardingTaskExecutor::now() { + return _executor->now(); +} + +StatusWith<TaskExecutor::EventHandle> ShardingTaskExecutor::makeEvent() { + return _executor->makeEvent(); +} + +void ShardingTaskExecutor::signalEvent(const EventHandle& event) { + return _executor->signalEvent(event); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::onEvent(const EventHandle& event, + const CallbackFn& work) { + return _executor->onEvent(event, work); +} + +void ShardingTaskExecutor::waitForEvent(const EventHandle& event) { + _executor->waitForEvent(event); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork( + const CallbackFn& work) { + return _executor->scheduleWork(work); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt( + Date_t when, const CallbackFn& work) { + return _executor->scheduleWorkAt(when, work); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand( + const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { + + // schedule the user's callback if there is not opCtx + if (!request.opCtx) { + return _executor->scheduleRemoteCommand(request, cb); + } + + std::shared_ptr<OperationTimeTracker> timeTracker = OperationTimeTracker::get(request.opCtx); + if (!timeTracker) { // install the time tracker on the opCtx + timeTracker = std::make_shared<OperationTimeTracker>(); + OperationTimeTracker::set(request.opCtx, timeTracker); + } + + auto shardingCb = [timeTracker, cb](const TaskExecutor::RemoteCommandCallbackArgs& args) { + cb(args); + + invariant(timeTracker); + + if (!args.response.isOK()) { + LOG(1) << "Error processing the remote request" + << "do not update operationTime"; + return; + } + + auto operationTime = args.response.data[kOperationTimeField]; + if (operationTime.eoo()) { + LOG(1) << "No operationTime in the response"; + return; + } + + invariant(operationTime.type() == BSONType::bsonTimestamp); + + timeTracker->updateOperationTime(LogicalTime(operationTime.timestamp())); + }; + + return _executor->scheduleRemoteCommand(request, shardingCb); +} + +void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) { + _executor->cancel(cbHandle); +} + +void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle) { + _executor->wait(cbHandle); +} + +void ShardingTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const { + _executor->appendConnectionStats(stats); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/db/s/sharding_task_executor.h b/src/mongo/db/s/sharding_task_executor.h new file mode 100644 index 00000000000..7828b56dd94 --- /dev/null +++ b/src/mongo/db/s/sharding_task_executor.h @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { +namespace executor { + +struct ConnectionPoolStats; +class ThreadPoolTaskExecutor; + +/** + * Implementation of a TaskExecutor that uses ThreadPoolTaskExecutor to submit tasks and allows to + * override methods if needed. + */ +class ShardingTaskExecutor final : public TaskExecutor { + MONGO_DISALLOW_COPYING(ShardingTaskExecutor); + +public: + ShardingTaskExecutor(std::unique_ptr<ThreadPoolTaskExecutor> executor); + + void startup() override; + void shutdown() override; + void join() override; + std::string getDiagnosticString() const override; + Date_t now() override; + StatusWith<EventHandle> makeEvent() override; + void signalEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; + void waitForEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override; + void cancel(const CallbackHandle& cbHandle) override; + void wait(const CallbackHandle& cbHandle) override; + + void appendConnectionStats(ConnectionPoolStats* stats) const override; + +private: + std::unique_ptr<ThreadPoolTaskExecutor> _executor; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index eede0298c6d..c3b69f7bd08 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -30,6 +30,7 @@ env.Library( '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_impl', '$BUILD_DIR/mongo/s/catalog/replset_dist_lock_manager', + '$BUILD_DIR/mongo/db/s/sharding_task_executor', 'client/sharding_connection_hook', 'coreshard', ], diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index b0b51806c3e..b586acfc57d 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -46,6 +46,7 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/operation_time_tracker.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/query_request.h" @@ -82,6 +83,8 @@ using std::stringstream; namespace { +const std::string kOperationTime = "operationTime"; + void runAgainstRegistered(OperationContext* opCtx, const char* ns, BSONObj& jsobj, @@ -167,6 +170,11 @@ Status processCommandMetadata(OperationContext* opCtx, const BSONObj& cmdObj) { void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* responseBuilder) { rpc::LogicalTimeMetadata logicalTimeMetadata(LogicalClock::get(opCtx)->getClusterTime()); logicalTimeMetadata.writeToMetadata(responseBuilder); + auto tracker = OperationTimeTracker::get(opCtx); + if (tracker) { + auto operationTime = OperationTimeTracker::get(opCtx)->getMaxOperationTime(); + responseBuilder->append(kOperationTime, operationTime.asTimestamp()); + } } MONGO_INITIALIZER(InitializeCommandExecCommandHandler)(InitializerContext* const) { diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index d6fe68b2b73..8234e4ea6ed 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -37,6 +37,7 @@ #include "mongo/base/status.h" #include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/db/audit.h" +#include "mongo/db/s/sharding_task_executor.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -89,13 +90,15 @@ using executor::NetworkInterface; using executor::NetworkInterfaceThreadPool; using executor::TaskExecutorPool; using executor::ThreadPoolTaskExecutor; +using executor::ShardingTaskExecutor; static constexpr auto kRetryInterval = Seconds{2}; -std::unique_ptr<ThreadPoolTaskExecutor> makeTaskExecutor(std::unique_ptr<NetworkInterface> net) { +auto makeTaskExecutor(std::unique_ptr<NetworkInterface> net) { auto netPtr = net.get(); - return stdx::make_unique<ThreadPoolTaskExecutor>( + auto executor = stdx::make_unique<ThreadPoolTaskExecutor>( stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); + return stdx::make_unique<ShardingTaskExecutor>(std::move(executor)); } std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service, @@ -119,22 +122,17 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( std::vector<std::unique_ptr<executor::TaskExecutor>> executors; for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) { - auto net = executor::makeNetworkInterface( + auto exec = makeTaskExecutor(executor::makeNetworkInterface( "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i), stdx::make_unique<ShardingNetworkConnectionHook>(), metadataHookBuilder(), - connPoolOptions); - auto netPtr = net.get(); - auto exec = stdx::make_unique<ThreadPoolTaskExecutor>( - stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); + connPoolOptions)); executors.emplace_back(std::move(exec)); } // Add executor used to perform non-performance critical work. - auto fixedNetPtr = fixedNet.get(); - auto fixedExec = stdx::make_unique<ThreadPoolTaskExecutor>( - stdx::make_unique<NetworkInterfaceThreadPool>(fixedNetPtr), std::move(fixedNet)); + auto fixedExec = makeTaskExecutor(std::move(fixedNet)); auto executorPool = stdx::make_unique<TaskExecutorPool>(); executorPool->addExecutors(std::move(executors), std::move(fixedExec)); diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js index 5c8dafff4a5..b26380fdb18 100644 --- a/src/mongo/shell/assert.js +++ b/src/mongo/shell/assert.js @@ -144,7 +144,7 @@ assert.hasFields = function(result, arr, msg) { } if (count != arr.length) { - doassert("None of values from " + tojson(arr) + " was in " + tojson(o) + " : " + msg); + doassert("None of values from " + tojson(arr) + " was in " + tojson(result) + " : " + msg); } }; |