diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-07-15 16:42:21 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-07-16 16:18:09 -0400 |
commit | ea73cdf50ede2622c9e005eb6e51cdc6ccb60650 (patch) | |
tree | c4e55e6da208cb9fc90a648835d93f3603534fad /src/mongo | |
parent | 0254cdbc2d5f04188e45fe98dec1ca985e2ecf0c (diff) | |
download | mongo-ea73cdf50ede2622c9e005eb6e51cdc6ccb60650.tar.gz |
SERVER-18567 Move Future under the mongos commands
The future class and spawnCommand are only needed for sharding commands,
so this change moves them under cluster_commands_common.
Also moves setShardVersion to be under the version manager.
No functional changes.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/client/parallel.cpp | 136 | ||||
-rw-r--r-- | src/mongo/client/parallel.h | 79 | ||||
-rw-r--r-- | src/mongo/s/client/shard_connection.cpp | 40 | ||||
-rw-r--r-- | src/mongo/s/client/shard_connection.h | 17 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_commands_common.cpp | 135 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_commands_common.h | 81 | ||||
-rw-r--r-- | src/mongo/s/commands/run_on_all_shards_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/version_manager.cpp | 128 |
8 files changed, 312 insertions, 306 deletions
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp index 86e6e40af0d..f3fbe8416e5 100644 --- a/src/mongo/client/parallel.cpp +++ b/src/mongo/client/parallel.cpp @@ -87,7 +87,7 @@ string ParallelSortClusteredCursor::getNS() { * Throws a RecvStaleConfigException wrapping the stale error document in this cursor when the * ShardConfigStale flag is set or a command returns a SendStaleConfigCode error code. */ -static void throwCursorStale(DBClientCursor* cursor) { +void throwCursorStale(DBClientCursor* cursor) { verify(cursor); if (cursor->hasResultFlag(ResultFlag_ShardConfigStale)) { @@ -1534,136 +1534,4 @@ void ParallelSortClusteredCursor::_explain(map<string, list<BSONObj>>& out) { } } -// ----------------- -// ---- Future ----- -// ----------------- - -Future::CommandResult::CommandResult(const string& server, - const string& db, - const BSONObj& cmd, - int options, - DBClientBase* conn, - bool useShardedConn) - : _server(server), - _db(db), - _options(options), - _cmd(cmd), - _conn(conn), - _useShardConn(useShardedConn), - _done(false) { - init(); -} - -void Future::CommandResult::init() { - try { - if (!_conn) { - if (_useShardConn) { - _connHolder.reset(new ShardConnection( - uassertStatusOK(ConnectionString::parse(_server)), "", NULL)); - } else { - _connHolder.reset(new ScopedDbConnection(_server)); - } - - _conn = _connHolder->get(); - } - - if (_conn->lazySupported()) { - _cursor.reset( - new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1 /*limit*/, 0, NULL, _options, 0)); - _cursor->initLazy(); - } else { - _done = true; // we set _done first because even if there is an error we're done - _ok = _conn->runCommand(_db, _cmd, _res, _options); - } - } catch (std::exception& e) { - error() << "Future::spawnCommand (part 1) exception: " << e.what() << endl; - _ok = false; - _done = true; - } -} - -bool Future::CommandResult::join(int maxRetries) { - if (_done) - return _ok; - - - _ok = false; - for (int i = 1; i <= maxRetries; i++) { - try { - bool retry = false; - bool finished = _cursor->initLazyFinish(retry); - - // Shouldn't need to communicate with server any more - if (_connHolder) - _connHolder->done(); - - uassert( - 14812, str::stream() << "Error running command on server: " << _server, finished); - massert(14813, "Command returned nothing", _cursor->more()); - - // Rethrow stale config errors stored in this cursor for correct handling - throwCursorStale(_cursor.get()); - - _res = _cursor->nextSafe(); - _ok = _res["ok"].trueValue(); - - break; - } catch (RecvStaleConfigException& e) { - verify(versionManager.isVersionableCB(_conn)); - - // For legacy reasons, we may not always have a namespace :-( - string staleNS = e.getns(); - if (staleNS.size() == 0) - staleNS = _db; - - if (i >= maxRetries) { - error() << "Future::spawnCommand (part 2) stale config exception" << causedBy(e) - << endl; - throw e; - } - - if (i >= maxRetries / 2) { - if (!versionManager.forceRemoteCheckShardVersionCB(staleNS)) { - error() << "Future::spawnCommand (part 2) no config detected" << causedBy(e) - << endl; - throw e; - } - } - - // We may not always have a collection, since we don't know from a generic command what - // collection is supposed to be acted on, if any - if (nsGetCollection(staleNS).size() == 0) { - warning() << "no collection namespace in stale config exception " - << "for lazy command " << _cmd << ", could not refresh " << staleNS - << endl; - } else { - versionManager.checkShardVersionCB(_conn, staleNS, false, 1); - } - - LOG(i > 1 ? 0 : 1) << "retrying lazy command" << causedBy(e) << endl; - - verify(_conn->lazySupported()); - _done = false; - init(); - continue; - } catch (std::exception& e) { - error() << "Future::spawnCommand (part 2) exception: " << causedBy(e) << endl; - break; - } - } - - _done = true; - return _ok; -} - -shared_ptr<Future::CommandResult> Future::spawnCommand(const string& server, - const string& db, - const BSONObj& cmd, - int options, - DBClientBase* conn, - bool useShardConn) { - shared_ptr<Future::CommandResult> res( - new Future::CommandResult(server, db, cmd, options, conn, useShardConn)); - return res; -} -} +} // namespace mongo diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h index ae778334fb8..92132e182a1 100644 --- a/src/mongo/client/parallel.h +++ b/src/mongo/client/parallel.h @@ -297,81 +297,6 @@ private: std::unique_ptr<ParallelConnectionMetadata> _pcmData; }; -/** - * Generally clients should be using Strategy::commandOp() wherever possible - the Future API - * does not handle versioning. - * - * tools for doing asynchronous operations - * right now uses underlying sync network ops and uses another thread - * should be changed to use non-blocking io - */ -class Future { -public: - class CommandResult { - public: - std::string getServer() const { - return _server; - } - - bool isDone() const { - return _done; - } - - bool ok() const { - verify(_done); - return _ok; - } - - BSONObj result() const { - verify(_done); - return _res; - } - - /** - blocks until command is done - returns ok() - */ - bool join(int maxRetries = 1); - - private: - CommandResult(const std::string& server, - const std::string& db, - const BSONObj& cmd, - int options, - DBClientBase* conn, - bool useShardedConn); - void init(); - - std::string _server; - std::string _db; - int _options; - BSONObj _cmd; - DBClientBase* _conn; - std::unique_ptr<AScopedConnection> _connHolder; // used if not provided a connection - bool _useShardConn; - - std::unique_ptr<DBClientCursor> _cursor; - - BSONObj _res; - bool _ok; - bool _done; - - friend class Future; - }; +void throwCursorStale(DBClientCursor* cursor); - - /** - * @param server server name - * @param db db name - * @param cmd cmd to exec - * @param conn optional connection to use. will use standard pooled if non-specified - * @param useShardConn use ShardConnection - */ - static std::shared_ptr<CommandResult> spawnCommand(const std::string& server, - const std::string& db, - const BSONObj& cmd, - int options, - DBClientBase* conn = 0, - bool useShardConn = false); -}; -} +} // namespace mongo diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp index 4607a14baae..81d64dfb3b8 100644 --- a/src/mongo/s/client/shard_connection.cpp +++ b/src/mongo/s/client/shard_connection.cpp @@ -508,42 +508,4 @@ void ShardConnection::forgetNS(const string& ns) { ClientConnections::threadInstance()->forgetNS(ns); } - -bool setShardVersion(DBClientBase& conn, - const string& ns, - const string& configServerPrimary, - ChunkVersion version, - ChunkManager* manager, - bool authoritative, - BSONObj& result) { - BSONObjBuilder cmdBuilder; - cmdBuilder.append("setShardVersion", ns); - cmdBuilder.append("configdb", configServerPrimary); - - ShardId shardId; - { - const auto shard = grid.shardRegistry()->getShard(conn.getServerAddress()); - shardId = shard->getId(); - cmdBuilder.append("shard", shardId); - cmdBuilder.append("shardHost", shard->getConnString().toString()); - } - - if (ns.size() > 0) { - version.addToBSON(cmdBuilder); - } else { - cmdBuilder.append("init", true); - } - - if (authoritative) { - cmdBuilder.appendBool("authoritative", 1); - } - - BSONObj cmd = cmdBuilder.obj(); - - LOG(1) << " setShardVersion " << shardId << " " << conn.getServerAddress() << " " << ns - << " " << cmd - << (manager ? string(str::stream() << " " << manager->getSequenceNumber()) : ""); - - return conn.runCommand("admin", cmd, result, 0); -} -} +} // namespace mongo diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h index 42040a3ab79..c6731945ec1 100644 --- a/src/mongo/s/client/shard_connection.h +++ b/src/mongo/s/client/shard_connection.h @@ -31,13 +31,10 @@ #include <string> #include "mongo/client/connpool.h" -#include "mongo/s/chunk_version.h" namespace mongo { class ChunkManager; -typedef std::shared_ptr<ChunkManager> ChunkManagerPtr; - class ShardConnection : public AScopedConnection { public: @@ -140,19 +137,7 @@ private: bool _setVersion; }; - -/** - * Sends the setShardVersion command on the specified connection. - */ -bool setShardVersion(DBClientBase& conn, - const std::string& ns, - const std::string& configServerPrimary, - ChunkVersion version, - ChunkManager* manager, - bool authoritative, - BSONObj& result); - - +typedef std::shared_ptr<ChunkManager> ChunkManagerPtr; typedef std::shared_ptr<ShardConnection> ShardConnectionPtr; extern DBConnectionPool shardConnectionPool; diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp index f390a17d992..93a87c9f497 100644 --- a/src/mongo/s/commands/cluster_commands_common.cpp +++ b/src/mongo/s/commands/cluster_commands_common.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + #include "mongo/platform/basic.h" #include "mongo/s/commands/cluster_commands_common.h" @@ -33,9 +35,142 @@ #include "mongo/db/commands.h" #include "mongo/db/query/cursor_responses.h" #include "mongo/s/cursors.h" +#include "mongo/s/stale_exception.h" +#include "mongo/s/version_manager.h" +#include "mongo/util/log.h" namespace mongo { +using std::shared_ptr; +using std::string; + +Future::CommandResult::CommandResult(const string& server, + const string& db, + const BSONObj& cmd, + int options, + DBClientBase* conn, + bool useShardedConn) + : _server(server), + _db(db), + _options(options), + _cmd(cmd), + _conn(conn), + _useShardConn(useShardedConn), + _done(false) { + init(); +} + +void Future::CommandResult::init() { + try { + if (!_conn) { + if (_useShardConn) { + _connHolder.reset(new ShardConnection( + uassertStatusOK(ConnectionString::parse(_server)), "", NULL)); + } else { + _connHolder.reset(new ScopedDbConnection(_server)); + } + + _conn = _connHolder->get(); + } + + if (_conn->lazySupported()) { + _cursor.reset( + new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1 /*limit*/, 0, NULL, _options, 0)); + _cursor->initLazy(); + } else { + _done = true; // we set _done first because even if there is an error we're done + _ok = _conn->runCommand(_db, _cmd, _res, _options); + } + } catch (std::exception& e) { + error() << "Future::spawnCommand (part 1) exception: " << e.what(); + _ok = false; + _done = true; + } +} + +bool Future::CommandResult::join(int maxRetries) { + if (_done) { + return _ok; + } + + _ok = false; + + for (int i = 1; i <= maxRetries; i++) { + try { + bool retry = false; + bool finished = _cursor->initLazyFinish(retry); + + // Shouldn't need to communicate with server any more + if (_connHolder) + _connHolder->done(); + + uassert( + 14812, str::stream() << "Error running command on server: " << _server, finished); + massert(14813, "Command returned nothing", _cursor->more()); + + // Rethrow stale config errors stored in this cursor for correct handling + throwCursorStale(_cursor.get()); + + _res = _cursor->nextSafe(); + _ok = _res["ok"].trueValue(); + + break; + } catch (const RecvStaleConfigException& e) { + verify(versionManager.isVersionableCB(_conn)); + + // For legacy reasons, we may not always have a namespace :-( + string staleNS = e.getns(); + if (staleNS.size() == 0) + staleNS = _db; + + if (i >= maxRetries) { + error() << "Future::spawnCommand (part 2) stale config exception" << causedBy(e); + throw e; + } + + if (i >= maxRetries / 2) { + if (!versionManager.forceRemoteCheckShardVersionCB(staleNS)) { + error() << "Future::spawnCommand (part 2) no config detected" << causedBy(e); + throw e; + } + } + + // We may not always have a collection, since we don't know from a generic command what + // collection is supposed to be acted on, if any + if (nsGetCollection(staleNS).size() == 0) { + warning() << "no collection namespace in stale config exception " + << "for lazy command " << _cmd << ", could not refresh " << staleNS; + } else { + versionManager.checkShardVersionCB(_conn, staleNS, false, 1); + } + + LOG(i > 1 ? 0 : 1) << "retrying lazy command" << causedBy(e); + + verify(_conn->lazySupported()); + _done = false; + init(); + continue; + } catch (std::exception& e) { + error() << "Future::spawnCommand (part 2) exception: " << causedBy(e); + break; + } + } + + _done = true; + return _ok; +} + +shared_ptr<Future::CommandResult> Future::spawnCommand(const string& server, + const string& db, + const BSONObj& cmd, + int options, + DBClientBase* conn, + bool useShardConn) { + shared_ptr<Future::CommandResult> res( + new Future::CommandResult(server, db, cmd, options, conn, useShardConn)); + return res; +} + int getUniqueCodeFromCommandResults(const std::vector<Strategy::CommandResult>& results) { int commonErrCode = -1; for (std::vector<Strategy::CommandResult>::const_iterator it = results.begin(); diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h index d06e13c63f2..b1b41459f01 100644 --- a/src/mongo/s/commands/cluster_commands_common.h +++ b/src/mongo/s/commands/cluster_commands_common.h @@ -28,15 +28,96 @@ #pragma once +#include <string> #include <vector> #include "mongo/base/status.h" +#include "mongo/bson/bsonobj.h" #include "mongo/s/strategy.h" +#include "mongo/stdx/memory.h" namespace mongo { class BSONObj; +class AScopedConnection; +class DBClientBase; +class DBClientCursor; + +/** + * DEPRECATED - do not use in any new code. All new code must use the TaskExecutor interface + * instead. + */ +class Future { +public: + class CommandResult { + public: + std::string getServer() const { + return _server; + } + + bool isDone() const { + return _done; + } + + bool ok() const { + verify(_done); + return _ok; + } + + BSONObj result() const { + verify(_done); + return _res; + } + + /** + blocks until command is done + returns ok() + */ + bool join(int maxRetries = 1); + + private: + CommandResult(const std::string& server, + const std::string& db, + const BSONObj& cmd, + int options, + DBClientBase* conn, + bool useShardedConn); + void init(); + + std::string _server; + std::string _db; + int _options; + BSONObj _cmd; + DBClientBase* _conn; + std::unique_ptr<AScopedConnection> _connHolder; // used if not provided a connection + bool _useShardConn; + + std::unique_ptr<DBClientCursor> _cursor; + + BSONObj _res; + bool _ok; + bool _done; + + friend class Future; + }; + + + /** + * @param server server name + * @param db db name + * @param cmd cmd to exec + * @param conn optional connection to use. will use standard pooled if non-specified + * @param useShardConn use ShardConnection + */ + static std::shared_ptr<CommandResult> spawnCommand(const std::string& server, + const std::string& db, + const BSONObj& cmd, + int options, + DBClientBase* conn = 0, + bool useShardConn = false); +}; + /** * Utility function to compute a single error code from a vector of command results. * diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp index 7c206b6a978..4b8d17b4c8c 100644 --- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp +++ b/src/mongo/s/commands/run_on_all_shards_cmd.cpp @@ -36,9 +36,9 @@ #include <set> #include "mongo/db/jsobj.h" -#include "mongo/client/parallel.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index 17c624e541d..d67de5bf69c 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -56,8 +56,7 @@ using std::endl; using std::map; using std::string; -// Global version manager -VersionManager versionManager; +namespace { /** * Tracking information, per-connection, of the latest chunk manager iteration or sequence @@ -65,7 +64,8 @@ VersionManager versionManager; * When the chunk manager is replaced, implying new versions were loaded, the chunk manager * sequence number is iterated by 1 and connections need to re-send shard versions. */ -struct ConnectionShardStatus { +class ConnectionShardStatus { +public: bool hasAnySequenceSet(DBClientBase* conn) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -98,6 +98,7 @@ struct ConnectionShardStatus { _map.erase(conn->getConnectionId()); } +private: // protects _map stdx::mutex _mutex; @@ -107,19 +108,50 @@ struct ConnectionShardStatus { } connectionShardStatus; -void VersionManager::resetShardVersionCB(DBClientBase* conn) { - connectionShardStatus.reset(conn); -} +/** + * Sends the setShardVersion command on the specified connection. + */ +bool setShardVersion(DBClientBase& conn, + const string& ns, + const string& configServerPrimary, + ChunkVersion version, + ChunkManager* manager, + bool authoritative, + BSONObj& result) { + BSONObjBuilder cmdBuilder; + cmdBuilder.append("setShardVersion", ns); + cmdBuilder.append("configdb", configServerPrimary); + + ShardId shardId; + { + const auto shard = grid.shardRegistry()->getShard(conn.getServerAddress()); + shardId = shard->getId(); + cmdBuilder.append("shard", shardId); + cmdBuilder.append("shardHost", shard->getConnString().toString()); + } -bool VersionManager::isVersionableCB(DBClientBase* conn) { - // We do not version shard connections when issued from mongod - if (!isMongos()) { - return false; + if (ns.size() > 0) { + version.addToBSON(cmdBuilder); + } else { + cmdBuilder.append("init", true); } - return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET; + if (authoritative) { + cmdBuilder.appendBool("authoritative", 1); + } + + BSONObj cmd = cmdBuilder.obj(); + + LOG(1) << " setShardVersion " << shardId << " " << conn.getServerAddress() << " " << ns + << " " << cmd + << (manager ? string(str::stream() << " " << manager->getSequenceNumber()) : ""); + + return conn.runCommand("admin", cmd, result, 0); } +/** + * Checks whether the specified connection supports versioning. + */ DBClientBase* getVersionable(DBClientBase* conn) { switch (conn->type()) { case ConnectionString::INVALID: @@ -150,32 +182,6 @@ DBClientBase* getVersionable(DBClientBase* conn) { MONGO_UNREACHABLE; } -bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { - const NamespaceString nss(ns); - - // This will force the database catalog entry to be reloaded - grid.catalogCache()->invalidate(nss.db().toString()); - - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); - if (!status.isOK()) { - return false; - } - - shared_ptr<DBConfig> conf = status.getValue(); - - // If we don't have a collection, don't refresh the chunk manager - if (nsGetCollection(ns).size() == 0) { - return false; - } - - ChunkManagerPtr manager = conf->getChunkManagerIfExists(ns, true, true); - if (!manager) { - return false; - } - - return true; -} - /** * Special internal logic to run reduced version handshake for empty namespace operations to * shards. @@ -183,7 +189,7 @@ bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { * Eventually this should go completely away, but for now many commands rely on unversioned but * mongos-specific behavior on mongod (auditing and replication information in commands) */ -static bool initShardVersionEmptyNS(DBClientBase* conn_in) { +bool initShardVersionEmptyNS(DBClientBase* conn_in) { bool ok; BSONObj result; DBClientBase* conn = NULL; @@ -419,11 +425,55 @@ bool checkShardVersion(DBClientBase* conn_in, return true; } +} // namespace + +// Global version manager +VersionManager versionManager; + +void VersionManager::resetShardVersionCB(DBClientBase* conn) { + connectionShardStatus.reset(conn); +} + +bool VersionManager::isVersionableCB(DBClientBase* conn) { + // We do not version shard connections when issued from mongod + if (!isMongos()) { + return false; + } + + return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET; +} + +bool VersionManager::forceRemoteCheckShardVersionCB(const string& ns) { + const NamespaceString nss(ns); + + // This will force the database catalog entry to be reloaded + grid.catalogCache()->invalidate(nss.db().toString()); + + auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + if (!status.isOK()) { + return false; + } + + shared_ptr<DBConfig> conf = status.getValue(); + + // If we don't have a collection, don't refresh the chunk manager + if (nsGetCollection(ns).size() == 0) { + return false; + } + + ChunkManagerPtr manager = conf->getChunkManagerIfExists(ns, true, true); + if (!manager) { + return false; + } + + return true; +} + bool VersionManager::checkShardVersionCB(DBClientBase* conn_in, const string& ns, bool authoritative, int tryNumber) { - return checkShardVersion(conn_in, ns, ChunkManagerPtr(), authoritative, tryNumber); + return checkShardVersion(conn_in, ns, nullptr, authoritative, tryNumber); } bool VersionManager::checkShardVersionCB(ShardConnection* conn_in, |