summaryrefslogtreecommitdiff
path: root/src/mongo/client/parallel.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2015-07-15 16:42:21 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2015-07-16 16:18:09 -0400
commitea73cdf50ede2622c9e005eb6e51cdc6ccb60650 (patch)
treec4e55e6da208cb9fc90a648835d93f3603534fad /src/mongo/client/parallel.cpp
parent0254cdbc2d5f04188e45fe98dec1ca985e2ecf0c (diff)
downloadmongo-ea73cdf50ede2622c9e005eb6e51cdc6ccb60650.tar.gz
SERVER-18567 Move Future under the mongos commands
The future class and spawnCommand are only needed for sharding commands, so this change moves them under cluster_commands_common. Also moves setShardVersion to be under the version manager. No functional changes.
Diffstat (limited to 'src/mongo/client/parallel.cpp')
-rw-r--r--src/mongo/client/parallel.cpp136
1 files changed, 2 insertions, 134 deletions
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp
index 86e6e40af0d..f3fbe8416e5 100644
--- a/src/mongo/client/parallel.cpp
+++ b/src/mongo/client/parallel.cpp
@@ -87,7 +87,7 @@ string ParallelSortClusteredCursor::getNS() {
* Throws a RecvStaleConfigException wrapping the stale error document in this cursor when the
* ShardConfigStale flag is set or a command returns a SendStaleConfigCode error code.
*/
-static void throwCursorStale(DBClientCursor* cursor) {
+void throwCursorStale(DBClientCursor* cursor) {
verify(cursor);
if (cursor->hasResultFlag(ResultFlag_ShardConfigStale)) {
@@ -1534,136 +1534,4 @@ void ParallelSortClusteredCursor::_explain(map<string, list<BSONObj>>& out) {
}
}
-// -----------------
-// ---- Future -----
-// -----------------
-
-Future::CommandResult::CommandResult(const string& server,
- const string& db,
- const BSONObj& cmd,
- int options,
- DBClientBase* conn,
- bool useShardedConn)
- : _server(server),
- _db(db),
- _options(options),
- _cmd(cmd),
- _conn(conn),
- _useShardConn(useShardedConn),
- _done(false) {
- init();
-}
-
-void Future::CommandResult::init() {
- try {
- if (!_conn) {
- if (_useShardConn) {
- _connHolder.reset(new ShardConnection(
- uassertStatusOK(ConnectionString::parse(_server)), "", NULL));
- } else {
- _connHolder.reset(new ScopedDbConnection(_server));
- }
-
- _conn = _connHolder->get();
- }
-
- if (_conn->lazySupported()) {
- _cursor.reset(
- new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1 /*limit*/, 0, NULL, _options, 0));
- _cursor->initLazy();
- } else {
- _done = true; // we set _done first because even if there is an error we're done
- _ok = _conn->runCommand(_db, _cmd, _res, _options);
- }
- } catch (std::exception& e) {
- error() << "Future::spawnCommand (part 1) exception: " << e.what() << endl;
- _ok = false;
- _done = true;
- }
-}
-
-bool Future::CommandResult::join(int maxRetries) {
- if (_done)
- return _ok;
-
-
- _ok = false;
- for (int i = 1; i <= maxRetries; i++) {
- try {
- bool retry = false;
- bool finished = _cursor->initLazyFinish(retry);
-
- // Shouldn't need to communicate with server any more
- if (_connHolder)
- _connHolder->done();
-
- uassert(
- 14812, str::stream() << "Error running command on server: " << _server, finished);
- massert(14813, "Command returned nothing", _cursor->more());
-
- // Rethrow stale config errors stored in this cursor for correct handling
- throwCursorStale(_cursor.get());
-
- _res = _cursor->nextSafe();
- _ok = _res["ok"].trueValue();
-
- break;
- } catch (RecvStaleConfigException& e) {
- verify(versionManager.isVersionableCB(_conn));
-
- // For legacy reasons, we may not always have a namespace :-(
- string staleNS = e.getns();
- if (staleNS.size() == 0)
- staleNS = _db;
-
- if (i >= maxRetries) {
- error() << "Future::spawnCommand (part 2) stale config exception" << causedBy(e)
- << endl;
- throw e;
- }
-
- if (i >= maxRetries / 2) {
- if (!versionManager.forceRemoteCheckShardVersionCB(staleNS)) {
- error() << "Future::spawnCommand (part 2) no config detected" << causedBy(e)
- << endl;
- throw e;
- }
- }
-
- // We may not always have a collection, since we don't know from a generic command what
- // collection is supposed to be acted on, if any
- if (nsGetCollection(staleNS).size() == 0) {
- warning() << "no collection namespace in stale config exception "
- << "for lazy command " << _cmd << ", could not refresh " << staleNS
- << endl;
- } else {
- versionManager.checkShardVersionCB(_conn, staleNS, false, 1);
- }
-
- LOG(i > 1 ? 0 : 1) << "retrying lazy command" << causedBy(e) << endl;
-
- verify(_conn->lazySupported());
- _done = false;
- init();
- continue;
- } catch (std::exception& e) {
- error() << "Future::spawnCommand (part 2) exception: " << causedBy(e) << endl;
- break;
- }
- }
-
- _done = true;
- return _ok;
-}
-
-shared_ptr<Future::CommandResult> Future::spawnCommand(const string& server,
- const string& db,
- const BSONObj& cmd,
- int options,
- DBClientBase* conn,
- bool useShardConn) {
- shared_ptr<Future::CommandResult> res(
- new Future::CommandResult(server, db, cmd, options, conn, useShardConn));
- return res;
-}
-}
+} // namespace mongo