summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMisha Tyulenev <misha@mongodb.com>2017-03-21 16:23:35 -0400
committerMisha Tyulenev <misha@mongodb.com>2017-03-22 12:03:06 -0400
commit4d364a4c951bb05639335d5989c1f85e79af78fa (patch)
tree487535da7f765f7d37bae861049fe96e48c89320
parentb38e0199d04a6b4516566be71c819e612801c365 (diff)
downloadmongo-4d364a4c951bb05639335d5989c1f85e79af78fa.tar.gz
SERVER-28107 augment command result with operationTime in mongos
-rw-r--r--buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml4
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml4
-rw-r--r--jstests/core/batch_write_command_delete.js3
-rw-r--r--jstests/core/set_param1.js4
-rw-r--r--src/mongo/db/operation_time_tracker.cpp16
-rw-r--r--src/mongo/db/operation_time_tracker.h3
-rw-r--r--src/mongo/db/s/SConscript12
-rw-r--r--src/mongo/db/s/sharding_task_executor.cpp152
-rw-r--r--src/mongo/db/s/sharding_task_executor.h79
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/commands/strategy.cpp8
-rw-r--r--src/mongo/s/sharding_initialization.cpp18
-rw-r--r--src/mongo/shell/assert.js2
13 files changed, 283 insertions, 23 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml
index fc09e03a452..5010c83f19d 100644
--- a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml
@@ -66,10 +66,6 @@ selector:
- jstests/core/ns_length.js
- jstests/core/views/*.js # Views tests aren't expected to work when collections are implicitly sharded.
- jstests/core/killop_drop_collection.js # Uses fsyncLock.
- # TODO: SERVER-28107
- - jstests/core/batch_write_command_delete.js
- - jstests/core/batch_write_command_insert.js
- - jstests/core/batch_write_command_update.js
# TODO: Remove after fixing SERVER-28321.
- jstests/core/mr_undef.js
# TODO: SERVER-16605
diff --git a/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml
index fda211b6fff..9791e68b1ad 100644
--- a/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_jscore_passthrough.yml
@@ -48,10 +48,6 @@ selector:
# TODO: SERVER-27269: mongos can't establish cursor if view has $collStats and views another view.
- jstests/core/views/views_coll_stats.js
- jstests/core/killop_drop_collection.js # Uses fsyncLock.
- # TODO: SERVER-28107
- - jstests/core/batch_write_command_delete.js
- - jstests/core/batch_write_command_insert.js
- - jstests/core/batch_write_command_update.js
executor:
diff --git a/jstests/core/batch_write_command_delete.js b/jstests/core/batch_write_command_delete.js
index 7a85bc6491b..c9813e66282 100644
--- a/jstests/core/batch_write_command_delete.js
+++ b/jstests/core/batch_write_command_delete.js
@@ -227,7 +227,8 @@ result = coll.runCommand(request);
assert.commandWorked(result);
assert.eq(1, coll.count());
-assert.hasFields(result, fields, 'fields in result do not match: ' + tojson(fields));
+// the error path does not go to the shard and hence there is no operationTime.
+assert.hasFields(result, ["ok"], 'fields in result do not match: ' + tojson(fields));
//
// When limit is not 0 and 1
diff --git a/jstests/core/set_param1.js b/jstests/core/set_param1.js
index 51b13ae87cc..88d4f49b5ad 100644
--- a/jstests/core/set_param1.js
+++ b/jstests/core/set_param1.js
@@ -2,9 +2,13 @@
// and shell helpers.
old = db.adminCommand({"getParameter": "*"});
+// the first time getParameter sends a request to with a shardingTaskExecutor and this sets an
+// operationTime. The following commands do not use shardingTaskExecutor.
+delete old["operationTime"];
tmp1 = db.adminCommand({"setParameter": 1, "logLevel": 5});
tmp2 = db.adminCommand({"setParameter": 1, "logLevel": old.logLevel});
now = db.adminCommand({"getParameter": "*"});
+delete now["operationTime"];
assert.eq(old, now, "A");
assert.eq(old.logLevel, tmp1.was, "B");
diff --git a/src/mongo/db/operation_time_tracker.cpp b/src/mongo/db/operation_time_tracker.cpp
index 57a17f2ccae..71397a3e757 100644
--- a/src/mongo/db/operation_time_tracker.cpp
+++ b/src/mongo/db/operation_time_tracker.cpp
@@ -32,9 +32,21 @@
#include "mongo/stdx/mutex.h"
namespace mongo {
+namespace {
+auto getOperationTimeTracker =
+ OperationContext::declareDecoration<std::shared_ptr<OperationTimeTracker>>();
+}
+
-const OperationContext::Decoration<OperationTimeTracker> OperationTimeTracker::get =
- OperationContext::declareDecoration<OperationTimeTracker>();
+std::shared_ptr<OperationTimeTracker> OperationTimeTracker::get(OperationContext* opCtx) {
+ return getOperationTimeTracker(opCtx);
+}
+
+void OperationTimeTracker::set(OperationContext* opCtx,
+ std::shared_ptr<OperationTimeTracker> trackerArg) {
+ auto& tracker = getOperationTimeTracker(opCtx);
+ tracker = std::move(trackerArg);
+}
LogicalTime OperationTimeTracker::getMaxOperationTime() const {
stdx::lock_guard<stdx::mutex> lock(_mutex);
diff --git a/src/mongo/db/operation_time_tracker.h b/src/mongo/db/operation_time_tracker.h
index 812368b4a58..39157cc1a6e 100644
--- a/src/mongo/db/operation_time_tracker.h
+++ b/src/mongo/db/operation_time_tracker.h
@@ -42,7 +42,8 @@ namespace mongo {
class OperationTimeTracker {
public:
// Decorate OperationContext with OperationTimeTracker instance.
- static const OperationContext::Decoration<OperationTimeTracker> get;
+ static std::shared_ptr<OperationTimeTracker> get(OperationContext* ctx);
+ static void set(OperationContext* opCtx, std::shared_ptr<OperationTimeTracker> tracker);
/*
* Return the latest operationTime.
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index e4ec6233845..f2d6de5347a 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -32,6 +32,17 @@ env.Library(
)
env.Library(
+ target='sharding_task_executor',
+ source=[
+ 'sharding_task_executor.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
+ ],
+)
+
+env.Library(
target='migration_types',
source=[
'migration_session_id.cpp',
@@ -95,6 +106,7 @@ env.Library(
'$BUILD_DIR/mongo/util/elapsed_tracker',
'metadata',
'migration_types',
+ 'sharding_task_executor',
'type_shard_identity',
#'$BUILD_DIR/mongo/db/ops/write_ops', # CYCLE
#'$BUILD_DIR/mongo/s/catalog/sharding_catalog_manager_impl', # CYCLE
diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp
new file mode 100644
index 00000000000..3c02c467e61
--- /dev/null
+++ b/src/mongo/db/s/sharding_task_executor.cpp
@@ -0,0 +1,152 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/sharding_task_executor.h"
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/logical_time.h"
+#include "mongo/db/operation_time_tracker.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace executor {
+
+namespace {
+const std::string kOperationTimeField = "operationTime";
+}
+
+ShardingTaskExecutor::ShardingTaskExecutor(std::unique_ptr<ThreadPoolTaskExecutor> executor)
+ : _executor(std::move(executor)) {}
+
+void ShardingTaskExecutor::startup() {
+ _executor->startup();
+}
+
+void ShardingTaskExecutor::shutdown() {
+ _executor->shutdown();
+}
+
+void ShardingTaskExecutor::join() {
+ _executor->join();
+}
+
+std::string ShardingTaskExecutor::getDiagnosticString() const {
+ return _executor->getDiagnosticString();
+}
+
+Date_t ShardingTaskExecutor::now() {
+ return _executor->now();
+}
+
+StatusWith<TaskExecutor::EventHandle> ShardingTaskExecutor::makeEvent() {
+ return _executor->makeEvent();
+}
+
+void ShardingTaskExecutor::signalEvent(const EventHandle& event) {
+ return _executor->signalEvent(event);
+}
+
+StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::onEvent(const EventHandle& event,
+ const CallbackFn& work) {
+ return _executor->onEvent(event, work);
+}
+
+void ShardingTaskExecutor::waitForEvent(const EventHandle& event) {
+ _executor->waitForEvent(event);
+}
+
+StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWork(
+ const CallbackFn& work) {
+ return _executor->scheduleWork(work);
+}
+
+StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleWorkAt(
+ Date_t when, const CallbackFn& work) {
+ return _executor->scheduleWorkAt(when, work);
+}
+
+StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCommand(
+ const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+
+ // schedule the user's callback if there is not opCtx
+ if (!request.opCtx) {
+ return _executor->scheduleRemoteCommand(request, cb);
+ }
+
+ std::shared_ptr<OperationTimeTracker> timeTracker = OperationTimeTracker::get(request.opCtx);
+ if (!timeTracker) { // install the time tracker on the opCtx
+ timeTracker = std::make_shared<OperationTimeTracker>();
+ OperationTimeTracker::set(request.opCtx, timeTracker);
+ }
+
+ auto shardingCb = [timeTracker, cb](const TaskExecutor::RemoteCommandCallbackArgs& args) {
+ cb(args);
+
+ invariant(timeTracker);
+
+ if (!args.response.isOK()) {
+ LOG(1) << "Error processing the remote request"
+ << "do not update operationTime";
+ return;
+ }
+
+ auto operationTime = args.response.data[kOperationTimeField];
+ if (operationTime.eoo()) {
+ LOG(1) << "No operationTime in the response";
+ return;
+ }
+
+ invariant(operationTime.type() == BSONType::bsonTimestamp);
+
+ timeTracker->updateOperationTime(LogicalTime(operationTime.timestamp()));
+ };
+
+ return _executor->scheduleRemoteCommand(request, shardingCb);
+}
+
+void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) {
+ _executor->cancel(cbHandle);
+}
+
+void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle) {
+ _executor->wait(cbHandle);
+}
+
+void ShardingTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const {
+ _executor->appendConnectionStats(stats);
+}
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_task_executor.h b/src/mongo/db/s/sharding_task_executor.h
new file mode 100644
index 00000000000..7828b56dd94
--- /dev/null
+++ b/src/mongo/db/s/sharding_task_executor.h
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/list.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+namespace executor {
+
+struct ConnectionPoolStats;
+class ThreadPoolTaskExecutor;
+
+/**
+ * Implementation of a TaskExecutor that uses ThreadPoolTaskExecutor to submit tasks and allows to
+ * override methods if needed.
+ */
+class ShardingTaskExecutor final : public TaskExecutor {
+ MONGO_DISALLOW_COPYING(ShardingTaskExecutor);
+
+public:
+ ShardingTaskExecutor(std::unique_ptr<ThreadPoolTaskExecutor> executor);
+
+ void startup() override;
+ void shutdown() override;
+ void join() override;
+ std::string getDiagnosticString() const override;
+ Date_t now() override;
+ StatusWith<EventHandle> makeEvent() override;
+ void signalEvent(const EventHandle& event) override;
+ StatusWith<CallbackHandle> onEvent(const EventHandle& event, const CallbackFn& work) override;
+ void waitForEvent(const EventHandle& event) override;
+ StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb) override;
+ void cancel(const CallbackHandle& cbHandle) override;
+ void wait(const CallbackHandle& cbHandle) override;
+
+ void appendConnectionStats(ConnectionPoolStats* stats) const override;
+
+private:
+ std::unique_ptr<ThreadPoolTaskExecutor> _executor;
+};
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index eede0298c6d..c3b69f7bd08 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -30,6 +30,7 @@ env.Library(
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_impl',
'$BUILD_DIR/mongo/s/catalog/replset_dist_lock_manager',
+ '$BUILD_DIR/mongo/db/s/sharding_task_executor',
'client/sharding_connection_hook',
'coreshard',
],
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index b0b51806c3e..b586acfc57d 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_time_tracker.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
@@ -82,6 +83,8 @@ using std::stringstream;
namespace {
+const std::string kOperationTime = "operationTime";
+
void runAgainstRegistered(OperationContext* opCtx,
const char* ns,
BSONObj& jsobj,
@@ -167,6 +170,11 @@ Status processCommandMetadata(OperationContext* opCtx, const BSONObj& cmdObj) {
void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* responseBuilder) {
rpc::LogicalTimeMetadata logicalTimeMetadata(LogicalClock::get(opCtx)->getClusterTime());
logicalTimeMetadata.writeToMetadata(responseBuilder);
+ auto tracker = OperationTimeTracker::get(opCtx);
+ if (tracker) {
+ auto operationTime = OperationTimeTracker::get(opCtx)->getMaxOperationTime();
+ responseBuilder->append(kOperationTime, operationTime.asTimestamp());
+ }
}
MONGO_INITIALIZER(InitializeCommandExecCommandHandler)(InitializerContext* const) {
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index d6fe68b2b73..8234e4ea6ed 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -37,6 +37,7 @@
#include "mongo/base/status.h"
#include "mongo/client/remote_command_targeter_factory_impl.h"
#include "mongo/db/audit.h"
+#include "mongo/db/s/sharding_task_executor.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -89,13 +90,15 @@ using executor::NetworkInterface;
using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutorPool;
using executor::ThreadPoolTaskExecutor;
+using executor::ShardingTaskExecutor;
static constexpr auto kRetryInterval = Seconds{2};
-std::unique_ptr<ThreadPoolTaskExecutor> makeTaskExecutor(std::unique_ptr<NetworkInterface> net) {
+auto makeTaskExecutor(std::unique_ptr<NetworkInterface> net) {
auto netPtr = net.get();
- return stdx::make_unique<ThreadPoolTaskExecutor>(
+ auto executor = stdx::make_unique<ThreadPoolTaskExecutor>(
stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
+ return stdx::make_unique<ShardingTaskExecutor>(std::move(executor));
}
std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service,
@@ -119,22 +122,17 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
std::vector<std::unique_ptr<executor::TaskExecutor>> executors;
for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) {
- auto net = executor::makeNetworkInterface(
+ auto exec = makeTaskExecutor(executor::makeNetworkInterface(
"NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i),
stdx::make_unique<ShardingNetworkConnectionHook>(),
metadataHookBuilder(),
- connPoolOptions);
- auto netPtr = net.get();
- auto exec = stdx::make_unique<ThreadPoolTaskExecutor>(
- stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
+ connPoolOptions));
executors.emplace_back(std::move(exec));
}
// Add executor used to perform non-performance critical work.
- auto fixedNetPtr = fixedNet.get();
- auto fixedExec = stdx::make_unique<ThreadPoolTaskExecutor>(
- stdx::make_unique<NetworkInterfaceThreadPool>(fixedNetPtr), std::move(fixedNet));
+ auto fixedExec = makeTaskExecutor(std::move(fixedNet));
auto executorPool = stdx::make_unique<TaskExecutorPool>();
executorPool->addExecutors(std::move(executors), std::move(fixedExec));
diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js
index 5c8dafff4a5..b26380fdb18 100644
--- a/src/mongo/shell/assert.js
+++ b/src/mongo/shell/assert.js
@@ -144,7 +144,7 @@ assert.hasFields = function(result, arr, msg) {
}
if (count != arr.length) {
- doassert("None of values from " + tojson(arr) + " was in " + tojson(o) + " : " + msg);
+ doassert("None of values from " + tojson(arr) + " was in " + tojson(result) + " : " + msg);
}
};