From 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 Mon Sep 17 00:00:00 2001 From: Mark Benvenuto Date: Sat, 20 Jun 2015 00:22:50 -0400 Subject: SERVER-18579: Clang-Format - reformat code, no comment reflow --- src/mongo/s/strategy.cpp | 1015 ++++++++++++++++++++++------------------------ 1 file changed, 496 insertions(+), 519 deletions(-) (limited to 'src/mongo/s/strategy.cpp') diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 061059c709d..f2d44b594c0 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -69,643 +69,620 @@ namespace mongo { - using std::unique_ptr; - using std::shared_ptr; - using std::endl; - using std::set; - using std::string; - using std::stringstream; - using std::vector; - - static bool _isSystemIndexes( const char* ns ) { - return nsToCollectionSubstring(ns) == "system.indexes"; +using std::unique_ptr; +using std::shared_ptr; +using std::endl; +using std::set; +using std::string; +using std::stringstream; +using std::vector; + +static bool _isSystemIndexes(const char* ns) { + return nsToCollectionSubstring(ns) == "system.indexes"; +} + +/** + * Returns true if request is a query for sharded indexes. + */ +static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) { + // Extract the ns field from the query, which may be embedded within the "query" or + // "$query" field. + auto nsField = qSpec.filter()["ns"]; + if (nsField.eoo()) { + return false; } + const NamespaceString indexNSSQuery(nsField.str()); - /** - * Returns true if request is a query for sharded indexes. - */ - static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) { - // Extract the ns field from the query, which may be embedded within the "query" or - // "$query" field. - auto nsField = qSpec.filter()["ns"]; - if (nsField.eoo()) { - return false; - } - const NamespaceString indexNSSQuery(nsField.str()); + auto status = grid.catalogCache()->getDatabase(indexNSSQuery.db().toString()); + if (!status.isOK()) { + return false; + } - auto status = grid.catalogCache()->getDatabase(indexNSSQuery.db().toString()); - if (!status.isOK()) { - return false; - } + shared_ptr config = status.getValue(); + if (!config->isSharded(indexNSSQuery.ns())) { + return false; + } - shared_ptr config = status.getValue(); - if (!config->isSharded(indexNSSQuery.ns())) { - return false; - } + // if you are querying on system.indexes, we need to make sure we go to a shard + // that actually has chunks. This is not a perfect solution (what if you just + // look at all indexes), but better than doing nothing. - // if you are querying on system.indexes, we need to make sure we go to a shard - // that actually has chunks. This is not a perfect solution (what if you just - // look at all indexes), but better than doing nothing. - - ShardPtr shard; - ChunkManagerPtr cm; - config->getChunkManagerOrPrimary(indexNSSQuery.ns(), cm, shard); - if ( cm ) { - set shardIds; - cm->getAllShardIds(&shardIds); - verify(shardIds.size() > 0); - shard = grid.shardRegistry()->getShard(*shardIds.begin()); - } + ShardPtr shard; + ChunkManagerPtr cm; + config->getChunkManagerOrPrimary(indexNSSQuery.ns(), cm, shard); + if (cm) { + set shardIds; + cm->getAllShardIds(&shardIds); + verify(shardIds.size() > 0); + shard = grid.shardRegistry()->getShard(*shardIds.begin()); + } - ShardConnection dbcon(shard->getConnString(), r.getns()); - DBClientBase &c = dbcon.conn(); + ShardConnection dbcon(shard->getConnString(), r.getns()); + DBClientBase& c = dbcon.conn(); - string actualServer; + string actualServer; - Message response; - bool ok = c.call( r.m(), response, true , &actualServer ); - uassert( 10200 , "mongos: error calling db", ok ); + Message response; + bool ok = c.call(r.m(), response, true, &actualServer); + uassert(10200, "mongos: error calling db", ok); - { - QueryResult::View qr = response.singleData().view2ptr(); - if ( qr.getResultFlags() & ResultFlag_ShardConfigStale ) { - dbcon.done(); - // Version is zero b/c this is deprecated codepath - throw RecvStaleConfigException( r.getns(), - "Strategy::doQuery", - ChunkVersion( 0, 0, OID() ), - ChunkVersion( 0, 0, OID() )); - } + { + QueryResult::View qr = response.singleData().view2ptr(); + if (qr.getResultFlags() & ResultFlag_ShardConfigStale) { + dbcon.done(); + // Version is zero b/c this is deprecated codepath + throw RecvStaleConfigException(r.getns(), + "Strategy::doQuery", + ChunkVersion(0, 0, OID()), + ChunkVersion(0, 0, OID())); } - - r.reply( response , actualServer.size() ? actualServer : c.getServerAddress() ); - dbcon.done(); - - return true; } - void Strategy::queryOp( Request& r ) { + r.reply(response, actualServer.size() ? actualServer : c.getServerAddress()); + dbcon.done(); - verify( !NamespaceString( r.getns() ).isCommand() ); + return true; +} - Timer queryTimer; +void Strategy::queryOp(Request& r) { + verify(!NamespaceString(r.getns()).isCommand()); - QueryMessage q( r.d() ); + Timer queryTimer; - NamespaceString ns(q.ns); - ClientBasic* client = ClientBasic::getCurrent(); - AuthorizationSession* authSession = AuthorizationSession::get(client); - Status status = authSession->checkAuthForQuery(ns, q.query); - audit::logQueryAuthzCheck(client, ns, q.query, status.code()); - uassertStatusOK(status); + QueryMessage q(r.d()); - LOG(3) << "query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn - << " options: " << q.queryOptions << endl; + NamespaceString ns(q.ns); + ClientBasic* client = ClientBasic::getCurrent(); + AuthorizationSession* authSession = AuthorizationSession::get(client); + Status status = authSession->checkAuthForQuery(ns, q.query); + audit::logQueryAuthzCheck(client, ns, q.query, status.code()); + uassertStatusOK(status); - if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") ) - throw UserException( 8010 , "something is wrong, shouldn't see a command here" ); + LOG(3) << "query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn + << " options: " << q.queryOptions << endl; - if (q.queryOptions & QueryOption_Exhaust) { - uasserted(18526, - string("the 'exhaust' query option is invalid for mongos queries: ") + q.ns - + " " + q.query.toString()); - } + if (q.ntoreturn == 1 && strstr(q.ns, ".$cmd")) + throw UserException(8010, "something is wrong, shouldn't see a command here"); - QuerySpec qSpec( (string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions ); + if (q.queryOptions & QueryOption_Exhaust) { + uasserted(18526, + string("the 'exhaust' query option is invalid for mongos queries: ") + q.ns + + " " + q.query.toString()); + } - // Parse "$maxTimeMS". - StatusWith maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery( q.query ); - uassert( 17233, - maxTimeMS.getStatus().reason(), - maxTimeMS.isOK() ); + QuerySpec qSpec((string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions); - if ( _isSystemIndexes( q.ns ) && doShardedIndexQuery( r, qSpec )) { - return; - } + // Parse "$maxTimeMS". + StatusWith maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery(q.query); + uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK()); - ParallelSortClusteredCursor * cursor = new ParallelSortClusteredCursor( qSpec, CommandInfo() ); - verify( cursor ); + if (_isSystemIndexes(q.ns) && doShardedIndexQuery(r, qSpec)) { + return; + } - // TODO: Move out to Request itself, not strategy based - try { - cursor->init(); - - if ( qSpec.isExplain() ) { - BSONObjBuilder explain_builder; - cursor->explain( explain_builder ); - explain_builder.appendNumber( "executionTimeMillis", - static_cast(queryTimer.millis()) ); - BSONObj b = explain_builder.obj(); - - replyToQuery( 0 , r.p() , r.m() , b ); - delete( cursor ); - return; - } - } - catch(...) { - delete cursor; - throw; - } + ParallelSortClusteredCursor* cursor = new ParallelSortClusteredCursor(qSpec, CommandInfo()); + verify(cursor); - // TODO: Revisit all of this when we revisit the sharded cursor cache + // TODO: Move out to Request itself, not strategy based + try { + cursor->init(); - if (cursor->getNumQueryShards() != 1) { + if (qSpec.isExplain()) { + BSONObjBuilder explain_builder; + cursor->explain(explain_builder); + explain_builder.appendNumber("executionTimeMillis", + static_cast(queryTimer.millis())); + BSONObj b = explain_builder.obj(); - // More than one shard (or zero), manage with a ShardedClientCursor - // NOTE: We may also have *zero* shards here when the returnPartial flag is set. - // Currently the code in ShardedClientCursor handles this. + replyToQuery(0, r.p(), r.m(), b); + delete (cursor); + return; + } + } catch (...) { + delete cursor; + throw; + } - ShardedClientCursorPtr cc (new ShardedClientCursor( q , cursor )); + // TODO: Revisit all of this when we revisit the sharded cursor cache - BufBuilder buffer( ShardedClientCursor::INIT_REPLY_BUFFER_SIZE ); - int docCount = 0; - const int startFrom = cc->getTotalSent(); - bool hasMore = cc->sendNextBatch(q.ntoreturn, buffer, docCount); + if (cursor->getNumQueryShards() != 1) { + // More than one shard (or zero), manage with a ShardedClientCursor + // NOTE: We may also have *zero* shards here when the returnPartial flag is set. + // Currently the code in ShardedClientCursor handles this. - if ( hasMore ) { - LOG(5) << "storing cursor : " << cc->getId() << endl; + ShardedClientCursorPtr cc(new ShardedClientCursor(q, cursor)); - int cursorLeftoverMillis = maxTimeMS.getValue() - queryTimer.millis(); - if ( maxTimeMS.getValue() == 0 ) { // 0 represents "no limit". - cursorLeftoverMillis = kMaxTimeCursorNoTimeLimit; - } - else if ( cursorLeftoverMillis <= 0 ) { - cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired; - } + BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); + int docCount = 0; + const int startFrom = cc->getTotalSent(); + bool hasMore = cc->sendNextBatch(q.ntoreturn, buffer, docCount); + + if (hasMore) { + LOG(5) << "storing cursor : " << cc->getId() << endl; - cursorCache.store( cc, cursorLeftoverMillis ); + int cursorLeftoverMillis = maxTimeMS.getValue() - queryTimer.millis(); + if (maxTimeMS.getValue() == 0) { // 0 represents "no limit". + cursorLeftoverMillis = kMaxTimeCursorNoTimeLimit; + } else if (cursorLeftoverMillis <= 0) { + cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired; } - replyToQuery( 0, r.p(), r.m(), buffer.buf(), buffer.len(), docCount, - startFrom, hasMore ? cc->getId() : 0 ); + cursorCache.store(cc, cursorLeftoverMillis); } - else{ - // Only one shard is used + replyToQuery(0, + r.p(), + r.m(), + buffer.buf(), + buffer.len(), + docCount, + startFrom, + hasMore ? cc->getId() : 0); + } else { + // Only one shard is used - // Remote cursors are stored remotely, we shouldn't need this around. - unique_ptr cursorDeleter( cursor ); + // Remote cursors are stored remotely, we shouldn't need this around. + unique_ptr cursorDeleter(cursor); - ShardPtr shard = cursor->getQueryShard(); - verify( shard.get() ); - DBClientCursorPtr shardCursor = cursor->getShardCursor(shard->getId()); + ShardPtr shard = cursor->getQueryShard(); + verify(shard.get()); + DBClientCursorPtr shardCursor = cursor->getShardCursor(shard->getId()); - // Implicitly stores the cursor in the cache - r.reply( *(shardCursor->getMessage()) , shardCursor->originalHost() ); + // Implicitly stores the cursor in the cache + r.reply(*(shardCursor->getMessage()), shardCursor->originalHost()); - // We don't want to kill the cursor remotely if there's still data left - shardCursor->decouple(); - } + // We don't want to kill the cursor remotely if there's still data left + shardCursor->decouple(); } +} - void Strategy::clientCommandOp( Request& r ) { - QueryMessage q( r.d() ); +void Strategy::clientCommandOp(Request& r) { + QueryMessage q(r.d()); - LOG(3) << "command: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn - << " options: " << q.queryOptions << endl; + LOG(3) << "command: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn + << " options: " << q.queryOptions << endl; - if (q.queryOptions & QueryOption_Exhaust) { - uasserted(18527, - string("the 'exhaust' query option is invalid for mongos commands: ") + q.ns - + " " + q.query.toString()); - } + if (q.queryOptions & QueryOption_Exhaust) { + uasserted(18527, + string("the 'exhaust' query option is invalid for mongos commands: ") + q.ns + + " " + q.query.toString()); + } - NamespaceString nss( r.getns() ); - // Regular queries are handled in strategy_shard.cpp - verify( nss.isCommand() || nss.isSpecialCommand() ); + NamespaceString nss(r.getns()); + // Regular queries are handled in strategy_shard.cpp + verify(nss.isCommand() || nss.isSpecialCommand()); - if ( handleSpecialNamespaces( r , q ) ) - return; + if (handleSpecialNamespaces(r, q)) + return; - int loops = 5; - while ( true ) { - BSONObjBuilder builder; - try { - BSONObj cmdObj = q.query; - { - BSONElement e = cmdObj.firstElement(); - if (e.type() == Object && (e.fieldName()[0] == '$' - ? str::equals("query", e.fieldName()+1) - : str::equals("query", e.fieldName()))) { - // Extract the embedded query object. - - if (cmdObj.hasField(Query::ReadPrefField.name())) { - // The command has a read preference setting. We don't want - // to lose this information so we copy this to a new field - // called $queryOptions.$readPreference - BSONObjBuilder finalCmdObjBuilder; - finalCmdObjBuilder.appendElements(e.embeddedObject()); - - BSONObjBuilder queryOptionsBuilder( - finalCmdObjBuilder.subobjStart("$queryOptions")); - queryOptionsBuilder.append(cmdObj[Query::ReadPrefField.name()]); - queryOptionsBuilder.done(); - - cmdObj = finalCmdObjBuilder.obj(); - } - else { - cmdObj = e.embeddedObject(); - } + int loops = 5; + while (true) { + BSONObjBuilder builder; + try { + BSONObj cmdObj = q.query; + { + BSONElement e = cmdObj.firstElement(); + if (e.type() == Object && + (e.fieldName()[0] == '$' ? str::equals("query", e.fieldName() + 1) + : str::equals("query", e.fieldName()))) { + // Extract the embedded query object. + + if (cmdObj.hasField(Query::ReadPrefField.name())) { + // The command has a read preference setting. We don't want + // to lose this information so we copy this to a new field + // called $queryOptions.$readPreference + BSONObjBuilder finalCmdObjBuilder; + finalCmdObjBuilder.appendElements(e.embeddedObject()); + + BSONObjBuilder queryOptionsBuilder( + finalCmdObjBuilder.subobjStart("$queryOptions")); + queryOptionsBuilder.append(cmdObj[Query::ReadPrefField.name()]); + queryOptionsBuilder.done(); + + cmdObj = finalCmdObjBuilder.obj(); + } else { + cmdObj = e.embeddedObject(); } } - - Command::runAgainstRegistered(q.ns, cmdObj, builder, q.queryOptions); - BSONObj x = builder.done(); - replyToQuery(0, r.p(), r.m(), x); - return; } - catch ( StaleConfigException& e ) { - if ( loops <= 0 ) - throw e; - - loops--; - log() << "retrying command: " << q.query << endl; - // For legacy reasons, ns may not actually be set in the exception :-( - string staleNS = e.getns(); - if( staleNS.size() == 0 ) staleNS = q.ns; - - ShardConnection::checkMyConnectionVersions( staleNS ); - if( loops < 4 ) versionManager.forceRemoteCheckShardVersionCB( staleNS ); - } - catch ( AssertionException& e ) { - Command::appendCommandStatus(builder, e.toStatus()); - BSONObj x = builder.done(); - replyToQuery(0, r.p(), r.m(), x); - return; - } + Command::runAgainstRegistered(q.ns, cmdObj, builder, q.queryOptions); + BSONObj x = builder.done(); + replyToQuery(0, r.p(), r.m(), x); + return; + } catch (StaleConfigException& e) { + if (loops <= 0) + throw e; + + loops--; + log() << "retrying command: " << q.query << endl; + + // For legacy reasons, ns may not actually be set in the exception :-( + string staleNS = e.getns(); + if (staleNS.size() == 0) + staleNS = q.ns; + + ShardConnection::checkMyConnectionVersions(staleNS); + if (loops < 4) + versionManager.forceRemoteCheckShardVersionCB(staleNS); + } catch (AssertionException& e) { + Command::appendCommandStatus(builder, e.toStatus()); + BSONObj x = builder.done(); + replyToQuery(0, r.p(), r.m(), x); + return; } } +} - // TODO: remove after MongoDB 3.2 - bool Strategy::handleSpecialNamespaces( Request& r , QueryMessage& q ) { - const char * ns = strstr( r.getns() , ".$cmd.sys." ); - if ( ! ns ) - return false; - ns += 10; - - BSONObjBuilder reply; - - const auto upgradeToRealCommand = [&r, &q, &reply](StringData commandName) { - BSONObjBuilder cmdBob; - cmdBob.append(commandName, 1); - cmdBob.appendElements(q.query); // fields are validated by Commands - auto interposedCmd = cmdBob.done(); - NamespaceString nss(r.getns()); - NamespaceString interposedNss(nss.db(), "$cmd"); - Command::runAgainstRegistered(interposedNss.ns().c_str(), - interposedCmd, - reply, - q.queryOptions); - }; - - if ( strcmp( ns , "inprog" ) == 0 ) { - upgradeToRealCommand("currentOp"); - } - else if ( strcmp( ns , "killop" ) == 0 ) { - upgradeToRealCommand("killOp"); - } - else if ( strcmp( ns , "unlock" ) == 0 ) { - reply.append( "err" , "can't do unlock through mongos" ); - } - else { - warning() << "unknown sys command [" << ns << "]" << endl; - return false; - } - - BSONObj x = reply.done(); - replyToQuery(0, r.p(), r.m(), x); - return true; +// TODO: remove after MongoDB 3.2 +bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) { + const char* ns = strstr(r.getns(), ".$cmd.sys."); + if (!ns) + return false; + ns += 10; + + BSONObjBuilder reply; + + const auto upgradeToRealCommand = [&r, &q, &reply](StringData commandName) { + BSONObjBuilder cmdBob; + cmdBob.append(commandName, 1); + cmdBob.appendElements(q.query); // fields are validated by Commands + auto interposedCmd = cmdBob.done(); + NamespaceString nss(r.getns()); + NamespaceString interposedNss(nss.db(), "$cmd"); + Command::runAgainstRegistered( + interposedNss.ns().c_str(), interposedCmd, reply, q.queryOptions); + }; + + if (strcmp(ns, "inprog") == 0) { + upgradeToRealCommand("currentOp"); + } else if (strcmp(ns, "killop") == 0) { + upgradeToRealCommand("killOp"); + } else if (strcmp(ns, "unlock") == 0) { + reply.append("err", "can't do unlock through mongos"); + } else { + warning() << "unknown sys command [" << ns << "]" << endl; + return false; } - void Strategy::commandOp( const string& db, - const BSONObj& command, - int options, - const string& versionedNS, - const BSONObj& targetingQuery, - vector* results ) - { - - QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, options); + BSONObj x = reply.done(); + replyToQuery(0, r.p(), r.m(), x); + return true; +} - ParallelSortClusteredCursor cursor( qSpec, CommandInfo( versionedNS, targetingQuery ) ); +void Strategy::commandOp(const string& db, + const BSONObj& command, + int options, + const string& versionedNS, + const BSONObj& targetingQuery, + vector* results) { + QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, options); - // Initialize the cursor - cursor.init(); + ParallelSortClusteredCursor cursor(qSpec, CommandInfo(versionedNS, targetingQuery)); - set shardIds; - cursor.getQueryShardIds(shardIds); - - for (const ShardId& shardId : shardIds) { - CommandResult result; - result.shardTargetId = shardId; - - string errMsg; // ignored, should never be invalid b/c an exception thrown earlier - result.target = - ConnectionString::parse( cursor.getShardCursor(shardId)->originalHost(), - errMsg ); - result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned(); - results->push_back( result ); - } + // Initialize the cursor + cursor.init(); - } + set shardIds; + cursor.getQueryShardIds(shardIds); - Status Strategy::commandOpWrite(const std::string& dbName, - const BSONObj& command, - BatchItemRef targetingBatchItem, - std::vector* results) { + for (const ShardId& shardId : shardIds) { + CommandResult result; + result.shardTargetId = shardId; - // Note that this implementation will not handle targeting retries and does not completely - // emulate write behavior + string errMsg; // ignored, should never be invalid b/c an exception thrown earlier + result.target = + ConnectionString::parse(cursor.getShardCursor(shardId)->originalHost(), errMsg); + result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned(); + results->push_back(result); + } +} - ChunkManagerTargeter targeter(NamespaceString( - targetingBatchItem.getRequest()->getTargetingNS())); - Status status = targeter.init(); +Status Strategy::commandOpWrite(const std::string& dbName, + const BSONObj& command, + BatchItemRef targetingBatchItem, + std::vector* results) { + // Note that this implementation will not handle targeting retries and does not completely + // emulate write behavior + + ChunkManagerTargeter targeter( + NamespaceString(targetingBatchItem.getRequest()->getTargetingNS())); + Status status = targeter.init(); + if (!status.isOK()) + return status; + + OwnedPointerVector endpointsOwned; + vector& endpoints = endpointsOwned.mutableVector(); + + if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) { + ShardEndpoint* endpoint; + Status status = targeter.targetInsert(targetingBatchItem.getDocument(), &endpoint); + if (!status.isOK()) + return status; + endpoints.push_back(endpoint); + } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) { + Status status = targeter.targetUpdate(*targetingBatchItem.getUpdate(), &endpoints); if (!status.isOK()) return status; + } else { + invariant(targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete); + Status status = targeter.targetDelete(*targetingBatchItem.getDelete(), &endpoints); + if (!status.isOK()) + return status; + } - OwnedPointerVector endpointsOwned; - vector& endpoints = endpointsOwned.mutableVector(); + DBClientShardResolver resolver; + DBClientMultiCommand dispatcher; - if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) { - ShardEndpoint* endpoint; - Status status = targeter.targetInsert(targetingBatchItem.getDocument(), &endpoint); - if (!status.isOK()) - return status; - endpoints.push_back(endpoint); - } - else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) { - Status status = targeter.targetUpdate(*targetingBatchItem.getUpdate(), &endpoints); - if (!status.isOK()) - return status; - } - else { - invariant(targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete); - Status status = targeter.targetDelete(*targetingBatchItem.getDelete(), &endpoints); - if (!status.isOK()) - return status; - } + // Assemble requests + for (vector::const_iterator it = endpoints.begin(); it != endpoints.end(); + ++it) { + const ShardEndpoint* endpoint = *it; - DBClientShardResolver resolver; - DBClientMultiCommand dispatcher; + ConnectionString host; + Status status = resolver.chooseWriteHost(endpoint->shardName, &host); + if (!status.isOK()) + return status; - // Assemble requests - for (vector::const_iterator it = endpoints.begin(); it != endpoints.end(); - ++it) { + RawBSONSerializable request(command); + dispatcher.addCommand(host, dbName, request); + } - const ShardEndpoint* endpoint = *it; + // Errors reported when recv'ing responses + dispatcher.sendAll(); + Status dispatchStatus = Status::OK(); - ConnectionString host; - Status status = resolver.chooseWriteHost(endpoint->shardName, &host); - if (!status.isOK()) - return status; + // Recv responses + while (dispatcher.numPending() > 0) { + ConnectionString host; + RawBSONSerializable response; - RawBSONSerializable request(command); - dispatcher.addCommand(host, dbName, request); + Status status = dispatcher.recvAny(&host, &response); + if (!status.isOK()) { + // We always need to recv() all the sent operations + dispatchStatus = status; + continue; } - // Errors reported when recv'ing responses - dispatcher.sendAll(); - Status dispatchStatus = Status::OK(); - - // Recv responses - while (dispatcher.numPending() > 0) { - - ConnectionString host; - RawBSONSerializable response; - - Status status = dispatcher.recvAny(&host, &response); - if (!status.isOK()) { - // We always need to recv() all the sent operations - dispatchStatus = status; - continue; - } - - CommandResult result; - result.target = host; - { - const auto shard = grid.shardRegistry()->getShard(host.toString()); - result.shardTargetId = shard->getId(); - } - result.result = response.toBSON(); - - results->push_back(result); + CommandResult result; + result.target = host; + { + const auto shard = grid.shardRegistry()->getShard(host.toString()); + result.shardTargetId = shard->getId(); } + result.result = response.toBSON(); - return dispatchStatus; + results->push_back(result); } - Status Strategy::commandOpUnsharded(const std::string& db, - const BSONObj& command, - int options, - const std::string& versionedNS, - CommandResult* cmdResult) { + return dispatchStatus; +} - // Note that this implementation will not handle targeting retries and fails when the - // sharding metadata is too stale - auto status = grid.catalogCache()->getDatabase(db); - if (!status.isOK()) { - mongoutils::str::stream ss; - ss << "Passthrough command failed: " << command.toString() - << " on ns " << versionedNS << ". Caused by " << causedBy(status.getStatus()); - return Status(ErrorCodes::IllegalOperation, ss); - } +Status Strategy::commandOpUnsharded(const std::string& db, + const BSONObj& command, + int options, + const std::string& versionedNS, + CommandResult* cmdResult) { + // Note that this implementation will not handle targeting retries and fails when the + // sharding metadata is too stale + auto status = grid.catalogCache()->getDatabase(db); + if (!status.isOK()) { + mongoutils::str::stream ss; + ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS + << ". Caused by " << causedBy(status.getStatus()); + return Status(ErrorCodes::IllegalOperation, ss); + } - shared_ptr conf = status.getValue(); - if (conf->isSharded(versionedNS)) { - mongoutils::str::stream ss; - ss << "Passthrough command failed: " << command.toString() - << " on ns " << versionedNS << ". Cannot run on sharded namespace."; - return Status(ErrorCodes::IllegalOperation, ss); - } + shared_ptr conf = status.getValue(); + if (conf->isSharded(versionedNS)) { + mongoutils::str::stream ss; + ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS + << ". Cannot run on sharded namespace."; + return Status(ErrorCodes::IllegalOperation, ss); + } - const auto primaryShard = grid.shardRegistry()->getShard(conf->getPrimaryId()); + const auto primaryShard = grid.shardRegistry()->getShard(conf->getPrimaryId()); - BSONObj shardResult; - try { - ShardConnection conn(primaryShard->getConnString(), ""); - - // TODO: this can throw a stale config when mongos is not up-to-date -- fix. - if (!conn->runCommand(db, command, shardResult, options)) { - conn.done(); - return Status(ErrorCodes::OperationFailed, - str::stream() << "Passthrough command failed: " << command - << " on ns " << versionedNS - << "; result: " << shardResult); - } + BSONObj shardResult; + try { + ShardConnection conn(primaryShard->getConnString(), ""); + + // TODO: this can throw a stale config when mongos is not up-to-date -- fix. + if (!conn->runCommand(db, command, shardResult, options)) { conn.done(); + return Status(ErrorCodes::OperationFailed, + str::stream() << "Passthrough command failed: " << command << " on ns " + << versionedNS << "; result: " << shardResult); } - catch (const DBException& ex) { - return ex.toStatus(); - } - - // Fill out the command result. - cmdResult->shardTargetId = conf->getPrimaryId(); - cmdResult->result = shardResult; - cmdResult->target = primaryShard->getConnString(); - - return Status::OK(); + conn.done(); + } catch (const DBException& ex) { + return ex.toStatus(); } - void Strategy::getMore( Request& r ) { - Timer getMoreTimer; - - const char* ns = r.getns(); - const int ntoreturn = r.d().pullInt(); - const long long id = r.d().pullInt64(); + // Fill out the command result. + cmdResult->shardTargetId = conf->getPrimaryId(); + cmdResult->result = shardResult; + cmdResult->target = primaryShard->getConnString(); - // TODO: Handle stale config exceptions here from coll being dropped or sharded during op - // for now has same semantics as legacy request - const NamespaceString nss(ns); - auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString()); - if (statusGetDb == ErrorCodes::DatabaseNotFound) { - cursorCache.remove(id); - replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0); - return; - } + return Status::OK(); +} - uassertStatusOK(statusGetDb); +void Strategy::getMore(Request& r) { + Timer getMoreTimer; + + const char* ns = r.getns(); + const int ntoreturn = r.d().pullInt(); + const long long id = r.d().pullInt64(); + + // TODO: Handle stale config exceptions here from coll being dropped or sharded during op + // for now has same semantics as legacy request + const NamespaceString nss(ns); + auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString()); + if (statusGetDb == ErrorCodes::DatabaseNotFound) { + cursorCache.remove(id); + replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0); + return; + } - shared_ptr config = statusGetDb.getValue(); + uassertStatusOK(statusGetDb); - ShardPtr primary; - ChunkManagerPtr info; - config->getChunkManagerOrPrimary( ns, info, primary ); + shared_ptr config = statusGetDb.getValue(); - // - // TODO: Cleanup cursor cache, consolidate into single codepath - // - const string host = cursorCache.getRef(id); - ShardedClientCursorPtr cursor = cursorCache.get( id ); - int cursorMaxTimeMS = cursorCache.getMaxTimeMS( id ); + ShardPtr primary; + ChunkManagerPtr info; + config->getChunkManagerOrPrimary(ns, info, primary); - // Cursor ids should not overlap between sharded and unsharded cursors - massert( 17012, str::stream() << "duplicate sharded and unsharded cursor id " - << id << " detected for " << ns - << ", duplicated on host " << host, - NULL == cursorCache.get( id ).get() || host.empty() ); + // + // TODO: Cleanup cursor cache, consolidate into single codepath + // + const string host = cursorCache.getRef(id); + ShardedClientCursorPtr cursor = cursorCache.get(id); + int cursorMaxTimeMS = cursorCache.getMaxTimeMS(id); - ClientBasic* client = ClientBasic::getCurrent(); - NamespaceString nsString(ns); - AuthorizationSession* authSession = AuthorizationSession::get(client); - Status status = authSession->checkAuthForGetMore( nsString, id ); - audit::logGetMoreAuthzCheck( client, nsString, id, status.code() ); - uassertStatusOK(status); + // Cursor ids should not overlap between sharded and unsharded cursors + massert(17012, + str::stream() << "duplicate sharded and unsharded cursor id " << id << " detected for " + << ns << ", duplicated on host " << host, + NULL == cursorCache.get(id).get() || host.empty()); - if( !host.empty() ){ + ClientBasic* client = ClientBasic::getCurrent(); + NamespaceString nsString(ns); + AuthorizationSession* authSession = AuthorizationSession::get(client); + Status status = authSession->checkAuthForGetMore(nsString, id); + audit::logGetMoreAuthzCheck(client, nsString, id, status.code()); + uassertStatusOK(status); - LOG(3) << "single getmore: " << ns << endl; + if (!host.empty()) { + LOG(3) << "single getmore: " << ns << endl; - // we used ScopedDbConnection because we don't get about config versions - // not deleting data is handled elsewhere - // and we don't want to call setShardVersion - ScopedDbConnection conn(host); + // we used ScopedDbConnection because we don't get about config versions + // not deleting data is handled elsewhere + // and we don't want to call setShardVersion + ScopedDbConnection conn(host); - Message response; - bool ok = conn->callRead( r.m() , response); - uassert( 10204 , "dbgrid: getmore: error calling db", ok); + Message response; + bool ok = conn->callRead(r.m(), response); + uassert(10204, "dbgrid: getmore: error calling db", ok); - bool hasMore = (response.singleData().getCursor() != 0); + bool hasMore = (response.singleData().getCursor() != 0); - if ( !hasMore ) { - cursorCache.removeRef( id ); - } + if (!hasMore) { + cursorCache.removeRef(id); + } - r.reply( response , "" /*conn->getServerAddress() */ ); - conn.done(); - return; + r.reply(response, "" /*conn->getServerAddress() */); + conn.done(); + return; + } else if (cursor) { + if (cursorMaxTimeMS == kMaxTimeCursorTimeLimitExpired) { + cursorCache.remove(id); + uasserted(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); } - else if ( cursor ) { - if ( cursorMaxTimeMS == kMaxTimeCursorTimeLimitExpired ) { - cursorCache.remove( id ); - uasserted( ErrorCodes::ExceededTimeLimit, "operation exceeded time limit" ); - } + // TODO: Try to match logic of mongod, where on subsequent getMore() we pull lots more data? + BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); + int docCount = 0; + const int startFrom = cursor->getTotalSent(); + bool hasMore = cursor->sendNextBatch(ntoreturn, buffer, docCount); - // TODO: Try to match logic of mongod, where on subsequent getMore() we pull lots more data? - BufBuilder buffer( ShardedClientCursor::INIT_REPLY_BUFFER_SIZE ); - int docCount = 0; - const int startFrom = cursor->getTotalSent(); - bool hasMore = cursor->sendNextBatch(ntoreturn, buffer, docCount); - - if ( hasMore ) { - // still more data - cursor->accessed(); - - if ( cursorMaxTimeMS != kMaxTimeCursorNoTimeLimit ) { - // Update remaining amount of time in cursor cache. - int cursorLeftoverMillis = cursorMaxTimeMS - getMoreTimer.millis(); - if ( cursorLeftoverMillis <= 0 ) { - cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired; - } - cursorCache.updateMaxTimeMS( id, cursorLeftoverMillis ); + if (hasMore) { + // still more data + cursor->accessed(); + + if (cursorMaxTimeMS != kMaxTimeCursorNoTimeLimit) { + // Update remaining amount of time in cursor cache. + int cursorLeftoverMillis = cursorMaxTimeMS - getMoreTimer.millis(); + if (cursorLeftoverMillis <= 0) { + cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired; } + cursorCache.updateMaxTimeMS(id, cursorLeftoverMillis); } - else { - // we've exhausted the cursor - cursorCache.remove( id ); - } - - replyToQuery( 0, r.p(), r.m(), buffer.buf(), buffer.len(), docCount, - startFrom, hasMore ? cursor->getId() : 0 ); - return; + } else { + // we've exhausted the cursor + cursorCache.remove(id); } - else { - - LOG( 3 ) << "could not find cursor " << id << " in cache for " << ns << endl; - replyToQuery( ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 ); - return; - } + replyToQuery(0, + r.p(), + r.m(), + buffer.buf(), + buffer.len(), + docCount, + startFrom, + hasMore ? cursor->getId() : 0); + return; + } else { + LOG(3) << "could not find cursor " << id << " in cache for " << ns << endl; + + replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0); + return; } +} - void Strategy::writeOp( int op , Request& r ) { - - // make sure we have a last error - dassert(&LastError::get(cc())); - - OwnedPointerVector requestsOwned; - vector& requests = requestsOwned.mutableVector(); +void Strategy::writeOp(int op, Request& r) { + // make sure we have a last error + dassert(&LastError::get(cc())); - msgToBatchRequests( r.m(), &requests ); + OwnedPointerVector requestsOwned; + vector& requests = requestsOwned.mutableVector(); - for ( vector::iterator it = requests.begin(); - it != requests.end(); ++it ) { + msgToBatchRequests(r.m(), &requests); - // Multiple commands registered to last error as multiple requests - if ( it != requests.begin() ) - LastError::get(cc()).startRequest(); + for (vector::iterator it = requests.begin(); it != requests.end(); + ++it) { + // Multiple commands registered to last error as multiple requests + if (it != requests.begin()) + LastError::get(cc()).startRequest(); - BatchedCommandRequest* request = *it; + BatchedCommandRequest* request = *it; - // Adjust namespaces for command - NamespaceString fullNS( request->getNS() ); - string cmdNS = fullNS.getCommandNS(); - // We only pass in collection name to command - request->setNS( fullNS.coll() ); + // Adjust namespaces for command + NamespaceString fullNS(request->getNS()); + string cmdNS = fullNS.getCommandNS(); + // We only pass in collection name to command + request->setNS(fullNS.coll()); - BSONObjBuilder builder; - BSONObj requestBSON = request->toBSON(); + BSONObjBuilder builder; + BSONObj requestBSON = request->toBSON(); - { - // Disable the last error object for the duration of the write cmd - LastError::Disabled disableLastError(&LastError::get(cc())); - Command::runAgainstRegistered( cmdNS.c_str(), requestBSON, builder, 0 ); - } + { + // Disable the last error object for the duration of the write cmd + LastError::Disabled disableLastError(&LastError::get(cc())); + Command::runAgainstRegistered(cmdNS.c_str(), requestBSON, builder, 0); + } - BatchedCommandResponse response; - bool parsed = response.parseBSON( builder.done(), NULL ); - (void) parsed; // for compile - dassert( parsed && response.isValid( NULL ) ); + BatchedCommandResponse response; + bool parsed = response.parseBSON(builder.done(), NULL); + (void)parsed; // for compile + dassert(parsed && response.isValid(NULL)); - // Populate the lastError object based on the write response - LastError::get(cc()).reset(); - bool hadError = batchErrorToLastError(*request, response, &LastError::get(cc())); + // Populate the lastError object based on the write response + LastError::get(cc()).reset(); + bool hadError = batchErrorToLastError(*request, response, &LastError::get(cc())); - // Check if this is an ordered batch and we had an error which should stop processing - if ( request->getOrdered() && hadError ) - break; - } + // Check if this is an ordered batch and we had an error which should stop processing + if (request->getOrdered() && hadError) + break; } - +} } -- cgit v1.2.1