diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/SConscript | 23 | ||||
-rw-r--r-- | src/mongo/client/fetcher.cpp (renamed from src/mongo/db/repl/fetcher.cpp) | 4 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h (renamed from src/mongo/db/repl/fetcher.h) | 3 | ||||
-rw-r--r-- | src/mongo/client/fetcher_test.cpp (renamed from src/mongo/db/repl/fetcher_test.cpp) | 2 | ||||
-rw-r--r-- | src/mongo/client/query_fetcher.cpp | 76 | ||||
-rw-r--r-- | src/mongo/client/query_fetcher.h | 85 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner.h | 2 |
12 files changed, 196 insertions, 114 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 8b6aefc8d34..304d83cd04d 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -217,3 +217,26 @@ env.Library( LIBDEPS=[ ], ) + +env.Library( + target='fetcher', + source=[ + 'fetcher.cpp', + 'query_fetcher.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/executor/task_executor_interface', + '$BUILD_DIR/mongo/logger/logger', + '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/rpc/command_status', + ], +) + +env.CppUnitTest( + target='fetcher_test', + source='fetcher_test.cpp', + LIBDEPS=[ + 'fetcher', + '$BUILD_DIR/mongo/db/repl/replication_executor_test_fixture', + ], +) diff --git a/src/mongo/db/repl/fetcher.cpp b/src/mongo/client/fetcher.cpp index cfe8402057d..2760b092bd4 100644 --- a/src/mongo/db/repl/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/fetcher.h" +#include "mongo/client/fetcher.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" @@ -37,7 +37,6 @@ #include "mongo/util/mongoutils/str.h" namespace mongo { -namespace repl { namespace { @@ -297,5 +296,4 @@ namespace { _condition.notify_all(); } -} // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/fetcher.h b/src/mongo/client/fetcher.h index 15ea466981a..32b1c5aa8cb 100644 --- a/src/mongo/db/repl/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -44,8 +44,6 @@ #include "mongo/util/net/hostandport.h" namespace mongo { -namespace repl { - class Fetcher { MONGO_DISALLOW_COPYING(Fetcher); public: @@ -185,5 +183,4 @@ namespace repl { executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle; }; -} // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp index 4672dd9759d..6414dbae1f9 100644 --- a/src/mongo/db/repl/fetcher_test.cpp +++ b/src/mongo/client/fetcher_test.cpp @@ -30,8 +30,8 @@ #include <memory> +#include "mongo/client/fetcher.h" #include "mongo/db/jsobj.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/executor/network_interface_mock.h" diff --git a/src/mongo/client/query_fetcher.cpp b/src/mongo/client/query_fetcher.cpp new file mode 100644 index 00000000000..eee82f147cb --- /dev/null +++ b/src/mongo/client/query_fetcher.cpp @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/query_fetcher.h" + +namespace mongo { + + QueryFetcher::QueryFetcher(executor::TaskExecutor* exec, + const HostAndPort& src, + const NamespaceString& nss, + const BSONObj& cmdBSON, + const CallbackFn& work) + : _exec(exec), + _fetcher(exec, + src, + nss.db().toString(), + cmdBSON, + stdx::bind(&QueryFetcher::_onFetchCallback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3)), + _responses(0), + _work(work) { + + } + + void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + ++_responses; + + _delegateCallback(fetchResult, nextAction); + // The fetcher will continue to call with kGetMore until an error or the last batch. + if (fetchResult.isOK() && *nextAction == Fetcher::NextAction::kGetMore) { + const auto batchData(fetchResult.getValue()); + invariant(getMoreBob); + getMoreBob->append("getMore", batchData.cursorId); + getMoreBob->append("collection", batchData.nss.coll()); + } + } + + std::string QueryFetcher::toString() const { + return str::stream() << "QueryFetcher -" + << " responses: " << _responses + << " fetcher: " << _fetcher.getDiagnosticString(); + } + +} // namespace mongo diff --git a/src/mongo/client/query_fetcher.h b/src/mongo/client/query_fetcher.h new file mode 100644 index 00000000000..21f7461fb43 --- /dev/null +++ b/src/mongo/client/query_fetcher.h @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/client/fetcher.h" +#include "mongo/stdx/functional.h" + +namespace mongo { + + struct HostAndPort; + class NamespaceString; + class BSONObj; + class BSONObjBuilder; + + /** + * Follows the fetcher pattern for a find+getmore. + * QueryFetcher will continue to call getmore until an error or + * until the last batch of results. + */ + class QueryFetcher { + MONGO_DISALLOW_COPYING(QueryFetcher); + public: + using BatchDataStatus = StatusWith<Fetcher::BatchData>; + using CallbackFn = stdx::function<void (const BatchDataStatus&, Fetcher::NextAction*)>; + + QueryFetcher(executor::TaskExecutor* exec, + const HostAndPort& source, + const NamespaceString& nss, + const BSONObj& cmdBSON, + const QueryFetcher::CallbackFn& onBatchAvailable); + virtual ~QueryFetcher() = default; + + bool isActive() const { return _fetcher.isActive(); } + Status schedule() { return _fetcher.schedule(); } + void cancel() { return _fetcher.cancel(); } + void wait() { if (_fetcher.isActive()) _fetcher.wait(); } + std::string toString() const; + + protected: + void _onFetchCallback(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob); + + virtual void _delegateCallback(const BatchDataStatus& fetchResult, + Fetcher::NextAction* nextAction) { + _work(fetchResult, nextAction); + }; + + executor::TaskExecutor* _exec; + Fetcher _fetcher; + int _responses; + const QueryFetcher::CallbackFn _work; + }; + +} // namespace mongo diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d899720f731..98fa6f5caad 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -423,19 +423,6 @@ env.Library( ], ) -env.Library( - target='fetcher', - source=[ - 'fetcher.cpp', - ], - LIBDEPS=[ - 'replication_executor', - '$BUILD_DIR/mongo/logger/logger', - '$BUILD_DIR/mongo/db/namespace_string', - '$BUILD_DIR/mongo/rpc/command_status', - ], -) - env.CppUnitTest( target='reporter_test', source='reporter_test.cpp', @@ -445,15 +432,6 @@ env.CppUnitTest( ], ) -env.CppUnitTest( - target='fetcher_test', - source='fetcher_test.cpp', - LIBDEPS=[ - 'fetcher', - 'replication_executor_test_fixture', - ], -) - env.Library( target='base_cloner_test_fixture', source=[ @@ -470,9 +448,9 @@ env.Library( 'collection_cloner.cpp', ], LIBDEPS=[ - 'fetcher', 'replication_executor', '$BUILD_DIR/mongo/db/catalog/collection_options', + '$BUILD_DIR/mongo/client/fetcher', '$BUILD_DIR/mongo/logger/logger', ], ) @@ -611,8 +589,8 @@ env.Library( 'applier', 'collection_cloner', 'database_cloner', - 'fetcher', 'repl_coordinator_interface', + '$BUILD_DIR/mongo/client/fetcher', ], ) diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 37dc718195b..19ff6850163 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -35,10 +35,10 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" +#include "mongo/client/fetcher.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/base_cloner.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/condition_variable.h" diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 8e71942ec54..d0caa1eebd6 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -37,7 +37,9 @@ #include <thread> #include "mongo/base/status.h" +#include "mongo/client/query_fetcher.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" @@ -92,43 +94,6 @@ namespace { } // namespace /** - * Follows the fetcher pattern for a find+getmore - */ - class QueryFetcher { - MONGO_DISALLOW_COPYING(QueryFetcher); - public: - using CallbackFn = stdx::function<void (const BatchDataStatus&, NextAction*)>; - - QueryFetcher(ReplicationExecutor* exec, - const HostAndPort& source, - const NamespaceString& nss, - const BSONObj& cmdBSON, - const QueryFetcher::CallbackFn& onBatchAvailable); - virtual ~QueryFetcher() = default; - - bool isActive() const { return _fetcher.isActive(); } - Status schedule() { return _fetcher.schedule(); } - void cancel() { return _fetcher.cancel(); } - void wait() { if (_fetcher.isActive()) _fetcher.wait(); } - std::string toString() const; - - protected: - void _onFetchCallback(const BatchDataStatus& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob); - - virtual void _delegateCallback(const BatchDataStatus& fetchResult, - NextAction* nextAction) { - _work(fetchResult, nextAction); - }; - - ReplicationExecutor* _exec; - Fetcher _fetcher; - int _responses; - const QueryFetcher::CallbackFn _work; - }; - - /** * Follows the fetcher pattern for a find+getmore on an oplog * Returns additional errors if the start oplog entry cannot be found. */ @@ -156,47 +121,6 @@ namespace { const Timestamp _startTS; }; - // QueryFetcher - QueryFetcher::QueryFetcher(ReplicationExecutor* exec, - const HostAndPort& src, - const NamespaceString& nss, - const BSONObj& cmdBSON, - const CallbackFn& work) - : _exec(exec), - _fetcher(exec, - src, - nss.db().toString(), - cmdBSON, - stdx::bind(&QueryFetcher::_onFetchCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3)), - _responses(0), - _work(work) { - } - - void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - ++_responses; - - _delegateCallback(fetchResult, nextAction); - // The fetcher will continue to call with kGetMore until an error or the last batch. - if (fetchResult.isOK() && *nextAction == NextAction::kGetMore) { - const auto batchData(fetchResult.getValue()); - invariant(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); - } - } - - std::string QueryFetcher::toString() const { - return str::stream() << "QueryFetcher -" - << " responses: " << _responses - << " fetcher: " << _fetcher.getDiagnosticString(); - } - // OplogFetcher OplogFetcher::OplogFetcher(ReplicationExecutor* exec, const Timestamp& startTS, diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index 59e242dfc0d..a01e0c61b19 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -41,7 +41,6 @@ #include "mongo/db/repl/applier.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/reporter.h" @@ -49,6 +48,9 @@ #include "mongo/util/queue.h" namespace mongo { + +class QueryFetcher; + namespace repl { using Operations = Applier::Operations; @@ -65,7 +67,6 @@ using Response = RemoteCommandResponse; using TimestampStatus = StatusWith<Timestamp>; using UniqueLock = stdx::unique_lock<stdx::mutex>; -class QueryFetcher; class OplogFetcher; struct InitialSyncState; diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 854829ef113..f10fb2514fa 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -32,10 +32,10 @@ #include <memory> +#include "mongo/client/fetcher.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/data_replicator.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replica_set_config.h" diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index 447d7ce07fd..071b33d0e96 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -35,10 +35,10 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" +#include "mongo/client/fetcher.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/base_cloner.h" -#include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" |