diff options
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml | 4 | ||||
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml | 4 | ||||
-rw-r--r-- | jstests/core/batch_write_command_delete.js | 3 | ||||
-rw-r--r-- | jstests/core/set_param1.js | 4 | ||||
-rw-r--r-- | src/mongo/db/operation_time_tracker.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/operation_time_tracker.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.cpp | 152 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.h | 79 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 18 | ||||
-rw-r--r-- | src/mongo/shell/assert.js | 2 |
13 files changed, 283 insertions, 23 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml index fc09e03a452..5010c83f19d 100644 --- a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml @@ -66,10 +66,6 @@ selector: - jstests/core/ns_length.js - jstests/core/views/*.js # Views tests aren't expected to work when collections are implicitly sharded. - jstests/core/killop_drop_collection.js # Uses fsyncLock. - # TODO: SERVER-28107 - - jstests/core/batch_write_command_delete.js - - jstests/core/batch_write_command_insert.js - - jstests/core/batch_write_command_update.js # TODO: Remove after fixing SERVER-28321. - jstests/core/mr_undef.js # TODO: SERVER-16605 diff --git a/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml index fda211b6fff..9791e68b1ad 100644 --- a/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml @@ -48,10 +48,6 @@ selector: # TODO: SERVER-27269: mongos can't establish cursor if view has $collStats and views another view. - jstests/core/views/views_coll_stats.js - jstests/core/killop_drop_collection.js # Uses fsyncLock. - # TODO: SERVER-28107 - - jstests/core/batch_write_command_delete.js - - jstests/core/batch_write_command_insert.js - - jstests/core/batch_write_command_update.js executor: diff --git a/jstests/core/batch_write_command_delete.js b/jstests/core/batch_write_command_delete.js index 7a85bc6491b..c9813e66282 100644 --- a/jstests/core/batch_write_command_delete.js +++ b/jstests/core/batch_write_command_delete.js @@ -227,7 +227,8 @@ result = coll.runCommand(request); assert.commandWorked(result); assert.eq(1, coll.count()); -assert.hasFields(result, fields, 'fields in result do not match: ' + tojson(fields)); +// the error path does not go to the shard and hence there is no operationTime. +assert.hasFields(result, ["ok"], 'fields in result do not match: ' + tojson(fields)); // // When limit is not 0 and 1 diff --git a/jstests/core/set_param1.js b/jstests/core/set_param1.js index 51b13ae87cc..88d4f49b5ad 100644 --- a/jstests/core/set_param1.js +++ b/jstests/core/set_param1.js @@ -2,9 +2,13 @@ // and shell helpers. old = db.adminCommand({"getParameter": "*"}); +// the first time getParameter sends a request to with a shardingTaskExecutor and this sets an +// operationTime. The following commands do not use shardingTaskExecutor. +delete old["operationTime"]; tmp1 = db.adminCommand({"setParameter": 1, "logLevel": 5}); tmp2 = db.adminCommand({"setParameter": 1, "logLevel": old.logLevel}); now = db.adminCommand({"getParameter": "*"}); +delete now["operationTime"]; assert.eq(old, now, "A"); assert.eq(old.logLevel, tmp1.was, "B"); diff --git a/src/mongo/db/operation_time_tracker.cpp b/src/mongo/db/operation_time_tracker.cpp index 57a17f2ccae..71397a3e757 100644 --- a/src/mongo/db/operation_time_tracker.cpp +++ b/src/mongo/db/operation_time_tracker.cpp @@ -32,9 +32,21 @@ #include "mongo/stdx/mutex.h" namespace mongo { +namespace { +auto getOperationTimeTracker = + OperationContext::declareDecoration<std::shared_ptr<OperationTimeTracker>>(); +} + -const OperationContext::Decoration<OperationTimeTracker> OperationTimeTracker::get = - OperationContext::declareDecoration<OperationTimeTracker>(); +std::shared_ptr<OperationTimeTracker> OperationTimeTracker::get(OperationContext* opCtx) { + return getOperationTimeTracker(opCtx); +} + +void OperationTimeTracker::set(OperationContext* opCtx, + std::shared_ptr<OperationTimeTracker> trackerArg) { + auto& tracker = getOperationTimeTracker(opCtx); + tracker = std::move(trackerArg); +} LogicalTime OperationTimeTracker::getMaxOperationTime() const { stdx::lock_guard<stdx::mutex> lock(_mutex); diff --git a/src/mongo/db/operation_time_tracker.h b/src/mongo/db/operation_time_tracker.h index 812368b4a58..39157cc1a6e 100644 --- a/src/mongo/db/operation_time_tracker.h +++ b/src/mongo/db/operation_time_tracker.h @@ -42,7 +42,8 @@ namespace mongo { class OperationTimeTracker { public: // Decorate OperationContext with OperationTimeTracker instance. - static const OperationContext::Decoration<OperationTimeTracker> get; + static std::shared_ptr<OperationTimeTracker> get(OperationContext* ctx); + static void set(OperationContext* opCtx, std::shared_ptr<OperationTimeTracker> tracker); /* * Return the latest operationTime. diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index e4ec6233845..f2d6de5347a 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -32,6 +32,17 @@ env.Library( ) env.Library( + target='sharding_task_executor', + source=[ + 'sharding_task_executor.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + ], +) + +env.Library( target='migration_types', source=[ 'migration_session_id.cpp', @@ -95,6 +106,7 @@ env.Library( '$BUILD_DIR/mongo/util/elapsed_tracker', 'metadata', 'migration_types', + 'sharding_task_executor', 'type_shard_identity', #'$BUILD_DIR/mongo/db/ops/write_ops', # CYCLE #'$BUILD_DIR/mongo/s/catalog/sharding_catalog_manager_impl', # CYCLE diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp new file mode 100644 index 00000000000..3c02c467e61 --- /dev/null +++ b/src/mongo/db/s/sharding_task_executor.cpp @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_task_executor.h" + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/logical_time.h" +#include "mongo/db/operation_time_tracker.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace executor { + +namespace { +const std::string kOperationTimeField = "operationTime"; +} + +ShardingTaskExecutor::ShardingTaskExecutor(std::unique_ptr<ThreadPoolTaskExecutor> executor) + : _executor(std::move(executor)) {} + +void ShardingTaskExecutor::startup() { + _executor->startup(); +} + +void ShardingTaskExecutor::shutdown() { + _executor->shutdown(); +} + +void ShardingTaskExecutor::join() { + _executor->join(); +} + +std::string ShardingTaskExecutor::getDiagnosticString() const { + return _executor->getDiagnosticString(); +} + +Date_t ShardingTaskExecutor::now() { + return _executor->now(); +} + +StatusWith<TaskExecutor::EventHandle> ShardingTaskExecutor::makeEvent() { + return _executor->makeEvent(); +} + +void ShardingTaskExecutor::signalEvent(const EventHandle& event) { + return _executor->signalEvent(event); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::onEvent(const EventHandle& event, + const CallbackFn& work) { + return _executor->onEvent(event, work); +} + +void ShardingTaskExecutor::waitForEvent(const EventHandle& event) { + _executor->waitForEvent(event); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork( + const CallbackFn& work) { + return _executor->scheduleWork(work); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt( + Date_t when, const CallbackFn& work) { + return _executor->scheduleWorkAt(when, work); +} + +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand( + const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { + + // schedule the user's callback if there is not opCtx + if (!request.opCtx) { + return _executor->scheduleRemoteCommand(request, cb); + } + + std::shared_ptr<OperationTimeTracker> timeTracker = OperationTimeTracker::get(request.opCtx); + if (!timeTracker) { // install the time tracker on the opCtx + timeTracker = std::make_shared<OperationTimeTracker>(); + OperationTimeTracker::set(request.opCtx, timeTracker); + } + + auto shardingCb = [timeTracker, cb](const TaskExecutor::RemoteCommandCallbackArgs& args) { + cb(args); + + invariant(timeTracker); + + if (!args.response.isOK()) { + LOG(1) << "Error processing the remote request" + << "do not update operationTime"; + return; + } + + auto operationTime = args.response.data[kOperationTimeField]; + if (operationTime.eoo()) { + LOG(1) << "No operationTime in the response"; + return; + } + + invariant(operationTime.type() == BSONType::bsonTimestamp); + + timeTracker->updateOperationTime(LogicalTime(operationTime.timestamp())); + }; + + return _executor->scheduleRemoteCommand(request, shardingCb); +} + +void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) { + _executor->cancel(cbHandle); +} + +void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle) { + _executor->wait(cbHandle); +} + +void ShardingTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const { + _executor->appendConnectionStats(stats); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/db/s/sharding_task_executor.h b/src/mongo/db/s/sharding_task_executor.h new file mode 100644 index 00000000000..7828b56dd94 --- /dev/null +++ b/src/mongo/db/s/sharding_task_executor.h @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { +namespace executor { + +struct ConnectionPoolStats; +class ThreadPoolTaskExecutor; + +/** + * Implementation of a TaskExecutor that uses ThreadPoolTaskExecutor to submit tasks and allows to + * override methods if needed. + */ +class ShardingTaskExecutor final : public TaskExecutor { + MONGO_DISALLOW_COPYING(ShardingTaskExecutor); + +public: + ShardingTaskExecutor(std::unique_ptr<ThreadPoolTaskExecutor> executor); + + void startup() override; + void shutdown() override; + void join() override; + std::string getDiagnosticString() const override; + Date_t now() override; + StatusWith<EventHandle> makeEvent() override; + void signalEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override; + void waitForEvent(const EventHandle& event) override; + StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override; + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override; + void cancel(const CallbackHandle& cbHandle) override; + void wait(const CallbackHandle& cbHandle) override; + + void appendConnectionStats(ConnectionPoolStats* stats) const override; + +private: + std::unique_ptr<ThreadPoolTaskExecutor> _executor; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index eede0298c6d..c3b69f7bd08 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -30,6 +30,7 @@ env.Library( '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_impl', '$BUILD_DIR/mongo/s/catalog/replset_dist_lock_manager', + '$BUILD_DIR/mongo/db/s/sharding_task_executor', 'client/sharding_connection_hook', 'coreshard', ], diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index b0b51806c3e..b586acfc57d 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -46,6 +46,7 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/operation_time_tracker.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/query_request.h" @@ -82,6 +83,8 @@ using std::stringstream; namespace { +const std::string kOperationTime = "operationTime"; + void runAgainstRegistered(OperationContext* opCtx, const char* ns, BSONObj& jsobj, @@ -167,6 +170,11 @@ Status processCommandMetadata(OperationContext* opCtx, const BSONObj& cmdObj) { void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* responseBuilder) { rpc::LogicalTimeMetadata logicalTimeMetadata(LogicalClock::get(opCtx)->getClusterTime()); logicalTimeMetadata.writeToMetadata(responseBuilder); + auto tracker = OperationTimeTracker::get(opCtx); + if (tracker) { + auto operationTime = OperationTimeTracker::get(opCtx)->getMaxOperationTime(); + responseBuilder->append(kOperationTime, operationTime.asTimestamp()); + } } MONGO_INITIALIZER(InitializeCommandExecCommandHandler)(InitializerContext* const) { diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index d6fe68b2b73..8234e4ea6ed 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -37,6 +37,7 @@ #include "mongo/base/status.h" #include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/db/audit.h" +#include "mongo/db/s/sharding_task_executor.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -89,13 +90,15 @@ using executor::NetworkInterface; using executor::NetworkInterfaceThreadPool; using executor::TaskExecutorPool; using executor::ThreadPoolTaskExecutor; +using executor::ShardingTaskExecutor; static constexpr auto kRetryInterval = Seconds{2}; -std::unique_ptr<ThreadPoolTaskExecutor> makeTaskExecutor(std::unique_ptr<NetworkInterface> net) { +auto makeTaskExecutor(std::unique_ptr<NetworkInterface> net) { auto netPtr = net.get(); - return stdx::make_unique<ThreadPoolTaskExecutor>( + auto executor = stdx::make_unique<ThreadPoolTaskExecutor>( stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); + return stdx::make_unique<ShardingTaskExecutor>(std::move(executor)); } std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service, @@ -119,22 +122,17 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( std::vector<std::unique_ptr<executor::TaskExecutor>> executors; for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) { - auto net = executor::makeNetworkInterface( + auto exec = makeTaskExecutor(executor::makeNetworkInterface( "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i), stdx::make_unique<ShardingNetworkConnectionHook>(), metadataHookBuilder(), - connPoolOptions); - auto netPtr = net.get(); - auto exec = stdx::make_unique<ThreadPoolTaskExecutor>( - stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); + connPoolOptions)); executors.emplace_back(std::move(exec)); } // Add executor used to perform non-performance critical work. - auto fixedNetPtr = fixedNet.get(); - auto fixedExec = stdx::make_unique<ThreadPoolTaskExecutor>( - stdx::make_unique<NetworkInterfaceThreadPool>(fixedNetPtr), std::move(fixedNet)); + auto fixedExec = makeTaskExecutor(std::move(fixedNet)); auto executorPool = stdx::make_unique<TaskExecutorPool>(); executorPool->addExecutors(std::move(executors), std::move(fixedExec)); diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js index 5c8dafff4a5..b26380fdb18 100644 --- a/src/mongo/shell/assert.js +++ b/src/mongo/shell/assert.js @@ -144,7 +144,7 @@ assert.hasFields = function(result, arr, msg) { } if (count != arr.length) { - doassert("None of values from " + tojson(arr) + " was in " + tojson(o) + " : " + msg); + doassert("None of values from " + tojson(arr) + " was in " + tojson(result) + " : " + msg); } }; |