summaryrefslogtreecommitdiff
path: root/src/mongo/s/strategy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/strategy.cpp')
-rw-r--r--src/mongo/s/strategy.cpp1015
1 file changed, 496 insertions, 519 deletions
diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp
index 061059c709d..f2d44b594c0 100644
--- a/src/mongo/s/strategy.cpp
+++ b/src/mongo/s/strategy.cpp
@@ -69,643 +69,620 @@
namespace mongo {
- using std::unique_ptr;
- using std::shared_ptr;
- using std::endl;
- using std::set;
- using std::string;
- using std::stringstream;
- using std::vector;
-
- static bool _isSystemIndexes( const char* ns ) {
- return nsToCollectionSubstring(ns) == "system.indexes";
+using std::unique_ptr;
+using std::shared_ptr;
+using std::endl;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+static bool _isSystemIndexes(const char* ns) {
+ return nsToCollectionSubstring(ns) == "system.indexes";
+}
+
+/**
+ * Returns true if request is a query for sharded indexes.
+ */
+static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) {
+ // Extract the ns field from the query, which may be embedded within the "query" or
+ // "$query" field.
+ auto nsField = qSpec.filter()["ns"];
+ if (nsField.eoo()) {
+ return false;
}
+ const NamespaceString indexNSSQuery(nsField.str());
- /**
- * Returns true if request is a query for sharded indexes.
- */
- static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) {
- // Extract the ns field from the query, which may be embedded within the "query" or
- // "$query" field.
- auto nsField = qSpec.filter()["ns"];
- if (nsField.eoo()) {
- return false;
- }
- const NamespaceString indexNSSQuery(nsField.str());
+ auto status = grid.catalogCache()->getDatabase(indexNSSQuery.db().toString());
+ if (!status.isOK()) {
+ return false;
+ }
- auto status = grid.catalogCache()->getDatabase(indexNSSQuery.db().toString());
- if (!status.isOK()) {
- return false;
- }
+ shared_ptr<DBConfig> config = status.getValue();
+ if (!config->isSharded(indexNSSQuery.ns())) {
+ return false;
+ }
- shared_ptr<DBConfig> config = status.getValue();
- if (!config->isSharded(indexNSSQuery.ns())) {
- return false;
- }
+ // if you are querying on system.indexes, we need to make sure we go to a shard
+ // that actually has chunks. This is not a perfect solution (what if you just
+ // look at all indexes), but better than doing nothing.
- // if you are querying on system.indexes, we need to make sure we go to a shard
- // that actually has chunks. This is not a perfect solution (what if you just
- // look at all indexes), but better than doing nothing.
-
- ShardPtr shard;
- ChunkManagerPtr cm;
- config->getChunkManagerOrPrimary(indexNSSQuery.ns(), cm, shard);
- if ( cm ) {
- set<ShardId> shardIds;
- cm->getAllShardIds(&shardIds);
- verify(shardIds.size() > 0);
- shard = grid.shardRegistry()->getShard(*shardIds.begin());
- }
+ ShardPtr shard;
+ ChunkManagerPtr cm;
+ config->getChunkManagerOrPrimary(indexNSSQuery.ns(), cm, shard);
+ if (cm) {
+ set<ShardId> shardIds;
+ cm->getAllShardIds(&shardIds);
+ verify(shardIds.size() > 0);
+ shard = grid.shardRegistry()->getShard(*shardIds.begin());
+ }
- ShardConnection dbcon(shard->getConnString(), r.getns());
- DBClientBase &c = dbcon.conn();
+ ShardConnection dbcon(shard->getConnString(), r.getns());
+ DBClientBase& c = dbcon.conn();
- string actualServer;
+ string actualServer;
- Message response;
- bool ok = c.call( r.m(), response, true , &actualServer );
- uassert( 10200 , "mongos: error calling db", ok );
+ Message response;
+ bool ok = c.call(r.m(), response, true, &actualServer);
+ uassert(10200, "mongos: error calling db", ok);
- {
- QueryResult::View qr = response.singleData().view2ptr();
- if ( qr.getResultFlags() & ResultFlag_ShardConfigStale ) {
- dbcon.done();
- // Version is zero b/c this is deprecated codepath
- throw RecvStaleConfigException( r.getns(),
- "Strategy::doQuery",
- ChunkVersion( 0, 0, OID() ),
- ChunkVersion( 0, 0, OID() ));
- }
+ {
+ QueryResult::View qr = response.singleData().view2ptr();
+ if (qr.getResultFlags() & ResultFlag_ShardConfigStale) {
+ dbcon.done();
+ // Version is zero b/c this is deprecated codepath
+ throw RecvStaleConfigException(r.getns(),
+ "Strategy::doQuery",
+ ChunkVersion(0, 0, OID()),
+ ChunkVersion(0, 0, OID()));
}
-
- r.reply( response , actualServer.size() ? actualServer : c.getServerAddress() );
- dbcon.done();
-
- return true;
}
- void Strategy::queryOp( Request& r ) {
+ r.reply(response, actualServer.size() ? actualServer : c.getServerAddress());
+ dbcon.done();
- verify( !NamespaceString( r.getns() ).isCommand() );
+ return true;
+}
- Timer queryTimer;
+void Strategy::queryOp(Request& r) {
+ verify(!NamespaceString(r.getns()).isCommand());
- QueryMessage q( r.d() );
+ Timer queryTimer;
- NamespaceString ns(q.ns);
- ClientBasic* client = ClientBasic::getCurrent();
- AuthorizationSession* authSession = AuthorizationSession::get(client);
- Status status = authSession->checkAuthForQuery(ns, q.query);
- audit::logQueryAuthzCheck(client, ns, q.query, status.code());
- uassertStatusOK(status);
+ QueryMessage q(r.d());
- LOG(3) << "query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn
- << " options: " << q.queryOptions << endl;
+ NamespaceString ns(q.ns);
+ ClientBasic* client = ClientBasic::getCurrent();
+ AuthorizationSession* authSession = AuthorizationSession::get(client);
+ Status status = authSession->checkAuthForQuery(ns, q.query);
+ audit::logQueryAuthzCheck(client, ns, q.query, status.code());
+ uassertStatusOK(status);
- if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") )
- throw UserException( 8010 , "something is wrong, shouldn't see a command here" );
+ LOG(3) << "query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn
+ << " options: " << q.queryOptions << endl;
- if (q.queryOptions & QueryOption_Exhaust) {
- uasserted(18526,
- string("the 'exhaust' query option is invalid for mongos queries: ") + q.ns
- + " " + q.query.toString());
- }
+ if (q.ntoreturn == 1 && strstr(q.ns, ".$cmd"))
+ throw UserException(8010, "something is wrong, shouldn't see a command here");
- QuerySpec qSpec( (string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions );
+ if (q.queryOptions & QueryOption_Exhaust) {
+ uasserted(18526,
+ string("the 'exhaust' query option is invalid for mongos queries: ") + q.ns +
+ " " + q.query.toString());
+ }
- // Parse "$maxTimeMS".
- StatusWith<int> maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery( q.query );
- uassert( 17233,
- maxTimeMS.getStatus().reason(),
- maxTimeMS.isOK() );
+ QuerySpec qSpec((string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions);
- if ( _isSystemIndexes( q.ns ) && doShardedIndexQuery( r, qSpec )) {
- return;
- }
+ // Parse "$maxTimeMS".
+ StatusWith<int> maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery(q.query);
+ uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK());
- ParallelSortClusteredCursor * cursor = new ParallelSortClusteredCursor( qSpec, CommandInfo() );
- verify( cursor );
+ if (_isSystemIndexes(q.ns) && doShardedIndexQuery(r, qSpec)) {
+ return;
+ }
- // TODO: Move out to Request itself, not strategy based
- try {
- cursor->init();
-
- if ( qSpec.isExplain() ) {
- BSONObjBuilder explain_builder;
- cursor->explain( explain_builder );
- explain_builder.appendNumber( "executionTimeMillis",
- static_cast<long long>(queryTimer.millis()) );
- BSONObj b = explain_builder.obj();
-
- replyToQuery( 0 , r.p() , r.m() , b );
- delete( cursor );
- return;
- }
- }
- catch(...) {
- delete cursor;
- throw;
- }
+ ParallelSortClusteredCursor* cursor = new ParallelSortClusteredCursor(qSpec, CommandInfo());
+ verify(cursor);
- // TODO: Revisit all of this when we revisit the sharded cursor cache
+ // TODO: Move out to Request itself, not strategy based
+ try {
+ cursor->init();
- if (cursor->getNumQueryShards() != 1) {
+ if (qSpec.isExplain()) {
+ BSONObjBuilder explain_builder;
+ cursor->explain(explain_builder);
+ explain_builder.appendNumber("executionTimeMillis",
+ static_cast<long long>(queryTimer.millis()));
+ BSONObj b = explain_builder.obj();
- // More than one shard (or zero), manage with a ShardedClientCursor
- // NOTE: We may also have *zero* shards here when the returnPartial flag is set.
- // Currently the code in ShardedClientCursor handles this.
+ replyToQuery(0, r.p(), r.m(), b);
+ delete (cursor);
+ return;
+ }
+ } catch (...) {
+ delete cursor;
+ throw;
+ }
- ShardedClientCursorPtr cc (new ShardedClientCursor( q , cursor ));
+ // TODO: Revisit all of this when we revisit the sharded cursor cache
- BufBuilder buffer( ShardedClientCursor::INIT_REPLY_BUFFER_SIZE );
- int docCount = 0;
- const int startFrom = cc->getTotalSent();
- bool hasMore = cc->sendNextBatch(q.ntoreturn, buffer, docCount);
+ if (cursor->getNumQueryShards() != 1) {
+ // More than one shard (or zero), manage with a ShardedClientCursor
+ // NOTE: We may also have *zero* shards here when the returnPartial flag is set.
+ // Currently the code in ShardedClientCursor handles this.
- if ( hasMore ) {
- LOG(5) << "storing cursor : " << cc->getId() << endl;
+ ShardedClientCursorPtr cc(new ShardedClientCursor(q, cursor));
- int cursorLeftoverMillis = maxTimeMS.getValue() - queryTimer.millis();
- if ( maxTimeMS.getValue() == 0 ) { // 0 represents "no limit".
- cursorLeftoverMillis = kMaxTimeCursorNoTimeLimit;
- }
- else if ( cursorLeftoverMillis <= 0 ) {
- cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired;
- }
+ BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
+ int docCount = 0;
+ const int startFrom = cc->getTotalSent();
+ bool hasMore = cc->sendNextBatch(q.ntoreturn, buffer, docCount);
+
+ if (hasMore) {
+ LOG(5) << "storing cursor : " << cc->getId() << endl;
- cursorCache.store( cc, cursorLeftoverMillis );
+ int cursorLeftoverMillis = maxTimeMS.getValue() - queryTimer.millis();
+ if (maxTimeMS.getValue() == 0) { // 0 represents "no limit".
+ cursorLeftoverMillis = kMaxTimeCursorNoTimeLimit;
+ } else if (cursorLeftoverMillis <= 0) {
+ cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired;
}
- replyToQuery( 0, r.p(), r.m(), buffer.buf(), buffer.len(), docCount,
- startFrom, hasMore ? cc->getId() : 0 );
+ cursorCache.store(cc, cursorLeftoverMillis);
}
- else{
- // Only one shard is used
+ replyToQuery(0,
+ r.p(),
+ r.m(),
+ buffer.buf(),
+ buffer.len(),
+ docCount,
+ startFrom,
+ hasMore ? cc->getId() : 0);
+ } else {
+ // Only one shard is used
- // Remote cursors are stored remotely, we shouldn't need this around.
- unique_ptr<ParallelSortClusteredCursor> cursorDeleter( cursor );
+ // Remote cursors are stored remotely, we shouldn't need this around.
+ unique_ptr<ParallelSortClusteredCursor> cursorDeleter(cursor);
- ShardPtr shard = cursor->getQueryShard();
- verify( shard.get() );
- DBClientCursorPtr shardCursor = cursor->getShardCursor(shard->getId());
+ ShardPtr shard = cursor->getQueryShard();
+ verify(shard.get());
+ DBClientCursorPtr shardCursor = cursor->getShardCursor(shard->getId());
- // Implicitly stores the cursor in the cache
- r.reply( *(shardCursor->getMessage()) , shardCursor->originalHost() );
+ // Implicitly stores the cursor in the cache
+ r.reply(*(shardCursor->getMessage()), shardCursor->originalHost());
- // We don't want to kill the cursor remotely if there's still data left
- shardCursor->decouple();
- }
+ // We don't want to kill the cursor remotely if there's still data left
+ shardCursor->decouple();
}
+}
- void Strategy::clientCommandOp( Request& r ) {
- QueryMessage q( r.d() );
+void Strategy::clientCommandOp(Request& r) {
+ QueryMessage q(r.d());
- LOG(3) << "command: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn
- << " options: " << q.queryOptions << endl;
+ LOG(3) << "command: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn
+ << " options: " << q.queryOptions << endl;
- if (q.queryOptions & QueryOption_Exhaust) {
- uasserted(18527,
- string("the 'exhaust' query option is invalid for mongos commands: ") + q.ns
- + " " + q.query.toString());
- }
+ if (q.queryOptions & QueryOption_Exhaust) {
+ uasserted(18527,
+ string("the 'exhaust' query option is invalid for mongos commands: ") + q.ns +
+ " " + q.query.toString());
+ }
- NamespaceString nss( r.getns() );
- // Regular queries are handled in strategy_shard.cpp
- verify( nss.isCommand() || nss.isSpecialCommand() );
+ NamespaceString nss(r.getns());
+ // Regular queries are handled in strategy_shard.cpp
+ verify(nss.isCommand() || nss.isSpecialCommand());
- if ( handleSpecialNamespaces( r , q ) )
- return;
+ if (handleSpecialNamespaces(r, q))
+ return;
- int loops = 5;
- while ( true ) {
- BSONObjBuilder builder;
- try {
- BSONObj cmdObj = q.query;
- {
- BSONElement e = cmdObj.firstElement();
- if (e.type() == Object && (e.fieldName()[0] == '$'
- ? str::equals("query", e.fieldName()+1)
- : str::equals("query", e.fieldName()))) {
- // Extract the embedded query object.
-
- if (cmdObj.hasField(Query::ReadPrefField.name())) {
- // The command has a read preference setting. We don't want
- // to lose this information so we copy this to a new field
- // called $queryOptions.$readPreference
- BSONObjBuilder finalCmdObjBuilder;
- finalCmdObjBuilder.appendElements(e.embeddedObject());
-
- BSONObjBuilder queryOptionsBuilder(
- finalCmdObjBuilder.subobjStart("$queryOptions"));
- queryOptionsBuilder.append(cmdObj[Query::ReadPrefField.name()]);
- queryOptionsBuilder.done();
-
- cmdObj = finalCmdObjBuilder.obj();
- }
- else {
- cmdObj = e.embeddedObject();
- }
+ int loops = 5;
+ while (true) {
+ BSONObjBuilder builder;
+ try {
+ BSONObj cmdObj = q.query;
+ {
+ BSONElement e = cmdObj.firstElement();
+ if (e.type() == Object &&
+ (e.fieldName()[0] == '$' ? str::equals("query", e.fieldName() + 1)
+ : str::equals("query", e.fieldName()))) {
+ // Extract the embedded query object.
+
+ if (cmdObj.hasField(Query::ReadPrefField.name())) {
+ // The command has a read preference setting. We don't want
+ // to lose this information so we copy this to a new field
+ // called $queryOptions.$readPreference
+ BSONObjBuilder finalCmdObjBuilder;
+ finalCmdObjBuilder.appendElements(e.embeddedObject());
+
+ BSONObjBuilder queryOptionsBuilder(
+ finalCmdObjBuilder.subobjStart("$queryOptions"));
+ queryOptionsBuilder.append(cmdObj[Query::ReadPrefField.name()]);
+ queryOptionsBuilder.done();
+
+ cmdObj = finalCmdObjBuilder.obj();
+ } else {
+ cmdObj = e.embeddedObject();
}
}
-
- Command::runAgainstRegistered(q.ns, cmdObj, builder, q.queryOptions);
- BSONObj x = builder.done();
- replyToQuery(0, r.p(), r.m(), x);
- return;
}
- catch ( StaleConfigException& e ) {
- if ( loops <= 0 )
- throw e;
-
- loops--;
- log() << "retrying command: " << q.query << endl;
- // For legacy reasons, ns may not actually be set in the exception :-(
- string staleNS = e.getns();
- if( staleNS.size() == 0 ) staleNS = q.ns;
-
- ShardConnection::checkMyConnectionVersions( staleNS );
- if( loops < 4 ) versionManager.forceRemoteCheckShardVersionCB( staleNS );
- }
- catch ( AssertionException& e ) {
- Command::appendCommandStatus(builder, e.toStatus());
- BSONObj x = builder.done();
- replyToQuery(0, r.p(), r.m(), x);
- return;
- }
+ Command::runAgainstRegistered(q.ns, cmdObj, builder, q.queryOptions);
+ BSONObj x = builder.done();
+ replyToQuery(0, r.p(), r.m(), x);
+ return;
+ } catch (StaleConfigException& e) {
+ if (loops <= 0)
+ throw e;
+
+ loops--;
+ log() << "retrying command: " << q.query << endl;
+
+ // For legacy reasons, ns may not actually be set in the exception :-(
+ string staleNS = e.getns();
+ if (staleNS.size() == 0)
+ staleNS = q.ns;
+
+ ShardConnection::checkMyConnectionVersions(staleNS);
+ if (loops < 4)
+ versionManager.forceRemoteCheckShardVersionCB(staleNS);
+ } catch (AssertionException& e) {
+ Command::appendCommandStatus(builder, e.toStatus());
+ BSONObj x = builder.done();
+ replyToQuery(0, r.p(), r.m(), x);
+ return;
}
}
+}
- // TODO: remove after MongoDB 3.2
- bool Strategy::handleSpecialNamespaces( Request& r , QueryMessage& q ) {
- const char * ns = strstr( r.getns() , ".$cmd.sys." );
- if ( ! ns )
- return false;
- ns += 10;
-
- BSONObjBuilder reply;
-
- const auto upgradeToRealCommand = [&r, &q, &reply](StringData commandName) {
- BSONObjBuilder cmdBob;
- cmdBob.append(commandName, 1);
- cmdBob.appendElements(q.query); // fields are validated by Commands
- auto interposedCmd = cmdBob.done();
- NamespaceString nss(r.getns());
- NamespaceString interposedNss(nss.db(), "$cmd");
- Command::runAgainstRegistered(interposedNss.ns().c_str(),
- interposedCmd,
- reply,
- q.queryOptions);
- };
-
- if ( strcmp( ns , "inprog" ) == 0 ) {
- upgradeToRealCommand("currentOp");
- }
- else if ( strcmp( ns , "killop" ) == 0 ) {
- upgradeToRealCommand("killOp");
- }
- else if ( strcmp( ns , "unlock" ) == 0 ) {
- reply.append( "err" , "can't do unlock through mongos" );
- }
- else {
- warning() << "unknown sys command [" << ns << "]" << endl;
- return false;
- }
-
- BSONObj x = reply.done();
- replyToQuery(0, r.p(), r.m(), x);
- return true;
+// TODO: remove after MongoDB 3.2
+bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) {
+ const char* ns = strstr(r.getns(), ".$cmd.sys.");
+ if (!ns)
+ return false;
+ ns += 10;
+
+ BSONObjBuilder reply;
+
+ const auto upgradeToRealCommand = [&r, &q, &reply](StringData commandName) {
+ BSONObjBuilder cmdBob;
+ cmdBob.append(commandName, 1);
+ cmdBob.appendElements(q.query); // fields are validated by Commands
+ auto interposedCmd = cmdBob.done();
+ NamespaceString nss(r.getns());
+ NamespaceString interposedNss(nss.db(), "$cmd");
+ Command::runAgainstRegistered(
+ interposedNss.ns().c_str(), interposedCmd, reply, q.queryOptions);
+ };
+
+ if (strcmp(ns, "inprog") == 0) {
+ upgradeToRealCommand("currentOp");
+ } else if (strcmp(ns, "killop") == 0) {
+ upgradeToRealCommand("killOp");
+ } else if (strcmp(ns, "unlock") == 0) {
+ reply.append("err", "can't do unlock through mongos");
+ } else {
+ warning() << "unknown sys command [" << ns << "]" << endl;
+ return false;
}
- void Strategy::commandOp( const string& db,
- const BSONObj& command,
- int options,
- const string& versionedNS,
- const BSONObj& targetingQuery,
- vector<CommandResult>* results )
- {
-
- QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, options);
+ BSONObj x = reply.done();
+ replyToQuery(0, r.p(), r.m(), x);
+ return true;
+}
- ParallelSortClusteredCursor cursor( qSpec, CommandInfo( versionedNS, targetingQuery ) );
+void Strategy::commandOp(const string& db,
+ const BSONObj& command,
+ int options,
+ const string& versionedNS,
+ const BSONObj& targetingQuery,
+ vector<CommandResult>* results) {
+ QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, options);
- // Initialize the cursor
- cursor.init();
+ ParallelSortClusteredCursor cursor(qSpec, CommandInfo(versionedNS, targetingQuery));
- set<ShardId> shardIds;
- cursor.getQueryShardIds(shardIds);
-
- for (const ShardId& shardId : shardIds) {
- CommandResult result;
- result.shardTargetId = shardId;
-
- string errMsg; // ignored, should never be invalid b/c an exception thrown earlier
- result.target =
- ConnectionString::parse( cursor.getShardCursor(shardId)->originalHost(),
- errMsg );
- result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned();
- results->push_back( result );
- }
+ // Initialize the cursor
+ cursor.init();
- }
+ set<ShardId> shardIds;
+ cursor.getQueryShardIds(shardIds);
- Status Strategy::commandOpWrite(const std::string& dbName,
- const BSONObj& command,
- BatchItemRef targetingBatchItem,
- std::vector<CommandResult>* results) {
+ for (const ShardId& shardId : shardIds) {
+ CommandResult result;
+ result.shardTargetId = shardId;
- // Note that this implementation will not handle targeting retries and does not completely
- // emulate write behavior
+ string errMsg; // ignored, should never be invalid b/c an exception thrown earlier
+ result.target =
+ ConnectionString::parse(cursor.getShardCursor(shardId)->originalHost(), errMsg);
+ result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned();
+ results->push_back(result);
+ }
+}
- ChunkManagerTargeter targeter(NamespaceString(
- targetingBatchItem.getRequest()->getTargetingNS()));
- Status status = targeter.init();
+Status Strategy::commandOpWrite(const std::string& dbName,
+ const BSONObj& command,
+ BatchItemRef targetingBatchItem,
+ std::vector<CommandResult>* results) {
+ // Note that this implementation will not handle targeting retries and does not completely
+ // emulate write behavior
+
+ ChunkManagerTargeter targeter(
+ NamespaceString(targetingBatchItem.getRequest()->getTargetingNS()));
+ Status status = targeter.init();
+ if (!status.isOK())
+ return status;
+
+ OwnedPointerVector<ShardEndpoint> endpointsOwned;
+ vector<ShardEndpoint*>& endpoints = endpointsOwned.mutableVector();
+
+ if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) {
+ ShardEndpoint* endpoint;
+ Status status = targeter.targetInsert(targetingBatchItem.getDocument(), &endpoint);
+ if (!status.isOK())
+ return status;
+ endpoints.push_back(endpoint);
+ } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) {
+ Status status = targeter.targetUpdate(*targetingBatchItem.getUpdate(), &endpoints);
if (!status.isOK())
return status;
+ } else {
+ invariant(targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete);
+ Status status = targeter.targetDelete(*targetingBatchItem.getDelete(), &endpoints);
+ if (!status.isOK())
+ return status;
+ }
- OwnedPointerVector<ShardEndpoint> endpointsOwned;
- vector<ShardEndpoint*>& endpoints = endpointsOwned.mutableVector();
+ DBClientShardResolver resolver;
+ DBClientMultiCommand dispatcher;
- if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) {
- ShardEndpoint* endpoint;
- Status status = targeter.targetInsert(targetingBatchItem.getDocument(), &endpoint);
- if (!status.isOK())
- return status;
- endpoints.push_back(endpoint);
- }
- else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) {
- Status status = targeter.targetUpdate(*targetingBatchItem.getUpdate(), &endpoints);
- if (!status.isOK())
- return status;
- }
- else {
- invariant(targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete);
- Status status = targeter.targetDelete(*targetingBatchItem.getDelete(), &endpoints);
- if (!status.isOK())
- return status;
- }
+ // Assemble requests
+ for (vector<ShardEndpoint*>::const_iterator it = endpoints.begin(); it != endpoints.end();
+ ++it) {
+ const ShardEndpoint* endpoint = *it;
- DBClientShardResolver resolver;
- DBClientMultiCommand dispatcher;
+ ConnectionString host;
+ Status status = resolver.chooseWriteHost(endpoint->shardName, &host);
+ if (!status.isOK())
+ return status;
- // Assemble requests
- for (vector<ShardEndpoint*>::const_iterator it = endpoints.begin(); it != endpoints.end();
- ++it) {
+ RawBSONSerializable request(command);
+ dispatcher.addCommand(host, dbName, request);
+ }
- const ShardEndpoint* endpoint = *it;
+ // Errors reported when recv'ing responses
+ dispatcher.sendAll();
+ Status dispatchStatus = Status::OK();
- ConnectionString host;
- Status status = resolver.chooseWriteHost(endpoint->shardName, &host);
- if (!status.isOK())
- return status;
+ // Recv responses
+ while (dispatcher.numPending() > 0) {
+ ConnectionString host;
+ RawBSONSerializable response;
- RawBSONSerializable request(command);
- dispatcher.addCommand(host, dbName, request);
+ Status status = dispatcher.recvAny(&host, &response);
+ if (!status.isOK()) {
+ // We always need to recv() all the sent operations
+ dispatchStatus = status;
+ continue;
}
- // Errors reported when recv'ing responses
- dispatcher.sendAll();
- Status dispatchStatus = Status::OK();
-
- // Recv responses
- while (dispatcher.numPending() > 0) {
-
- ConnectionString host;
- RawBSONSerializable response;
-
- Status status = dispatcher.recvAny(&host, &response);
- if (!status.isOK()) {
- // We always need to recv() all the sent operations
- dispatchStatus = status;
- continue;
- }
-
- CommandResult result;
- result.target = host;
- {
- const auto shard = grid.shardRegistry()->getShard(host.toString());
- result.shardTargetId = shard->getId();
- }
- result.result = response.toBSON();
-
- results->push_back(result);
+ CommandResult result;
+ result.target = host;
+ {
+ const auto shard = grid.shardRegistry()->getShard(host.toString());
+ result.shardTargetId = shard->getId();
}
+ result.result = response.toBSON();
- return dispatchStatus;
+ results->push_back(result);
}
- Status Strategy::commandOpUnsharded(const std::string& db,
- const BSONObj& command,
- int options,
- const std::string& versionedNS,
- CommandResult* cmdResult) {
+ return dispatchStatus;
+}
- // Note that this implementation will not handle targeting retries and fails when the
- // sharding metadata is too stale
- auto status = grid.catalogCache()->getDatabase(db);
- if (!status.isOK()) {
- mongoutils::str::stream ss;
- ss << "Passthrough command failed: " << command.toString()
- << " on ns " << versionedNS << ". Caused by " << causedBy(status.getStatus());
- return Status(ErrorCodes::IllegalOperation, ss);
- }
+Status Strategy::commandOpUnsharded(const std::string& db,
+ const BSONObj& command,
+ int options,
+ const std::string& versionedNS,
+ CommandResult* cmdResult) {
+ // Note that this implementation will not handle targeting retries and fails when the
+ // sharding metadata is too stale
+ auto status = grid.catalogCache()->getDatabase(db);
+ if (!status.isOK()) {
+ mongoutils::str::stream ss;
+ ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS
+ << ". Caused by " << causedBy(status.getStatus());
+ return Status(ErrorCodes::IllegalOperation, ss);
+ }
- shared_ptr<DBConfig> conf = status.getValue();
- if (conf->isSharded(versionedNS)) {
- mongoutils::str::stream ss;
- ss << "Passthrough command failed: " << command.toString()
- << " on ns " << versionedNS << ". Cannot run on sharded namespace.";
- return Status(ErrorCodes::IllegalOperation, ss);
- }
+ shared_ptr<DBConfig> conf = status.getValue();
+ if (conf->isSharded(versionedNS)) {
+ mongoutils::str::stream ss;
+ ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS
+ << ". Cannot run on sharded namespace.";
+ return Status(ErrorCodes::IllegalOperation, ss);
+ }
- const auto primaryShard = grid.shardRegistry()->getShard(conf->getPrimaryId());
+ const auto primaryShard = grid.shardRegistry()->getShard(conf->getPrimaryId());
- BSONObj shardResult;
- try {
- ShardConnection conn(primaryShard->getConnString(), "");
-
- // TODO: this can throw a stale config when mongos is not up-to-date -- fix.
- if (!conn->runCommand(db, command, shardResult, options)) {
- conn.done();
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "Passthrough command failed: " << command
- << " on ns " << versionedNS
- << "; result: " << shardResult);
- }
+ BSONObj shardResult;
+ try {
+ ShardConnection conn(primaryShard->getConnString(), "");
+
+ // TODO: this can throw a stale config when mongos is not up-to-date -- fix.
+ if (!conn->runCommand(db, command, shardResult, options)) {
conn.done();
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "Passthrough command failed: " << command << " on ns "
+ << versionedNS << "; result: " << shardResult);
}
- catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- // Fill out the command result.
- cmdResult->shardTargetId = conf->getPrimaryId();
- cmdResult->result = shardResult;
- cmdResult->target = primaryShard->getConnString();
-
- return Status::OK();
+ conn.done();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- void Strategy::getMore( Request& r ) {
- Timer getMoreTimer;
-
- const char* ns = r.getns();
- const int ntoreturn = r.d().pullInt();
- const long long id = r.d().pullInt64();
+ // Fill out the command result.
+ cmdResult->shardTargetId = conf->getPrimaryId();
+ cmdResult->result = shardResult;
+ cmdResult->target = primaryShard->getConnString();
- // TODO: Handle stale config exceptions here from coll being dropped or sharded during op
- // for now has same semantics as legacy request
- const NamespaceString nss(ns);
- auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString());
- if (statusGetDb == ErrorCodes::DatabaseNotFound) {
- cursorCache.remove(id);
- replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0);
- return;
- }
+ return Status::OK();
+}
- uassertStatusOK(statusGetDb);
+void Strategy::getMore(Request& r) {
+ Timer getMoreTimer;
+
+ const char* ns = r.getns();
+ const int ntoreturn = r.d().pullInt();
+ const long long id = r.d().pullInt64();
+
+ // TODO: Handle stale config exceptions here from coll being dropped or sharded during op
+ // for now has same semantics as legacy request
+ const NamespaceString nss(ns);
+ auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString());
+ if (statusGetDb == ErrorCodes::DatabaseNotFound) {
+ cursorCache.remove(id);
+ replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0);
+ return;
+ }
- shared_ptr<DBConfig> config = statusGetDb.getValue();
+ uassertStatusOK(statusGetDb);
- ShardPtr primary;
- ChunkManagerPtr info;
- config->getChunkManagerOrPrimary( ns, info, primary );
+ shared_ptr<DBConfig> config = statusGetDb.getValue();
- //
- // TODO: Cleanup cursor cache, consolidate into single codepath
- //
- const string host = cursorCache.getRef(id);
- ShardedClientCursorPtr cursor = cursorCache.get( id );
- int cursorMaxTimeMS = cursorCache.getMaxTimeMS( id );
+ ShardPtr primary;
+ ChunkManagerPtr info;
+ config->getChunkManagerOrPrimary(ns, info, primary);
- // Cursor ids should not overlap between sharded and unsharded cursors
- massert( 17012, str::stream() << "duplicate sharded and unsharded cursor id "
- << id << " detected for " << ns
- << ", duplicated on host " << host,
- NULL == cursorCache.get( id ).get() || host.empty() );
+ //
+ // TODO: Cleanup cursor cache, consolidate into single codepath
+ //
+ const string host = cursorCache.getRef(id);
+ ShardedClientCursorPtr cursor = cursorCache.get(id);
+ int cursorMaxTimeMS = cursorCache.getMaxTimeMS(id);
- ClientBasic* client = ClientBasic::getCurrent();
- NamespaceString nsString(ns);
- AuthorizationSession* authSession = AuthorizationSession::get(client);
- Status status = authSession->checkAuthForGetMore( nsString, id );
- audit::logGetMoreAuthzCheck( client, nsString, id, status.code() );
- uassertStatusOK(status);
+ // Cursor ids should not overlap between sharded and unsharded cursors
+ massert(17012,
+ str::stream() << "duplicate sharded and unsharded cursor id " << id << " detected for "
+ << ns << ", duplicated on host " << host,
+ NULL == cursorCache.get(id).get() || host.empty());
- if( !host.empty() ){
+ ClientBasic* client = ClientBasic::getCurrent();
+ NamespaceString nsString(ns);
+ AuthorizationSession* authSession = AuthorizationSession::get(client);
+ Status status = authSession->checkAuthForGetMore(nsString, id);
+ audit::logGetMoreAuthzCheck(client, nsString, id, status.code());
+ uassertStatusOK(status);
- LOG(3) << "single getmore: " << ns << endl;
+ if (!host.empty()) {
+ LOG(3) << "single getmore: " << ns << endl;
- // we used ScopedDbConnection because we don't get about config versions
- // not deleting data is handled elsewhere
- // and we don't want to call setShardVersion
- ScopedDbConnection conn(host);
+        // we use ScopedDbConnection because we don't care about config versions
+ // not deleting data is handled elsewhere
+ // and we don't want to call setShardVersion
+ ScopedDbConnection conn(host);
- Message response;
- bool ok = conn->callRead( r.m() , response);
- uassert( 10204 , "dbgrid: getmore: error calling db", ok);
+ Message response;
+ bool ok = conn->callRead(r.m(), response);
+ uassert(10204, "dbgrid: getmore: error calling db", ok);
- bool hasMore = (response.singleData().getCursor() != 0);
+ bool hasMore = (response.singleData().getCursor() != 0);
- if ( !hasMore ) {
- cursorCache.removeRef( id );
- }
+ if (!hasMore) {
+ cursorCache.removeRef(id);
+ }
- r.reply( response , "" /*conn->getServerAddress() */ );
- conn.done();
- return;
+ r.reply(response, "" /*conn->getServerAddress() */);
+ conn.done();
+ return;
+ } else if (cursor) {
+ if (cursorMaxTimeMS == kMaxTimeCursorTimeLimitExpired) {
+ cursorCache.remove(id);
+ uasserted(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit");
}
- else if ( cursor ) {
- if ( cursorMaxTimeMS == kMaxTimeCursorTimeLimitExpired ) {
- cursorCache.remove( id );
- uasserted( ErrorCodes::ExceededTimeLimit, "operation exceeded time limit" );
- }
+ // TODO: Try to match logic of mongod, where on subsequent getMore() we pull lots more data?
+ BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
+ int docCount = 0;
+ const int startFrom = cursor->getTotalSent();
+ bool hasMore = cursor->sendNextBatch(ntoreturn, buffer, docCount);
- // TODO: Try to match logic of mongod, where on subsequent getMore() we pull lots more data?
- BufBuilder buffer( ShardedClientCursor::INIT_REPLY_BUFFER_SIZE );
- int docCount = 0;
- const int startFrom = cursor->getTotalSent();
- bool hasMore = cursor->sendNextBatch(ntoreturn, buffer, docCount);
-
- if ( hasMore ) {
- // still more data
- cursor->accessed();
-
- if ( cursorMaxTimeMS != kMaxTimeCursorNoTimeLimit ) {
- // Update remaining amount of time in cursor cache.
- int cursorLeftoverMillis = cursorMaxTimeMS - getMoreTimer.millis();
- if ( cursorLeftoverMillis <= 0 ) {
- cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired;
- }
- cursorCache.updateMaxTimeMS( id, cursorLeftoverMillis );
+ if (hasMore) {
+ // still more data
+ cursor->accessed();
+
+ if (cursorMaxTimeMS != kMaxTimeCursorNoTimeLimit) {
+ // Update remaining amount of time in cursor cache.
+ int cursorLeftoverMillis = cursorMaxTimeMS - getMoreTimer.millis();
+ if (cursorLeftoverMillis <= 0) {
+ cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired;
}
+ cursorCache.updateMaxTimeMS(id, cursorLeftoverMillis);
}
- else {
- // we've exhausted the cursor
- cursorCache.remove( id );
- }
-
- replyToQuery( 0, r.p(), r.m(), buffer.buf(), buffer.len(), docCount,
- startFrom, hasMore ? cursor->getId() : 0 );
- return;
+ } else {
+ // we've exhausted the cursor
+ cursorCache.remove(id);
}
- else {
-
- LOG( 3 ) << "could not find cursor " << id << " in cache for " << ns << endl;
- replyToQuery( ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 );
- return;
- }
+ replyToQuery(0,
+ r.p(),
+ r.m(),
+ buffer.buf(),
+ buffer.len(),
+ docCount,
+ startFrom,
+ hasMore ? cursor->getId() : 0);
+ return;
+ } else {
+ LOG(3) << "could not find cursor " << id << " in cache for " << ns << endl;
+
+ replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0);
+ return;
}
+}
- void Strategy::writeOp( int op , Request& r ) {
-
- // make sure we have a last error
- dassert(&LastError::get(cc()));
-
- OwnedPointerVector<BatchedCommandRequest> requestsOwned;
- vector<BatchedCommandRequest*>& requests = requestsOwned.mutableVector();
+void Strategy::writeOp(int op, Request& r) {
+ // make sure we have a last error
+ dassert(&LastError::get(cc()));
- msgToBatchRequests( r.m(), &requests );
+ OwnedPointerVector<BatchedCommandRequest> requestsOwned;
+ vector<BatchedCommandRequest*>& requests = requestsOwned.mutableVector();
- for ( vector<BatchedCommandRequest*>::iterator it = requests.begin();
- it != requests.end(); ++it ) {
+ msgToBatchRequests(r.m(), &requests);
- // Multiple commands registered to last error as multiple requests
- if ( it != requests.begin() )
- LastError::get(cc()).startRequest();
+ for (vector<BatchedCommandRequest*>::iterator it = requests.begin(); it != requests.end();
+ ++it) {
+ // Multiple commands registered to last error as multiple requests
+ if (it != requests.begin())
+ LastError::get(cc()).startRequest();
- BatchedCommandRequest* request = *it;
+ BatchedCommandRequest* request = *it;
- // Adjust namespaces for command
- NamespaceString fullNS( request->getNS() );
- string cmdNS = fullNS.getCommandNS();
- // We only pass in collection name to command
- request->setNS( fullNS.coll() );
+ // Adjust namespaces for command
+ NamespaceString fullNS(request->getNS());
+ string cmdNS = fullNS.getCommandNS();
+ // We only pass in collection name to command
+ request->setNS(fullNS.coll());
- BSONObjBuilder builder;
- BSONObj requestBSON = request->toBSON();
+ BSONObjBuilder builder;
+ BSONObj requestBSON = request->toBSON();
- {
- // Disable the last error object for the duration of the write cmd
- LastError::Disabled disableLastError(&LastError::get(cc()));
- Command::runAgainstRegistered( cmdNS.c_str(), requestBSON, builder, 0 );
- }
+ {
+ // Disable the last error object for the duration of the write cmd
+ LastError::Disabled disableLastError(&LastError::get(cc()));
+ Command::runAgainstRegistered(cmdNS.c_str(), requestBSON, builder, 0);
+ }
- BatchedCommandResponse response;
- bool parsed = response.parseBSON( builder.done(), NULL );
- (void) parsed; // for compile
- dassert( parsed && response.isValid( NULL ) );
+ BatchedCommandResponse response;
+ bool parsed = response.parseBSON(builder.done(), NULL);
+ (void)parsed; // for compile
+ dassert(parsed && response.isValid(NULL));
- // Populate the lastError object based on the write response
- LastError::get(cc()).reset();
- bool hadError = batchErrorToLastError(*request, response, &LastError::get(cc()));
+ // Populate the lastError object based on the write response
+ LastError::get(cc()).reset();
+ bool hadError = batchErrorToLastError(*request, response, &LastError::get(cc()));
- // Check if this is an ordered batch and we had an error which should stop processing
- if ( request->getOrdered() && hadError )
- break;
- }
+ // Check if this is an ordered batch and we had an error which should stop processing
+ if (request->getOrdered() && hadError)
+ break;
}
-
+}
}