summaryrefslogtreecommitdiff
path: root/src/mongo/client/parallel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/parallel.cpp')
-rw-r--r--src/mongo/client/parallel.cpp2665
1 files changed, 1296 insertions, 1369 deletions
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp
index d764041468d..939cd253e33 100644
--- a/src/mongo/client/parallel.cpp
+++ b/src/mongo/client/parallel.cpp
@@ -52,1686 +52,1613 @@
namespace mongo {
- using std::shared_ptr;
- using std::endl;
- using std::list;
- using std::map;
- using std::set;
- using std::string;
- using std::stringstream;
- using std::vector;
-
- LabeledLevel pc( "pcursor", 2 );
-
- void ParallelSortClusteredCursor::init() {
- if ( _didInit )
- return;
- _didInit = true;
-
- if( ! _qSpec.isEmpty() ) {
- fullInit();
- }
- else {
- // You can only get here by using the legacy constructor
- // TODO: Eliminate this
- _oldInit();
- }
+using std::shared_ptr;
+using std::endl;
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+LabeledLevel pc("pcursor", 2);
+
+void ParallelSortClusteredCursor::init() {
+ if (_didInit)
+ return;
+ _didInit = true;
+
+ if (!_qSpec.isEmpty()) {
+ fullInit();
+ } else {
+ // You can only get here by using the legacy constructor
+ // TODO: Eliminate this
+ _oldInit();
}
+}
- string ParallelSortClusteredCursor::getNS() {
- if( ! _qSpec.isEmpty() ) return _qSpec.ns();
- return _ns;
- }
+string ParallelSortClusteredCursor::getNS() {
+ if (!_qSpec.isEmpty())
+ return _qSpec.ns();
+ return _ns;
+}
- /**
- * Throws a RecvStaleConfigException wrapping the stale error document in this cursor when the
- * ShardConfigStale flag is set or a command returns a SendStaleConfigCode error code.
- */
- static void throwCursorStale(DBClientCursor* cursor) {
- verify(cursor);
-
- if (cursor->hasResultFlag(ResultFlag_ShardConfigStale)) {
- BSONObj error;
- cursor->peekError(&error);
- throw RecvStaleConfigException("query returned a stale config error", error);
- }
+/**
+ * Throws a RecvStaleConfigException wrapping the stale error document in this cursor when the
+ * ShardConfigStale flag is set or a command returns a SendStaleConfigCode error code.
+ */
+static void throwCursorStale(DBClientCursor* cursor) {
+ verify(cursor);
- if (NamespaceString(cursor->getns()).isCommand()) {
- // Commands that care about versioning (like the count or geoNear command) sometimes
- // return with the stale config error code, but don't set the ShardConfigStale result
- // flag on the cursor.
- // TODO: Standardize stale config reporting.
- BSONObj res = cursor->peekFirst();
- if (res.hasField("code") && res["code"].Number() == SendStaleConfigCode) {
- throw RecvStaleConfigException("command returned a stale config error", res);
- }
+ if (cursor->hasResultFlag(ResultFlag_ShardConfigStale)) {
+ BSONObj error;
+ cursor->peekError(&error);
+ throw RecvStaleConfigException("query returned a stale config error", error);
+ }
+
+ if (NamespaceString(cursor->getns()).isCommand()) {
+ // Commands that care about versioning (like the count or geoNear command) sometimes
+ // return with the stale config error code, but don't set the ShardConfigStale result
+ // flag on the cursor.
+ // TODO: Standardize stale config reporting.
+ BSONObj res = cursor->peekFirst();
+ if (res.hasField("code") && res["code"].Number() == SendStaleConfigCode) {
+ throw RecvStaleConfigException("command returned a stale config error", res);
}
}
+}
- /**
- * Throws an exception wrapping the error document in this cursor when the error flag is set.
- */
- static void throwCursorError(DBClientCursor* cursor) {
- verify(cursor);
+/**
+ * Throws an exception wrapping the error document in this cursor when the error flag is set.
+ */
+static void throwCursorError(DBClientCursor* cursor) {
+ verify(cursor);
- if (cursor->hasResultFlag(ResultFlag_ErrSet)) {
- BSONObj o = cursor->next();
- throw UserException(o["code"].numberInt(), o["$err"].str());
- }
+ if (cursor->hasResultFlag(ResultFlag_ErrSet)) {
+ BSONObj o = cursor->next();
+ throw UserException(o["code"].numberInt(), o["$err"].str());
}
+}
- void ParallelSortClusteredCursor::explain(BSONObjBuilder& b) {
- // Note: by default we filter out allPlans and oldPlan in the shell's
- // explain() function. If you add any recursive structures, make sure to
- // edit the JS to make sure everything gets filtered.
-
- // Return single shard output if we're versioned but not sharded, or
- // if we specified only a single shard
- // TODO: We should really make this simpler - all queries via mongos
- // *always* get the same explain format
- if (!isSharded()) {
- map<string,list<BSONObj> > out;
- _explain( out );
- verify( out.size() == 1 );
- list<BSONObj>& l = out.begin()->second;
- verify( l.size() == 1 );
- b.appendElements( *(l.begin()) );
- return;
- }
+void ParallelSortClusteredCursor::explain(BSONObjBuilder& b) {
+ // Note: by default we filter out allPlans and oldPlan in the shell's
+ // explain() function. If you add any recursive structures, make sure to
+ // edit the JS to make sure everything gets filtered.
+
+ // Return single shard output if we're versioned but not sharded, or
+ // if we specified only a single shard
+ // TODO: We should really make this simpler - all queries via mongos
+ // *always* get the same explain format
+ if (!isSharded()) {
+ map<string, list<BSONObj>> out;
+ _explain(out);
+ verify(out.size() == 1);
+ list<BSONObj>& l = out.begin()->second;
+ verify(l.size() == 1);
+ b.appendElements(*(l.begin()));
+ return;
+ }
- b.append( "clusteredType" , type() );
+ b.append("clusteredType", type());
- string cursorType;
- BSONObj indexBounds;
- BSONObj oldPlan;
+ string cursorType;
+ BSONObj indexBounds;
+ BSONObj oldPlan;
- long long millis = 0;
- double numExplains = 0;
+ long long millis = 0;
+ double numExplains = 0;
- long long nReturned = 0;
- long long keysExamined = 0;
- long long docsExamined = 0;
+ long long nReturned = 0;
+ long long keysExamined = 0;
+ long long docsExamined = 0;
- map<string,list<BSONObj> > out;
- {
- _explain( out );
-
- BSONObjBuilder x( b.subobjStart( "shards" ) );
- for ( map<string,list<BSONObj> >::iterator i=out.begin(); i!=out.end(); ++i ) {
- const ShardId& shardId = i->first;
- list<BSONObj> l = i->second;
- BSONArrayBuilder y(x.subarrayStart(shardId));
- for ( list<BSONObj>::iterator j=l.begin(); j!=l.end(); ++j ) {
- BSONObj temp = *j;
-
- // If appending the next output from the shard is going to make the BSON
- // too large, then don't add it. We make sure the BSON doesn't get bigger
- // than the allowable "user size" for a BSONObj. This leaves a little bit
- // of extra space which mongos can use to add extra data.
- if ((x.len() + temp.objsize()) > BSONObjMaxUserSize) {
- y.append(BSON("warning" <<
- "shard output omitted due to nearing 16 MB limit"));
- break;
- }
+ map<string, list<BSONObj>> out;
+ {
+ _explain(out);
- y.append( temp );
+ BSONObjBuilder x(b.subobjStart("shards"));
+ for (map<string, list<BSONObj>>::iterator i = out.begin(); i != out.end(); ++i) {
+ const ShardId& shardId = i->first;
+ list<BSONObj> l = i->second;
+ BSONArrayBuilder y(x.subarrayStart(shardId));
+ for (list<BSONObj>::iterator j = l.begin(); j != l.end(); ++j) {
+ BSONObj temp = *j;
+
+ // If appending the next output from the shard is going to make the BSON
+ // too large, then don't add it. We make sure the BSON doesn't get bigger
+ // than the allowable "user size" for a BSONObj. This leaves a little bit
+ // of extra space which mongos can use to add extra data.
+ if ((x.len() + temp.objsize()) > BSONObjMaxUserSize) {
+ y.append(BSON("warning"
+ << "shard output omitted due to nearing 16 MB limit"));
+ break;
+ }
- if (temp.hasField("executionStats")) {
- // Here we assume that the shard gave us back explain 2.0 style output.
- BSONObj execStats = temp["executionStats"].Obj();
- if (execStats.hasField("nReturned")) {
- nReturned += execStats["nReturned"].numberLong();
- }
- if (execStats.hasField("totalKeysExamined")) {
- keysExamined += execStats["totalKeysExamined"].numberLong();
- }
- if (execStats.hasField("totalDocsExamined")) {
- docsExamined += execStats["totalDocsExamined"].numberLong();
- }
- if (execStats.hasField("executionTimeMillis")) {
- millis += execStats["executionTimeMillis"].numberLong();
- }
+ y.append(temp);
+
+ if (temp.hasField("executionStats")) {
+ // Here we assume that the shard gave us back explain 2.0 style output.
+ BSONObj execStats = temp["executionStats"].Obj();
+ if (execStats.hasField("nReturned")) {
+ nReturned += execStats["nReturned"].numberLong();
}
- else {
- // Here we assume that the shard gave us back explain 1.0 style output.
- if (temp.hasField("n")) {
- nReturned += temp["n"].numberLong();
- }
- if (temp.hasField("nscanned")) {
- keysExamined += temp["nscanned"].numberLong();
- }
- if (temp.hasField("nscannedObjects")) {
- docsExamined += temp["nscannedObjects"].numberLong();
- }
- if (temp.hasField("millis")) {
- millis += temp["millis"].numberLong();
- }
- if (String == temp["cursor"].type()) {
- if (cursorType.empty()) {
- cursorType = temp["cursor"].String();
- }
- else if (cursorType != temp["cursor"].String()) {
- cursorType = "multiple";
- }
- }
- if (Object == temp["indexBounds"].type()) {
- indexBounds = temp["indexBounds"].Obj();
- }
- if (Object == temp["oldPlan"].type()) {
- oldPlan = temp["oldPlan"].Obj();
+ if (execStats.hasField("totalKeysExamined")) {
+ keysExamined += execStats["totalKeysExamined"].numberLong();
+ }
+ if (execStats.hasField("totalDocsExamined")) {
+ docsExamined += execStats["totalDocsExamined"].numberLong();
+ }
+ if (execStats.hasField("executionTimeMillis")) {
+ millis += execStats["executionTimeMillis"].numberLong();
+ }
+ } else {
+ // Here we assume that the shard gave us back explain 1.0 style output.
+ if (temp.hasField("n")) {
+ nReturned += temp["n"].numberLong();
+ }
+ if (temp.hasField("nscanned")) {
+ keysExamined += temp["nscanned"].numberLong();
+ }
+ if (temp.hasField("nscannedObjects")) {
+ docsExamined += temp["nscannedObjects"].numberLong();
+ }
+ if (temp.hasField("millis")) {
+ millis += temp["millis"].numberLong();
+ }
+ if (String == temp["cursor"].type()) {
+ if (cursorType.empty()) {
+ cursorType = temp["cursor"].String();
+ } else if (cursorType != temp["cursor"].String()) {
+ cursorType = "multiple";
}
}
-
- numExplains++;
+ if (Object == temp["indexBounds"].type()) {
+ indexBounds = temp["indexBounds"].Obj();
+ }
+ if (Object == temp["oldPlan"].type()) {
+ oldPlan = temp["oldPlan"].Obj();
+ }
}
- y.done();
- }
- x.done();
- }
-
- if ( !cursorType.empty() ) {
- b.append( "cursor" , cursorType );
- }
- b.appendNumber( "n" , nReturned );
- b.appendNumber( "nscanned" , keysExamined );
- b.appendNumber( "nscannedObjects" , docsExamined );
-
- b.appendNumber( "millisShardTotal" , millis );
- b.append( "millisShardAvg" ,
- numExplains ? static_cast<int>( static_cast<double>(millis) / numExplains )
- : 0 );
- b.append( "numQueries" , (int)numExplains );
- b.append( "numShards" , (int)out.size() );
-
- if ( out.size() == 1 ) {
- b.append( "indexBounds" , indexBounds );
- if ( ! oldPlan.isEmpty() ) {
- // this is to stay in compliance with mongod behavior
- // we should make this cleaner, i.e. {} == nothing
- b.append( "oldPlan" , oldPlan );
+ numExplains++;
}
+ y.done();
}
- else {
- // TODO: this is lame...
- }
-
+ x.done();
}
- // -------- ParallelSortClusteredCursor -----------
-
- ParallelSortClusteredCursor::ParallelSortClusteredCursor( const QuerySpec& qSpec, const CommandInfo& cInfo )
- : _qSpec( qSpec ), _cInfo( cInfo ), _totalTries( 0 )
- {
- _done = false;
- _didInit = false;
+ if (!cursorType.empty()) {
+ b.append("cursor", cursorType);
+ }
- _finishCons();
+ b.appendNumber("n", nReturned);
+ b.appendNumber("nscanned", keysExamined);
+ b.appendNumber("nscannedObjects", docsExamined);
+
+ b.appendNumber("millisShardTotal", millis);
+ b.append("millisShardAvg",
+ numExplains ? static_cast<int>(static_cast<double>(millis) / numExplains) : 0);
+ b.append("numQueries", (int)numExplains);
+ b.append("numShards", (int)out.size());
+
+ if (out.size() == 1) {
+ b.append("indexBounds", indexBounds);
+ if (!oldPlan.isEmpty()) {
+ // this is to stay in compliance with mongod behavior
+ // we should make this cleaner, i.e. {} == nothing
+ b.append("oldPlan", oldPlan);
+ }
+ } else {
+ // TODO: this is lame...
}
+}
- // LEGACY Constructor
- ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<string>& servers , const string& ns ,
- const Query& q ,
- int options , const BSONObj& fields )
- : _servers( servers ) {
+// -------- ParallelSortClusteredCursor -----------
- _sortKey = q.getSort().copy();
- _needToSkip = 0;
+ParallelSortClusteredCursor::ParallelSortClusteredCursor(const QuerySpec& qSpec,
+ const CommandInfo& cInfo)
+ : _qSpec(qSpec), _cInfo(cInfo), _totalTries(0) {
+ _done = false;
+ _didInit = false;
- _done = false;
- _didInit = false;
+ _finishCons();
+}
- // Populate legacy fields
- _ns = ns;
- _query = q.obj.getOwned();
- _options = options;
- _fields = fields.getOwned();
- _batchSize = 0;
+// LEGACY Constructor
+ParallelSortClusteredCursor::ParallelSortClusteredCursor(const set<string>& servers,
+ const string& ns,
+ const Query& q,
+ int options,
+ const BSONObj& fields)
+ : _servers(servers) {
+ _sortKey = q.getSort().copy();
+ _needToSkip = 0;
+
+ _done = false;
+ _didInit = false;
+
+ // Populate legacy fields
+ _ns = ns;
+ _query = q.obj.getOwned();
+ _options = options;
+ _fields = fields.getOwned();
+ _batchSize = 0;
+
+ _finishCons();
+}
- _finishCons();
- }
+void ParallelSortClusteredCursor::_finishCons() {
+ _numServers = _servers.size();
+ _lastFrom = 0;
+ _cursors = 0;
- void ParallelSortClusteredCursor::_finishCons() {
- _numServers = _servers.size();
- _lastFrom = 0;
+ if (!_qSpec.isEmpty()) {
+ _needToSkip = _qSpec.ntoskip();
_cursors = 0;
+ _sortKey = _qSpec.sort();
+ _fields = _qSpec.fields();
+ }
- if( ! _qSpec.isEmpty() ){
- _needToSkip = _qSpec.ntoskip();
- _cursors = 0;
- _sortKey = _qSpec.sort();
- _fields = _qSpec.fields();
- }
-
- // Partition sort key fields into (a) text meta fields and (b) all other fields.
- set<string> textMetaSortKeyFields;
- set<string> normalSortKeyFields;
-
- // Transform _sortKey fields {a:{$meta:"textScore"}} into {a:-1}, in order to apply the
- // merge sort for text metadata in the correct direction.
- BSONObjBuilder transformedSortKeyBuilder;
-
- BSONObjIterator sortKeyIt( _sortKey );
- while ( sortKeyIt.more() ) {
- BSONElement e = sortKeyIt.next();
- if ( LiteParsedQuery::isTextScoreMeta( e ) ) {
- textMetaSortKeyFields.insert( e.fieldName() );
- transformedSortKeyBuilder.append( e.fieldName(), -1 );
- }
- else {
- normalSortKeyFields.insert( e.fieldName() );
- transformedSortKeyBuilder.append( e );
- }
+ // Partition sort key fields into (a) text meta fields and (b) all other fields.
+ set<string> textMetaSortKeyFields;
+ set<string> normalSortKeyFields;
+
+ // Transform _sortKey fields {a:{$meta:"textScore"}} into {a:-1}, in order to apply the
+ // merge sort for text metadata in the correct direction.
+ BSONObjBuilder transformedSortKeyBuilder;
+
+ BSONObjIterator sortKeyIt(_sortKey);
+ while (sortKeyIt.more()) {
+ BSONElement e = sortKeyIt.next();
+ if (LiteParsedQuery::isTextScoreMeta(e)) {
+ textMetaSortKeyFields.insert(e.fieldName());
+ transformedSortKeyBuilder.append(e.fieldName(), -1);
+ } else {
+ normalSortKeyFields.insert(e.fieldName());
+ transformedSortKeyBuilder.append(e);
}
- _sortKey = transformedSortKeyBuilder.obj();
-
- // Verify that that all text metadata sort fields are in the projection. For all other sort
- // fields, copy them into the projection if they are missing (and if projection is
- // negative).
- if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ) {
-
- BSONObjBuilder b;
- bool isNegative = false;
- {
- BSONObjIterator i( _fields );
- while ( i.more() ) {
- BSONElement e = i.next();
- b.append( e );
-
- string fieldName = e.fieldName();
+ }
+ _sortKey = transformedSortKeyBuilder.obj();
- if ( LiteParsedQuery::isTextScoreMeta( e ) ) {
- textMetaSortKeyFields.erase( fieldName );
- }
- else {
- // exact field
- bool found = normalSortKeyFields.erase( fieldName );
-
- // subfields
- set<string>::const_iterator begin =
- normalSortKeyFields.lower_bound( fieldName + ".\x00" );
- set<string>::const_iterator end =
- normalSortKeyFields.lower_bound( fieldName + ".\xFF" );
- normalSortKeyFields.erase( begin, end );
-
- if ( ! e.trueValue() ) {
- uassert( 13431,
- "have to have sort key in projection and removing it",
- !found && begin == end );
- }
- else if ( !e.isABSONObj() ) {
- isNegative = true;
- }
+ // Verify that that all text metadata sort fields are in the projection. For all other sort
+ // fields, copy them into the projection if they are missing (and if projection is
+ // negative).
+ if (!_sortKey.isEmpty() && !_fields.isEmpty()) {
+ BSONObjBuilder b;
+ bool isNegative = false;
+ {
+ BSONObjIterator i(_fields);
+ while (i.more()) {
+ BSONElement e = i.next();
+ b.append(e);
+
+ string fieldName = e.fieldName();
+
+ if (LiteParsedQuery::isTextScoreMeta(e)) {
+ textMetaSortKeyFields.erase(fieldName);
+ } else {
+ // exact field
+ bool found = normalSortKeyFields.erase(fieldName);
+
+ // subfields
+ set<string>::const_iterator begin =
+ normalSortKeyFields.lower_bound(fieldName + ".\x00");
+ set<string>::const_iterator end =
+ normalSortKeyFields.lower_bound(fieldName + ".\xFF");
+ normalSortKeyFields.erase(begin, end);
+
+ if (!e.trueValue()) {
+ uassert(13431,
+ "have to have sort key in projection and removing it",
+ !found && begin == end);
+ } else if (!e.isABSONObj()) {
+ isNegative = true;
}
}
}
+ }
- if ( isNegative ) {
- for ( set<string>::const_iterator it( normalSortKeyFields.begin() ),
- end( normalSortKeyFields.end() );
- it != end;
- ++it ) {
- b.append( *it, 1 );
- }
+ if (isNegative) {
+ for (set<string>::const_iterator it(normalSortKeyFields.begin()),
+ end(normalSortKeyFields.end());
+ it != end;
+ ++it) {
+ b.append(*it, 1);
}
-
- _fields = b.obj();
}
- if( ! _qSpec.isEmpty() ){
- _qSpec.setFields( _fields );
- }
+ _fields = b.obj();
+ }
- uassert( 17306,
- "have to have all text meta sort keys in projection",
- textMetaSortKeyFields.empty() );
+ if (!_qSpec.isEmpty()) {
+ _qSpec.setFields(_fields);
}
- void ParallelConnectionMetadata::cleanup( bool full ){
+ uassert(
+ 17306, "have to have all text meta sort keys in projection", textMetaSortKeyFields.empty());
+}
- if( full || errored ) retryNext = false;
+void ParallelConnectionMetadata::cleanup(bool full) {
+ if (full || errored)
+ retryNext = false;
- if( ! retryNext && pcState ){
- if (initialized && !errored) {
- verify( pcState->cursor );
- verify( pcState->conn );
+ if (!retryNext && pcState) {
+ if (initialized && !errored) {
+ verify(pcState->cursor);
+ verify(pcState->conn);
- if( ! finished && pcState->conn->ok() ){
- try{
- // Complete the call if only halfway done
- bool retry = false;
- pcState->cursor->initLazyFinish( retry );
- }
- catch( std::exception& ){
- warning() << "exception closing cursor" << endl;
- }
- catch( ... ){
- warning() << "unknown exception closing cursor" << endl;
- }
+ if (!finished && pcState->conn->ok()) {
+ try {
+ // Complete the call if only halfway done
+ bool retry = false;
+ pcState->cursor->initLazyFinish(retry);
+ } catch (std::exception&) {
+ warning() << "exception closing cursor" << endl;
+ } catch (...) {
+ warning() << "unknown exception closing cursor" << endl;
}
}
-
- // Double-check conn is closed
- if( pcState->conn ){
- pcState->conn->done();
- }
-
- pcState.reset();
}
- else verify( finished || ! initialized );
- initialized = false;
- finished = false;
- completed = false;
- errored = false;
- }
-
-
-
- BSONObj ParallelConnectionState::toBSON() const {
-
- BSONObj cursorPeek = BSON( "no cursor" << "" );
- if( cursor ){
- vector<BSONObj> v;
- cursor->peek( v, 1 );
- if( v.size() == 0 ) cursorPeek = BSON( "no data" << "" );
- else cursorPeek = BSON( "" << v[0] );
+ // Double-check conn is closed
+ if (pcState->conn) {
+ pcState->conn->done();
}
- BSONObj stateObj =
- BSON( "conn" << ( conn ? ( conn->ok() ? conn->conn().toString() : "(done)" ) : "" ) <<
- "vinfo" << ( manager ? ( str::stream() << manager->getns() << " @ " << manager->getVersion().toString() ) :
- primary->toString() ) );
-
- // Append cursor data if exists
- BSONObjBuilder stateB;
- stateB.appendElements( stateObj );
- if( ! cursor ) stateB.append( "cursor", "(none)" );
- else {
- vector<BSONObj> v;
- cursor->peek( v, 1 );
- if( v.size() == 0 ) stateB.append( "cursor", "(empty)" );
- else stateB.append( "cursor", v[0] );
- }
+ pcState.reset();
+ } else
+ verify(finished || !initialized);
+
+ initialized = false;
+ finished = false;
+ completed = false;
+ errored = false;
+}
- stateB.append( "count", count );
- stateB.append( "done", done );
- return stateB.obj().getOwned();
+BSONObj ParallelConnectionState::toBSON() const {
+ BSONObj cursorPeek = BSON("no cursor"
+ << "");
+ if (cursor) {
+ vector<BSONObj> v;
+ cursor->peek(v, 1);
+ if (v.size() == 0)
+ cursorPeek = BSON("no data"
+ << "");
+ else
+ cursorPeek = BSON("" << v[0]);
}
- BSONObj ParallelConnectionMetadata::toBSON() const {
- return BSON( "state" << ( pcState ? pcState->toBSON() : BSONObj() ) <<
- "retryNext" << retryNext <<
- "init" << initialized <<
- "finish" << finished <<
- "errored" << errored );
+ BSONObj stateObj =
+ BSON("conn" << (conn ? (conn->ok() ? conn->conn().toString() : "(done)") : "") << "vinfo"
+ << (manager ? (str::stream() << manager->getns() << " @ "
+ << manager->getVersion().toString())
+ : primary->toString()));
+
+ // Append cursor data if exists
+ BSONObjBuilder stateB;
+ stateB.appendElements(stateObj);
+ if (!cursor)
+ stateB.append("cursor", "(none)");
+ else {
+ vector<BSONObj> v;
+ cursor->peek(v, 1);
+ if (v.size() == 0)
+ stateB.append("cursor", "(empty)");
+ else
+ stateB.append("cursor", v[0]);
}
- BSONObj ParallelSortClusteredCursor::toBSON() const {
+ stateB.append("count", count);
+ stateB.append("done", done);
- BSONObjBuilder b;
+ return stateB.obj().getOwned();
+}
- b.append( "tries", _totalTries );
+BSONObj ParallelConnectionMetadata::toBSON() const {
+ return BSON("state" << (pcState ? pcState->toBSON() : BSONObj()) << "retryNext" << retryNext
+ << "init" << initialized << "finish" << finished << "errored" << errored);
+}
- {
- BSONObjBuilder bb;
- for (map<ShardId, PCMData>::const_iterator i = _cursorMap.begin(),
- end = _cursorMap.end();
- i != end;
- ++i) {
- const auto shard = grid.shardRegistry()->getShard(i->first);
- if (!shard) {
- continue;
- }
+BSONObj ParallelSortClusteredCursor::toBSON() const {
+ BSONObjBuilder b;
- bb.append(shard->toString(), i->second.toBSON());
- }
- b.append( "cursors", bb.obj().getOwned() );
- }
+ b.append("tries", _totalTries);
- {
- BSONObjBuilder bb;
- for( map< string, int >::const_iterator i = _staleNSMap.begin(), end = _staleNSMap.end(); i != end; ++i ){
- bb.append( i->first, i->second );
+ {
+ BSONObjBuilder bb;
+ for (map<ShardId, PCMData>::const_iterator i = _cursorMap.begin(), end = _cursorMap.end();
+ i != end;
+ ++i) {
+ const auto shard = grid.shardRegistry()->getShard(i->first);
+ if (!shard) {
+ continue;
}
- b.append( "staleTries", bb.obj().getOwned() );
- }
- return b.obj().getOwned();
+ bb.append(shard->toString(), i->second.toBSON());
+ }
+ b.append("cursors", bb.obj().getOwned());
}
- string ParallelSortClusteredCursor::toString() const {
- return str::stream() << "PCursor : " << toBSON();
+ {
+ BSONObjBuilder bb;
+ for (map<string, int>::const_iterator i = _staleNSMap.begin(), end = _staleNSMap.end();
+ i != end;
+ ++i) {
+ bb.append(i->first, i->second);
+ }
+ b.append("staleTries", bb.obj().getOwned());
}
- void ParallelSortClusteredCursor::fullInit(){
- startInit();
- finishInit();
- }
+ return b.obj().getOwned();
+}
- void ParallelSortClusteredCursor::_markStaleNS( const NamespaceString& staleNS, const StaleConfigException& e, bool& forceReload, bool& fullReload ){
+string ParallelSortClusteredCursor::toString() const {
+ return str::stream() << "PCursor : " << toBSON();
+}
- fullReload = e.requiresFullReload();
+void ParallelSortClusteredCursor::fullInit() {
+ startInit();
+ finishInit();
+}
- if( _staleNSMap.find( staleNS ) == _staleNSMap.end() ) _staleNSMap[ staleNS ] = 1;
+void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS,
+ const StaleConfigException& e,
+ bool& forceReload,
+ bool& fullReload) {
+ fullReload = e.requiresFullReload();
- int tries = ++_staleNSMap[ staleNS ];
+ if (_staleNSMap.find(staleNS) == _staleNSMap.end())
+ _staleNSMap[staleNS] = 1;
- if (tries >= 5) {
- throw SendStaleConfigException(staleNS,
- str::stream()
- << "too many retries of stale version info",
- e.getVersionReceived(),
- e.getVersionWanted());
- }
+ int tries = ++_staleNSMap[staleNS];
- forceReload = tries > 2;
+ if (tries >= 5) {
+ throw SendStaleConfigException(staleNS,
+ str::stream() << "too many retries of stale version info",
+ e.getVersionReceived(),
+ e.getVersionWanted());
}
- void ParallelSortClusteredCursor::_handleStaleNS(const NamespaceString& staleNS,
- bool forceReload,
- bool fullReload) {
-
- auto status = grid.catalogCache()->getDatabase(staleNS.db().toString());
- if (!status.isOK()) {
- warning() << "cannot reload database info for stale namespace " << staleNS;
- return;
- }
+ forceReload = tries > 2;
+}
- shared_ptr<DBConfig> config = status.getValue();
+void ParallelSortClusteredCursor::_handleStaleNS(const NamespaceString& staleNS,
+ bool forceReload,
+ bool fullReload) {
+ auto status = grid.catalogCache()->getDatabase(staleNS.db().toString());
+ if (!status.isOK()) {
+ warning() << "cannot reload database info for stale namespace " << staleNS;
+ return;
+ }
- // Reload db if needed, make sure it works
- if (fullReload && !config->reload()) {
- // We didn't find the db after reload, the db may have been dropped, reset this ptr
- config.reset();
- }
+ shared_ptr<DBConfig> config = status.getValue();
- if (!config) {
- warning() << "cannot reload database info for stale namespace " << staleNS;
- }
- else {
- // Reload chunk manager, potentially forcing the namespace
- config->getChunkManagerIfExists(staleNS, true, forceReload);
- }
+ // Reload db if needed, make sure it works
+ if (fullReload && !config->reload()) {
+ // We didn't find the db after reload, the db may have been dropped, reset this ptr
+ config.reset();
}
- void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
- PCStatePtr state,
- const ShardId& shardId,
- std::shared_ptr<Shard> primary,
- const NamespaceString& ns,
- const string& vinfo,
- ChunkManagerPtr manager ) {
+ if (!config) {
+ warning() << "cannot reload database info for stale namespace " << staleNS;
+ } else {
+ // Reload chunk manager, potentially forcing the namespace
+ config->getChunkManagerIfExists(staleNS, true, forceReload);
+ }
+}
- if ( manager ) {
- state->manager = manager;
- }
- else if ( primary ) {
- state->primary = primary;
- }
+void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(PCStatePtr state,
+ const ShardId& shardId,
+ std::shared_ptr<Shard> primary,
+ const NamespaceString& ns,
+ const string& vinfo,
+ ChunkManagerPtr manager) {
+ if (manager) {
+ state->manager = manager;
+ } else if (primary) {
+ state->primary = primary;
+ }
- verify(!primary || shardId == primary->getId());
+ verify(!primary || shardId == primary->getId());
- // Setup conn
- if (!state->conn) {
- const auto shard = grid.shardRegistry()->getShard(shardId);
- state->conn.reset(new ShardConnection(shard->getConnString(), ns, manager));
- }
+ // Setup conn
+ if (!state->conn) {
+ const auto shard = grid.shardRegistry()->getShard(shardId);
+ state->conn.reset(new ShardConnection(shard->getConnString(), ns, manager));
+ }
- const DBClientBase* rawConn = state->conn->getRawConn();
- bool allowShardVersionFailure =
- rawConn->type() == ConnectionString::SET &&
- DBClientReplicaSet::isSecondaryQuery( _qSpec.ns(), _qSpec.query(), _qSpec.options() );
- bool connIsDown = rawConn->isFailed();
- if (allowShardVersionFailure && !connIsDown) {
- // If the replica set connection believes that it has a valid primary that is up,
- // confirm that the replica set monitor agrees that the suspected primary is indeed up.
- const DBClientReplicaSet* replConn = dynamic_cast<const DBClientReplicaSet*>(rawConn);
- invariant(replConn);
- ReplicaSetMonitorPtr rsMonitor = ReplicaSetMonitor::get(replConn->getSetName());
- if (!rsMonitor->isHostUp(replConn->getSuspectedPrimaryHostAndPort())) {
- connIsDown = true;
- }
+ const DBClientBase* rawConn = state->conn->getRawConn();
+ bool allowShardVersionFailure = rawConn->type() == ConnectionString::SET &&
+ DBClientReplicaSet::isSecondaryQuery(_qSpec.ns(), _qSpec.query(), _qSpec.options());
+ bool connIsDown = rawConn->isFailed();
+ if (allowShardVersionFailure && !connIsDown) {
+ // If the replica set connection believes that it has a valid primary that is up,
+ // confirm that the replica set monitor agrees that the suspected primary is indeed up.
+ const DBClientReplicaSet* replConn = dynamic_cast<const DBClientReplicaSet*>(rawConn);
+ invariant(replConn);
+ ReplicaSetMonitorPtr rsMonitor = ReplicaSetMonitor::get(replConn->getSetName());
+ if (!rsMonitor->isHostUp(replConn->getSuspectedPrimaryHostAndPort())) {
+ connIsDown = true;
}
+ }
- if (allowShardVersionFailure && connIsDown) {
- // If we're doing a secondary-allowed query and the primary is down, don't attempt to
- // set the shard version.
+ if (allowShardVersionFailure && connIsDown) {
+ // If we're doing a secondary-allowed query and the primary is down, don't attempt to
+ // set the shard version.
- state->conn->donotCheckVersion();
+ state->conn->donotCheckVersion();
- // A side effect of this short circuiting is the mongos will not be able figure out that
- // the primary is now up on it's own and has to rely on other threads to refresh node
- // states.
+ // A side effect of this short circuiting is the mongos will not be able figure out that
+ // the primary is now up on it's own and has to rely on other threads to refresh node
+ // states.
- OCCASIONALLY {
- const DBClientReplicaSet* repl = dynamic_cast<const DBClientReplicaSet*>( rawConn );
- dassert(repl);
- warning() << "Primary for " << repl->getServerAddress()
- << " was down before, bypassing setShardVersion."
- << " The local replica set view and targeting may be stale." << endl;
- }
+ OCCASIONALLY {
+ const DBClientReplicaSet* repl = dynamic_cast<const DBClientReplicaSet*>(rawConn);
+ dassert(repl);
+ warning() << "Primary for " << repl->getServerAddress()
+ << " was down before, bypassing setShardVersion."
+ << " The local replica set view and targeting may be stale." << endl;
}
- else {
- try {
- if ( state->conn->setVersion() ) {
- // It's actually okay if we set the version here, since either the
- // manager will be verified as compatible, or if the manager doesn't
- // exist, we don't care about version consistency
- LOG( pc ) << "needed to set remote version on connection to value "
- << "compatible with " << vinfo << endl;
- }
+ } else {
+ try {
+ if (state->conn->setVersion()) {
+ // It's actually okay if we set the version here, since either the
+ // manager will be verified as compatible, or if the manager doesn't
+ // exist, we don't care about version consistency
+ LOG(pc) << "needed to set remote version on connection to value "
+ << "compatible with " << vinfo << endl;
}
- catch ( const DBException& ) {
- if ( allowShardVersionFailure ) {
-
- // It's okay if we don't set the version when talking to a secondary, we can
- // be stale in any case.
-
- OCCASIONALLY {
- const DBClientReplicaSet* repl =
- dynamic_cast<const DBClientReplicaSet*>( state->conn->getRawConn() );
- dassert(repl);
- warning() << "Cannot contact primary for " << repl->getServerAddress()
- << " to check shard version."
- << " The local replica set view and targeting may be stale."
- << endl;
- }
- }
- else {
- throw;
+ } catch (const DBException&) {
+ if (allowShardVersionFailure) {
+ // It's okay if we don't set the version when talking to a secondary, we can
+ // be stale in any case.
+
+ OCCASIONALLY {
+ const DBClientReplicaSet* repl =
+ dynamic_cast<const DBClientReplicaSet*>(state->conn->getRawConn());
+ dassert(repl);
+ warning() << "Cannot contact primary for " << repl->getServerAddress()
+ << " to check shard version."
+ << " The local replica set view and targeting may be stale." << endl;
}
+ } else {
+ throw;
}
}
}
-
- void ParallelSortClusteredCursor::startInit() {
- const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
- const NamespaceString ns(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns());
+}
- shared_ptr<ChunkManager> manager;
- shared_ptr<Shard> primary;
+void ParallelSortClusteredCursor::startInit() {
+ const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
+ const NamespaceString ns(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns());
- string prefix;
- if (MONGO_unlikely(shouldLog(pc))) {
- if( _totalTries > 0 ) {
- prefix = str::stream() << "retrying (" << _totalTries << " tries)";
- }
- else {
- prefix = "creating";
- }
+ shared_ptr<ChunkManager> manager;
+ shared_ptr<Shard> primary;
+
+ string prefix;
+ if (MONGO_unlikely(shouldLog(pc))) {
+ if (_totalTries > 0) {
+ prefix = str::stream() << "retrying (" << _totalTries << " tries)";
+ } else {
+ prefix = "creating";
}
- LOG( pc ) << prefix << " pcursor over " << _qSpec << " and " << _cInfo << endl;
+ }
+ LOG(pc) << prefix << " pcursor over " << _qSpec << " and " << _cInfo << endl;
- set<ShardId> shardIds;
- string vinfo;
+ set<ShardId> shardIds;
+ string vinfo;
- {
- shared_ptr<DBConfig> config;
+ {
+ shared_ptr<DBConfig> config;
- auto status = grid.catalogCache()->getDatabase(ns.db().toString());
- if (status.isOK()) {
- config = status.getValue();
- config->getChunkManagerOrPrimary(ns, manager, primary);
- }
+ auto status = grid.catalogCache()->getDatabase(ns.db().toString());
+ if (status.isOK()) {
+ config = status.getValue();
+ config->getChunkManagerOrPrimary(ns, manager, primary);
}
+ }
- if (manager) {
- if (MONGO_unlikely(shouldLog(pc))) {
- vinfo = str::stream() << "[" << manager->getns() << " @ "
- << manager->getVersion().toString() << "]";
- }
-
- manager->getShardIdsForQuery(shardIds,
- !_cInfo.isEmpty() ? _cInfo.cmdFilter : _qSpec.filter());
+ if (manager) {
+ if (MONGO_unlikely(shouldLog(pc))) {
+ vinfo = str::stream() << "[" << manager->getns() << " @ "
+ << manager->getVersion().toString() << "]";
}
- else if (primary) {
- if (MONGO_unlikely(shouldLog(pc))) {
- vinfo = str::stream() << "[unsharded @ " << primary->toString() << "]";
- }
- shardIds.insert(primary->getId());
+ manager->getShardIdsForQuery(shardIds,
+ !_cInfo.isEmpty() ? _cInfo.cmdFilter : _qSpec.filter());
+ } else if (primary) {
+ if (MONGO_unlikely(shouldLog(pc))) {
+ vinfo = str::stream() << "[unsharded @ " << primary->toString() << "]";
}
- // Close all cursors on extra shards first, as these will be invalid
- for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end();
- i != end;
- ++i) {
- if (shardIds.find(i->first) == shardIds.end()) {
- LOG( pc ) << "closing cursor on shard " << i->first
- << " as the connection is no longer required by " << vinfo << endl;
+ shardIds.insert(primary->getId());
+ }
- i->second.cleanup(true);
- }
+ // Close all cursors on extra shards first, as these will be invalid
+ for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end;
+ ++i) {
+ if (shardIds.find(i->first) == shardIds.end()) {
+ LOG(pc) << "closing cursor on shard " << i->first
+ << " as the connection is no longer required by " << vinfo << endl;
+
+ i->second.cleanup(true);
}
+ }
- LOG(pc) << "initializing over " << shardIds.size()
- << " shards required by " << vinfo;
+ LOG(pc) << "initializing over " << shardIds.size() << " shards required by " << vinfo;
- // Don't retry indefinitely for whatever reason
- _totalTries++;
- uassert( 15986, "too many retries in total", _totalTries < 10 );
+ // Don't retry indefinitely for whatever reason
+ _totalTries++;
+ uassert(15986, "too many retries in total", _totalTries < 10);
- for (const ShardId& shardId : shardIds) {
- PCMData& mdata = _cursorMap[shardId];
+ for (const ShardId& shardId : shardIds) {
+ PCMData& mdata = _cursorMap[shardId];
- LOG( pc ) << "initializing on shard " << shardId
- << ", current connection state is " << mdata.toBSON() << endl;
+ LOG(pc) << "initializing on shard " << shardId << ", current connection state is "
+ << mdata.toBSON() << endl;
- // This may be the first time connecting to this shard, if so we can get an error here
- try {
- if (mdata.initialized) {
- invariant(mdata.pcState);
-
- PCStatePtr state = mdata.pcState;
-
- bool compatiblePrimary = true;
- bool compatibleManager = true;
-
- if( primary && ! state->primary )
- warning() << "Collection becoming unsharded detected" << endl;
- if( manager && ! state->manager )
- warning() << "Collection becoming sharded detected" << endl;
- if( primary && state->primary && primary != state->primary )
- warning() << "Weird shift of primary detected" << endl;
-
- compatiblePrimary = primary && state->primary && primary == state->primary;
- compatibleManager = manager &&
- state->manager &&
- manager->compatibleWith(*state->manager, shardId);
-
- if( compatiblePrimary || compatibleManager ){
- // If we're compatible, don't need to retry unless forced
- if( ! mdata.retryNext ) continue;
- // Do partial cleanup
- mdata.cleanup( false );
- }
- else {
- // Force total cleanup of connection if no longer compatible
- mdata.cleanup( true );
- }
- }
- else {
- // Cleanup connection if we're not yet initialized
- mdata.cleanup( false );
- }
+ // This may be the first time connecting to this shard, if so we can get an error here
+ try {
+ if (mdata.initialized) {
+ invariant(mdata.pcState);
- mdata.pcState.reset( new PCState() );
PCStatePtr state = mdata.pcState;
- setupVersionAndHandleSlaveOk(state, shardId, primary, ns, vinfo, manager);
-
- const string& ns = _qSpec.ns();
-
- // Setup cursor
- if( ! state->cursor ){
-
- //
- // Here we decide whether to split the query limits up for multiple shards.
- // NOTE: There's a subtle issue here, in that it's possible we target a single
- // shard first, but are stale, and then target multiple shards, or vice-versa.
- // In both these cases, we won't re-use the old cursor created here, since the
- // shard version must have changed on the single shard between queries.
- //
-
- if (shardIds.size() > 1) {
-
- // Query limits split for multiple shards
-
- state->cursor.reset( new DBClientCursor( state->conn->get(), ns, _qSpec.query(),
- isCommand() ? 1 : 0, // nToReturn (0 if query indicates multi)
- 0, // nToSkip
- // Does this need to be a ptr?
- _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(), // fieldsToReturn
- _qSpec.options(), // options
- // NtoReturn is weird.
- // If zero, it means use default size, so we do that for all cursors
- // If positive, it's the batch size (we don't want this cursor limiting results), that's
- // done at a higher level
- // If negative, it's the batch size, but we don't create a cursor - so we don't want
- // to create a child cursor either.
- // Either way, if non-zero, we want to pull back the batch size + the skip amount as
- // quickly as possible. Potentially, for a cursor on a single shard or if we keep better track of
- // chunks, we can actually add the skip value into the cursor and/or make some assumptions about the
- // return value size ( (batch size + skip amount) / num_servers ).
- _qSpec.ntoreturn() == 0 ? 0 :
- ( _qSpec.ntoreturn() > 0 ? _qSpec.ntoreturn() + _qSpec.ntoskip() :
- _qSpec.ntoreturn() - _qSpec.ntoskip() ) ) ); // batchSize
- }
- else {
+ bool compatiblePrimary = true;
+ bool compatibleManager = true;
- // Single shard query
+ if (primary && !state->primary)
+ warning() << "Collection becoming unsharded detected" << endl;
+ if (manager && !state->manager)
+ warning() << "Collection becoming sharded detected" << endl;
+ if (primary && state->primary && primary != state->primary)
+ warning() << "Weird shift of primary detected" << endl;
- state->cursor.reset( new DBClientCursor( state->conn->get(), ns, _qSpec.query(),
- _qSpec.ntoreturn(), // nToReturn
- _qSpec.ntoskip(), // nToSkip
- // Does this need to be a ptr?
- _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(), // fieldsToReturn
- _qSpec.options(), // options
- 0 ) ); // batchSize
- }
+ compatiblePrimary = primary && state->primary && primary == state->primary;
+ compatibleManager =
+ manager && state->manager && manager->compatibleWith(*state->manager, shardId);
+
+ if (compatiblePrimary || compatibleManager) {
+ // If we're compatible, don't need to retry unless forced
+ if (!mdata.retryNext)
+ continue;
+ // Do partial cleanup
+ mdata.cleanup(false);
+ } else {
+ // Force total cleanup of connection if no longer compatible
+ mdata.cleanup(true);
}
+ } else {
+ // Cleanup connection if we're not yet initialized
+ mdata.cleanup(false);
+ }
- bool lazyInit = state->conn->get()->lazySupported();
- if( lazyInit ){
+ mdata.pcState.reset(new PCState());
+ PCStatePtr state = mdata.pcState;
- // Need to keep track if this is a second or third try for replica sets
- state->cursor->initLazy( mdata.retryNext );
- mdata.retryNext = false;
- mdata.initialized = true;
+ setupVersionAndHandleSlaveOk(state, shardId, primary, ns, vinfo, manager);
+
+ const string& ns = _qSpec.ns();
+
+ // Setup cursor
+ if (!state->cursor) {
+ //
+ // Here we decide whether to split the query limits up for multiple shards.
+ // NOTE: There's a subtle issue here, in that it's possible we target a single
+ // shard first, but are stale, and then target multiple shards, or vice-versa.
+ // In both these cases, we won't re-use the old cursor created here, since the
+ // shard version must have changed on the single shard between queries.
+ //
+
+ if (shardIds.size() > 1) {
+ // Query limits split for multiple shards
+
+ state->cursor.reset(new DBClientCursor(
+ state->conn->get(),
+ ns,
+ _qSpec.query(),
+ isCommand() ? 1 : 0, // nToReturn (0 if query indicates multi)
+ 0, // nToSkip
+ // Does this need to be a ptr?
+ _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(), // fieldsToReturn
+ _qSpec.options(), // options
+ // NtoReturn is weird.
+ // If zero, it means use default size, so we do that for all cursors
+ // If positive, it's the batch size (we don't want this cursor limiting results), that's
+ // done at a higher level
+ // If negative, it's the batch size, but we don't create a cursor - so we don't want
+ // to create a child cursor either.
+ // Either way, if non-zero, we want to pull back the batch size + the skip amount as
+ // quickly as possible. Potentially, for a cursor on a single shard or if we keep better track of
+ // chunks, we can actually add the skip value into the cursor and/or make some assumptions about the
+ // return value size ( (batch size + skip amount) / num_servers ).
+ _qSpec.ntoreturn() == 0 ? 0 : (_qSpec.ntoreturn() > 0
+ ? _qSpec.ntoreturn() + _qSpec.ntoskip()
+ : _qSpec.ntoreturn() -
+ _qSpec.ntoskip()))); // batchSize
+ } else {
+ // Single shard query
+
+ state->cursor.reset(new DBClientCursor(
+ state->conn->get(),
+ ns,
+ _qSpec.query(),
+ _qSpec.ntoreturn(), // nToReturn
+ _qSpec.ntoskip(), // nToSkip
+ // Does this need to be a ptr?
+ _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(), // fieldsToReturn
+ _qSpec.options(), // options
+ 0)); // batchSize
}
- else {
- bool success = false;
-
- if( nsGetCollection( ns ) == "$cmd" ){
- /* TODO: remove this when config servers don't use
- * SyncClusterConnection anymore. This is needed
- * because SyncConn doesn't allow the call() method
- * to be used for commands.
- */
- success = state->cursor->initCommand();
- }
- else {
- success = state->cursor->init();
- }
-
- // Without full initialization, throw an exception
- uassert( 15987, str::stream() << "could not fully initialize cursor on shard "
- << shardId << ", current connection state is "
- << mdata.toBSON().toString(), success );
+ }
- mdata.retryNext = false;
- mdata.initialized = true;
- mdata.finished = true;
+ bool lazyInit = state->conn->get()->lazySupported();
+ if (lazyInit) {
+ // Need to keep track if this is a second or third try for replica sets
+ state->cursor->initLazy(mdata.retryNext);
+ mdata.retryNext = false;
+ mdata.initialized = true;
+ } else {
+ bool success = false;
+
+ if (nsGetCollection(ns) == "$cmd") {
+ /* TODO: remove this when config servers don't use
+ * SyncClusterConnection anymore. This is needed
+ * because SyncConn doesn't allow the call() method
+ * to be used for commands.
+ */
+ success = state->cursor->initCommand();
+ } else {
+ success = state->cursor->init();
}
+ // Without full initialization, throw an exception
+ uassert(15987,
+ str::stream() << "could not fully initialize cursor on shard " << shardId
+ << ", current connection state is "
+ << mdata.toBSON().toString(),
+ success);
- LOG( pc ) << "initialized " << ( isCommand() ? "command " : "query " )
- << ( lazyInit ? "(lazily) " : "(full) " ) << "on shard " << shardId
- << ", current connection state is " << mdata.toBSON() << endl;
+ mdata.retryNext = false;
+ mdata.initialized = true;
+ mdata.finished = true;
}
- catch( StaleConfigException& e ){
- // Our version isn't compatible with the current version anymore on at least one shard, need to retry immediately
- NamespaceString staleNS( e.getns() );
- // For legacy reasons, this may not be set in the exception :-(
- if( staleNS.size() == 0 ) staleNS = ns; // ns is the *versioned* namespace, be careful of this
+ LOG(pc) << "initialized " << (isCommand() ? "command " : "query ")
+ << (lazyInit ? "(lazily) " : "(full) ") << "on shard " << shardId
+ << ", current connection state is " << mdata.toBSON() << endl;
+ } catch (StaleConfigException& e) {
+ // Our version isn't compatible with the current version anymore on at least one shard, need to retry immediately
+ NamespaceString staleNS(e.getns());
- // Probably need to retry fully
- bool forceReload, fullReload;
- _markStaleNS( staleNS, e, forceReload, fullReload );
+ // For legacy reasons, this may not be set in the exception :-(
+ if (staleNS.size() == 0)
+ staleNS = ns; // ns is the *versioned* namespace, be careful of this
- int logLevel = fullReload ? 0 : 1;
- LOG( pc + logLevel ) << "stale config of ns "
- << staleNS << " during initialization, will retry with forced : "
- << forceReload << ", full : " << fullReload << causedBy( e ) << endl;
+ // Probably need to retry fully
+ bool forceReload, fullReload;
+ _markStaleNS(staleNS, e, forceReload, fullReload);
- // This is somewhat strange
- if( staleNS != ns )
- warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS << endl;
+ int logLevel = fullReload ? 0 : 1;
+ LOG(pc + logLevel) << "stale config of ns " << staleNS
+ << " during initialization, will retry with forced : " << forceReload
+ << ", full : " << fullReload << causedBy(e) << endl;
- _handleStaleNS( staleNS, forceReload, fullReload );
-
- // Restart with new chunk manager
- startInit();
- return;
- }
- catch( SocketException& e ){
- warning() << "socket exception when initializing on "
- << shardId << ", current connection state is "
- << mdata.toBSON() << causedBy( e );
- e._shard = shardId;
- mdata.errored = true;
- if( returnPartial ){
- mdata.cleanup( true );
- continue;
- }
- throw;
- }
- catch( DBException& e ){
- warning() << "db exception when initializing on "
- << shardId << ", current connection state is "
- << mdata.toBSON() << causedBy( e );
- e._shard = shardId;
- mdata.errored = true;
- if( returnPartial && e.getCode() == 15925 /* From above! */ ){
- mdata.cleanup( true );
- continue;
- }
- throw;
- }
- catch( std::exception& e){
- warning() << "exception when initializing on "
- << shardId << ", current connection state is "
- << mdata.toBSON() << causedBy( e );
- mdata.errored = true;
- throw;
- }
- catch( ... ){
- warning() << "unknown exception when initializing on "
- << shardId << ", current connection state is " << mdata.toBSON();
- mdata.errored = true;
- throw;
- }
- }
+ // This is somewhat strange
+ if (staleNS != ns)
+ warning() << "versioned ns " << ns << " doesn't match stale config namespace "
+ << staleNS << endl;
- // Sanity check final init'ed connections
- for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end();
- i != end;
- ++i) {
- const ShardId& shardId = i->first;
- PCMData& mdata = i->second;
+ _handleStaleNS(staleNS, forceReload, fullReload);
- if (!mdata.pcState) {
+ // Restart with new chunk manager
+ startInit();
+ return;
+ } catch (SocketException& e) {
+ warning() << "socket exception when initializing on " << shardId
+ << ", current connection state is " << mdata.toBSON() << causedBy(e);
+ e._shard = shardId;
+ mdata.errored = true;
+ if (returnPartial) {
+ mdata.cleanup(true);
continue;
}
-
- // Make sure all state is in shards
- invariant(shardIds.find(shardId) != shardIds.end());
- invariant(mdata.initialized == true);
-
- if (!mdata.completed) {
- invariant(mdata.pcState->conn->ok());
- }
-
- invariant(mdata.pcState->cursor);
- invariant(mdata.pcState->primary || mdata.pcState->manager);
- invariant(!mdata.retryNext);
-
- if (mdata.completed) {
- invariant(mdata.finished);
- }
-
- if (mdata.finished) {
- invariant(mdata.initialized);
- }
-
- if (!returnPartial) {
- invariant(mdata.initialized);
+ throw;
+ } catch (DBException& e) {
+ warning() << "db exception when initializing on " << shardId
+ << ", current connection state is " << mdata.toBSON() << causedBy(e);
+ e._shard = shardId;
+ mdata.errored = true;
+ if (returnPartial && e.getCode() == 15925 /* From above! */) {
+ mdata.cleanup(true);
+ continue;
}
+ throw;
+ } catch (std::exception& e) {
+ warning() << "exception when initializing on " << shardId
+ << ", current connection state is " << mdata.toBSON() << causedBy(e);
+ mdata.errored = true;
+ throw;
+ } catch (...) {
+ warning() << "unknown exception when initializing on " << shardId
+ << ", current connection state is " << mdata.toBSON();
+ mdata.errored = true;
+ throw;
}
-
}
- void ParallelSortClusteredCursor::finishInit(){
+ // Sanity check final init'ed connections
+ for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end;
+ ++i) {
+ const ShardId& shardId = i->first;
+ PCMData& mdata = i->second;
- bool returnPartial = ( _qSpec.options() & QueryOption_PartialResults );
- bool specialVersion = _cInfo.versionedNS.size() > 0;
- string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns();
+ if (!mdata.pcState) {
+ continue;
+ }
- bool retry = false;
- map< string, StaleConfigException > staleNSExceptions;
+ // Make sure all state is in shards
+ invariant(shardIds.find(shardId) != shardIds.end());
+ invariant(mdata.initialized == true);
- LOG( pc ) << "finishing over " << _cursorMap.size() << " shards" << endl;
+ if (!mdata.completed) {
+ invariant(mdata.pcState->conn->ok());
+ }
- for (map<ShardId, PCMData >::iterator i = _cursorMap.begin(), end = _cursorMap.end();
- i != end;
- ++i){
+ invariant(mdata.pcState->cursor);
+ invariant(mdata.pcState->primary || mdata.pcState->manager);
+ invariant(!mdata.retryNext);
- const ShardId& shardId = i->first;
- PCMData& mdata = i->second;
+ if (mdata.completed) {
+ invariant(mdata.finished);
+ }
- LOG( pc ) << "finishing on shard " << shardId
- << ", current connection state is " << mdata.toBSON() << endl;
+ if (mdata.finished) {
+ invariant(mdata.initialized);
+ }
- // Ignore empty conns for now
- if( ! mdata.pcState ) continue;
+ if (!returnPartial) {
+ invariant(mdata.initialized);
+ }
+ }
+}
- PCStatePtr state = mdata.pcState;
+void ParallelSortClusteredCursor::finishInit() {
+ bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
+ bool specialVersion = _cInfo.versionedNS.size() > 0;
+ string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns();
- try {
+ bool retry = false;
+ map<string, StaleConfigException> staleNSExceptions;
- // Sanity checks
- if( ! mdata.completed ) verify( state->conn && state->conn->ok() );
- verify( state->cursor );
- verify( state->manager || state->primary );
- verify( ! state->manager || ! state->primary );
+ LOG(pc) << "finishing over " << _cursorMap.size() << " shards" << endl;
+ for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end;
+ ++i) {
+ const ShardId& shardId = i->first;
+ PCMData& mdata = i->second;
- // If we weren't init'ing lazily, ignore this
- if( ! mdata.finished ){
+ LOG(pc) << "finishing on shard " << shardId << ", current connection state is "
+ << mdata.toBSON() << endl;
- mdata.finished = true;
+ // Ignore empty conns for now
+ if (!mdata.pcState)
+ continue;
- // Mark the cursor as non-retry by default
- mdata.retryNext = false;
+ PCStatePtr state = mdata.pcState;
- if( ! state->cursor->initLazyFinish( mdata.retryNext ) ){
- if( ! mdata.retryNext ){
- uassert( 15988, "error querying server", false );
- }
- else{
- retry = true;
- continue;
- }
+ try {
+ // Sanity checks
+ if (!mdata.completed)
+ verify(state->conn && state->conn->ok());
+ verify(state->cursor);
+ verify(state->manager || state->primary);
+ verify(!state->manager || !state->primary);
+
+
+ // If we weren't init'ing lazily, ignore this
+ if (!mdata.finished) {
+ mdata.finished = true;
+
+ // Mark the cursor as non-retry by default
+ mdata.retryNext = false;
+
+ if (!state->cursor->initLazyFinish(mdata.retryNext)) {
+ if (!mdata.retryNext) {
+ uassert(15988, "error querying server", false);
+ } else {
+ retry = true;
+ continue;
}
-
- mdata.completed = false;
}
- if( ! mdata.completed ){
+ mdata.completed = false;
+ }
- mdata.completed = true;
+ if (!mdata.completed) {
+ mdata.completed = true;
- // Make sure we didn't get an error we should rethrow
- // TODO : Refactor this to something better
- throwCursorStale( state->cursor.get() );
- throwCursorError( state->cursor.get() );
+ // Make sure we didn't get an error we should rethrow
+ // TODO : Refactor this to something better
+ throwCursorStale(state->cursor.get());
+ throwCursorError(state->cursor.get());
- // Finalize state
- state->cursor->attach( state->conn.get() ); // Closes connection for us
+ // Finalize state
+ state->cursor->attach(state->conn.get()); // Closes connection for us
- LOG( pc ) << "finished on shard " << shardId
- << ", current connection state is " << mdata.toBSON() << endl;
- }
+ LOG(pc) << "finished on shard " << shardId << ", current connection state is "
+ << mdata.toBSON() << endl;
}
- catch( RecvStaleConfigException& e ){
- retry = true;
-
- string staleNS = e.getns();
- // For legacy reasons, ns may not always be set in exception :-(
- if( staleNS.size() == 0 ) staleNS = ns; // ns is versioned namespace, be careful of this
-
- // Will retry all at once
- staleNSExceptions[ staleNS ] = e;
-
- // Fully clear this cursor, as it needs to be re-established
- mdata.cleanup( true );
+ } catch (RecvStaleConfigException& e) {
+ retry = true;
+
+ string staleNS = e.getns();
+ // For legacy reasons, ns may not always be set in exception :-(
+ if (staleNS.size() == 0)
+ staleNS = ns; // ns is versioned namespace, be careful of this
+
+ // Will retry all at once
+ staleNSExceptions[staleNS] = e;
+
+ // Fully clear this cursor, as it needs to be re-established
+ mdata.cleanup(true);
+ continue;
+ } catch (SocketException& e) {
+ warning() << "socket exception when finishing on " << shardId
+ << ", current connection state is " << mdata.toBSON() << causedBy(e);
+ mdata.errored = true;
+ if (returnPartial) {
+ mdata.cleanup(true);
continue;
}
- catch( SocketException& e ){
- warning() << "socket exception when finishing on " << shardId
+ throw;
+ } catch (DBException& e) {
+ // NOTE: RECV() WILL NOT THROW A SOCKET EXCEPTION - WE GET THIS AS ERROR 15988 FROM
+ // ABOVE
+ if (e.getCode() == 15988) {
+ warning() << "exception when receiving data from " << shardId
<< ", current connection state is " << mdata.toBSON() << causedBy(e);
+
mdata.errored = true;
- if( returnPartial ){
- mdata.cleanup( true );
+ if (returnPartial) {
+ mdata.cleanup(true);
continue;
}
throw;
- }
- catch( DBException& e ){
- // NOTE: RECV() WILL NOT THROW A SOCKET EXCEPTION - WE GET THIS AS ERROR 15988 FROM
- // ABOVE
- if (e.getCode() == 15988) {
-
- warning() << "exception when receiving data from " << shardId
- << ", current connection state is " << mdata.toBSON()
- << causedBy(e);
-
- mdata.errored = true;
- if (returnPartial) {
- mdata.cleanup( true );
- continue;
- }
- throw;
- }
- else {
- warning() << "db exception when finishing on " << shardId
- << ", current connection state is " << mdata.toBSON() << causedBy(e);
- mdata.errored = true;
- throw;
- }
- }
- catch( std::exception& e){
- warning() << "exception when finishing on " << shardId
- << ", current connection state is "
- << mdata.toBSON() << causedBy(e);
- mdata.errored = true;
- throw;
- }
- catch( ... ){
- warning() << "unknown exception when finishing on " << shardId
- << ", current connection state is " << mdata.toBSON();
+ } else {
+ warning() << "db exception when finishing on " << shardId
+ << ", current connection state is " << mdata.toBSON() << causedBy(e);
mdata.errored = true;
throw;
}
-
+ } catch (std::exception& e) {
+ warning() << "exception when finishing on " << shardId
+ << ", current connection state is " << mdata.toBSON() << causedBy(e);
+ mdata.errored = true;
+ throw;
+ } catch (...) {
+ warning() << "unknown exception when finishing on " << shardId
+ << ", current connection state is " << mdata.toBSON();
+ mdata.errored = true;
+ throw;
}
+ }
- // Retry logic for single refresh of namespaces / retry init'ing connections
- if( retry ){
-
- // Refresh stale namespaces
- if( staleNSExceptions.size() ){
- for( map<string,StaleConfigException>::iterator i = staleNSExceptions.begin(), end = staleNSExceptions.end(); i != end; ++i ){
-
- NamespaceString staleNS( i->first );
- const StaleConfigException& exception = i->second;
+ // Retry logic for single refresh of namespaces / retry init'ing connections
+ if (retry) {
+ // Refresh stale namespaces
+ if (staleNSExceptions.size()) {
+ for (map<string, StaleConfigException>::iterator i = staleNSExceptions.begin(),
+ end = staleNSExceptions.end();
+ i != end;
+ ++i) {
+ NamespaceString staleNS(i->first);
+ const StaleConfigException& exception = i->second;
- bool forceReload, fullReload;
- _markStaleNS( staleNS, exception, forceReload, fullReload );
+ bool forceReload, fullReload;
+ _markStaleNS(staleNS, exception, forceReload, fullReload);
- int logLevel = fullReload ? 0 : 1;
- LOG( pc + logLevel ) << "stale config of ns "
- << staleNS << " on finishing query, will retry with forced : "
- << forceReload << ", full : " << fullReload << causedBy( exception ) << endl;
+ int logLevel = fullReload ? 0 : 1;
+ LOG(pc + logLevel)
+ << "stale config of ns " << staleNS
+ << " on finishing query, will retry with forced : " << forceReload
+ << ", full : " << fullReload << causedBy(exception) << endl;
- // This is somewhat strange
- if( staleNS != ns )
- warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS << endl;
+ // This is somewhat strange
+ if (staleNS != ns)
+ warning() << "versioned ns " << ns << " doesn't match stale config namespace "
+ << staleNS << endl;
- _handleStaleNS( staleNS, forceReload, fullReload );
- }
+ _handleStaleNS(staleNS, forceReload, fullReload);
}
-
- // Re-establish connections we need to
- startInit();
- finishInit();
- return;
}
- // Sanity check and clean final connections
- map<ShardId, PCMData >::iterator i = _cursorMap.begin();
- while( i != _cursorMap.end() ){
-
- PCMData& mdata = i->second;
-
- // Erase empty stuff
- if( ! mdata.pcState ){
- log() << "PCursor erasing empty state " << mdata.toBSON() << endl;
- _cursorMap.erase( i++ );
- continue;
- }
- else ++i;
-
- // Make sure all state is in shards
- verify( mdata.initialized == true );
- verify( mdata.finished == true );
- verify( mdata.completed == true );
- verify( ! mdata.pcState->conn->ok() );
- verify( mdata.pcState->cursor );
- verify( mdata.pcState->primary || mdata.pcState->manager );
- }
+ // Re-establish connections we need to
+ startInit();
+ finishInit();
+ return;
+ }
- // TODO : More cleanup of metadata?
+ // Sanity check and clean final connections
+ map<ShardId, PCMData>::iterator i = _cursorMap.begin();
+ while (i != _cursorMap.end()) {
+ PCMData& mdata = i->second;
+
+ // Erase empty stuff
+ if (!mdata.pcState) {
+ log() << "PCursor erasing empty state " << mdata.toBSON() << endl;
+ _cursorMap.erase(i++);
+ continue;
+ } else
+ ++i;
+
+ // Make sure all state is in shards
+ verify(mdata.initialized == true);
+ verify(mdata.finished == true);
+ verify(mdata.completed == true);
+ verify(!mdata.pcState->conn->ok());
+ verify(mdata.pcState->cursor);
+ verify(mdata.pcState->primary || mdata.pcState->manager);
+ }
- // LEGACY STUFF NOW
+ // TODO : More cleanup of metadata?
- _cursors = new DBClientCursorHolder[ _cursorMap.size() ];
+ // LEGACY STUFF NOW
- // Put the cursors in the legacy format
- int index = 0;
- for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end();
- i != end;
- ++i) {
- PCMData& mdata = i->second;
+ _cursors = new DBClientCursorHolder[_cursorMap.size()];
- _cursors[ index ].reset( mdata.pcState->cursor.get(), &mdata );
+ // Put the cursors in the legacy format
+ int index = 0;
+ for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end;
+ ++i) {
+ PCMData& mdata = i->second;
- {
- const auto shard = grid.shardRegistry()->getShard(i->first);
- _servers.insert(shard->getConnString().toString());
- }
+ _cursors[index].reset(mdata.pcState->cursor.get(), &mdata);
- index++;
+ {
+ const auto shard = grid.shardRegistry()->getShard(i->first);
+ _servers.insert(shard->getConnString().toString());
}
- _numServers = _cursorMap.size();
-
+ index++;
}
- bool ParallelSortClusteredCursor::isSharded() {
- // LEGACY is always unsharded
- if( _qSpec.isEmpty() ) return false;
- // We're always sharded if the number of cursors != 1
- // TODO: Kept this way for compatibility with getPrimary(), but revisit
- if( _cursorMap.size() != 1 ) return true;
- // Return if the single cursor is sharded
- return NULL != _cursorMap.begin()->second.pcState->manager;
- }
+ _numServers = _cursorMap.size();
+}
- int ParallelSortClusteredCursor::getNumQueryShards() {
- return _cursorMap.size();
- }
+bool ParallelSortClusteredCursor::isSharded() {
+ // LEGACY is always unsharded
+ if (_qSpec.isEmpty())
+ return false;
+ // We're always sharded if the number of cursors != 1
+ // TODO: Kept this way for compatibility with getPrimary(), but revisit
+ if (_cursorMap.size() != 1)
+ return true;
+ // Return if the single cursor is sharded
+ return NULL != _cursorMap.begin()->second.pcState->manager;
+}
- std::shared_ptr<Shard> ParallelSortClusteredCursor::getQueryShard() {
- return grid.shardRegistry()->getShard(_cursorMap.begin()->first);
- }
+int ParallelSortClusteredCursor::getNumQueryShards() {
+ return _cursorMap.size();
+}
- void ParallelSortClusteredCursor::getQueryShardIds(set<ShardId>& shardIds) {
- for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end();
- i != end;
- ++i) {
- shardIds.insert(i->first);
- }
- }
+std::shared_ptr<Shard> ParallelSortClusteredCursor::getQueryShard() {
+ return grid.shardRegistry()->getShard(_cursorMap.begin()->first);
+}
- std::shared_ptr<Shard> ParallelSortClusteredCursor::getPrimary() {
- if (isSharded())
- return std::shared_ptr<Shard>();
- return _cursorMap.begin()->second.pcState->primary;
+void ParallelSortClusteredCursor::getQueryShardIds(set<ShardId>& shardIds) {
+ for (map<ShardId, PCMData>::iterator i = _cursorMap.begin(), end = _cursorMap.end(); i != end;
+ ++i) {
+ shardIds.insert(i->first);
}
+}
- DBClientCursorPtr ParallelSortClusteredCursor::getShardCursor(const ShardId& shardId) {
- map<ShardId,PCMData>::iterator i = _cursorMap.find(shardId);
-
- if( i == _cursorMap.end() ) return DBClientCursorPtr();
- else return i->second.pcState->cursor;
- }
+std::shared_ptr<Shard> ParallelSortClusteredCursor::getPrimary() {
+ if (isSharded())
+ return std::shared_ptr<Shard>();
+ return _cursorMap.begin()->second.pcState->primary;
+}
- // DEPRECATED (but still used by map/reduce)
- void ParallelSortClusteredCursor::_oldInit() {
- // make sure we're not already initialized
- verify( ! _cursors );
- _cursors = new DBClientCursorHolder[_numServers];
+DBClientCursorPtr ParallelSortClusteredCursor::getShardCursor(const ShardId& shardId) {
+ map<ShardId, PCMData>::iterator i = _cursorMap.find(shardId);
- bool returnPartial = ( _options & QueryOption_PartialResults );
+ if (i == _cursorMap.end())
+ return DBClientCursorPtr();
+ else
+ return i->second.pcState->cursor;
+}
- vector<string> serverHosts(_servers.begin(), _servers.end());
- set<int> retryQueries;
- int finishedQueries = 0;
+// DEPRECATED (but still used by map/reduce)
+void ParallelSortClusteredCursor::_oldInit() {
+ // make sure we're not already initialized
+ verify(!_cursors);
+ _cursors = new DBClientCursorHolder[_numServers];
- vector< shared_ptr<ShardConnection> > conns;
- vector<string> servers;
+ bool returnPartial = (_options & QueryOption_PartialResults);
- // Since we may get all sorts of errors, record them all as they come and throw them later if necessary
- vector<string> staleConfigExs;
- vector<string> socketExs;
- vector<string> otherExs;
- bool allConfigStale = false;
+ vector<string> serverHosts(_servers.begin(), _servers.end());
+ set<int> retryQueries;
+ int finishedQueries = 0;
- int retries = -1;
+ vector<shared_ptr<ShardConnection>> conns;
+ vector<string> servers;
- // Loop through all the queries until we've finished or gotten a socket exception on all of them
- // We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results
- do {
- retries++;
+ // Since we may get all sorts of errors, record them all as they come and throw them later if necessary
+ vector<string> staleConfigExs;
+ vector<string> socketExs;
+ vector<string> otherExs;
+ bool allConfigStale = false;
- bool firstPass = retryQueries.size() == 0;
+ int retries = -1;
- if( ! firstPass ){
- log() << "retrying " << ( returnPartial ? "(partial) " : "" ) << "parallel connection to ";
- for (set<int>::const_iterator it = retryQueries.begin();
- it != retryQueries.end();
- ++it) {
+ // Loop through all the queries until we've finished or gotten a socket exception on all of them
+ // We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results
+ do {
+ retries++;
- log() << serverHosts[*it] << ", ";
- }
- log() << finishedQueries << " finished queries." << endl;
- }
+ bool firstPass = retryQueries.size() == 0;
- size_t num = 0;
- for (vector<string>::const_iterator it = serverHosts.begin();
- it != serverHosts.end();
+ if (!firstPass) {
+ log() << "retrying " << (returnPartial ? "(partial) " : "")
+ << "parallel connection to ";
+ for (set<int>::const_iterator it = retryQueries.begin(); it != retryQueries.end();
++it) {
+ log() << serverHosts[*it] << ", ";
+ }
+ log() << finishedQueries << " finished queries." << endl;
+ }
- size_t i = num++;
-
- const string& serverHost = *it;
-
- // If we're not retrying this cursor on later passes, continue
- if( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) continue;
+ size_t num = 0;
+ for (vector<string>::const_iterator it = serverHosts.begin(); it != serverHosts.end();
+ ++it) {
+ size_t i = num++;
- const string errLoc = " @ " + serverHost;
+ const string& serverHost = *it;
- if( firstPass ){
+ // If we're not retrying this cursor on later passes, continue
+ if (!firstPass && retryQueries.find(i) == retryQueries.end())
+ continue;
- // This may be the first time connecting to this shard, if so we can get an error here
- try {
- conns.push_back(
- shared_ptr<ShardConnection>(
- new ShardConnection(uassertStatusOK(
- ConnectionString::parse(serverHost)),
- _ns)));
- }
- catch( std::exception& e ) {
- socketExs.push_back( e.what() + errLoc );
- if( ! returnPartial ){
- num--;
- break;
- }
+ const string errLoc = " @ " + serverHost;
- conns.push_back( shared_ptr<ShardConnection>() );
- continue;
+ if (firstPass) {
+ // This may be the first time connecting to this shard, if so we can get an error here
+ try {
+ conns.push_back(shared_ptr<ShardConnection>(new ShardConnection(
+ uassertStatusOK(ConnectionString::parse(serverHost)), _ns)));
+ } catch (std::exception& e) {
+ socketExs.push_back(e.what() + errLoc);
+ if (!returnPartial) {
+ num--;
+ break;
}
- servers.push_back(serverHost);
- }
-
- if ( conns[i]->setVersion() ) {
- conns[i]->done();
-
- // Version is zero b/c this is deprecated codepath
- staleConfigExs.push_back(str::stream()
- << "stale config detected for "
- << RecvStaleConfigException(_ns,
- "ParallelCursor::_init",
- ChunkVersion(0, 0, OID()),
- ChunkVersion( 0, 0, OID())).what()
- << errLoc);
- break;
- }
-
- LOG(5) << "ParallelSortClusteredCursor::init server:" << serverHost
- << " ns:" << _ns << " query:" << _query << " fields:" << _fields
- << " options: " << _options;
-
- if( ! _cursors[i].get() )
- _cursors[i].reset( new DBClientCursor( conns[i]->get() , _ns , _query,
- 0 , // nToReturn
- 0 , // nToSkip
- _fields.isEmpty() ? 0 : &_fields , // fieldsToReturn
- _options ,
- _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize
- ), NULL );
-
- try{
- _cursors[i].get()->initLazy( ! firstPass );
- }
- catch( SocketException& e ){
- socketExs.push_back( e.what() + errLoc );
- _cursors[i].reset( NULL, NULL );
- conns[i]->done();
- if( ! returnPartial ) break;
- }
- catch( std::exception& e){
- otherExs.push_back( e.what() + errLoc );
- _cursors[i].reset( NULL, NULL );
- conns[i]->done();
- break;
+ conns.push_back(shared_ptr<ShardConnection>());
+ continue;
}
+ servers.push_back(serverHost);
}
- // Go through all the potentially started cursors and finish initializing them or log any errors and
- // potentially retry
- // TODO: Better error classification would make this easier, errors are indicated in all sorts of ways
- // here that we need to trap.
- for ( size_t i = 0; i < num; i++ ) {
- const string errLoc = " @ " + serverHosts[i];
+ if (conns[i]->setVersion()) {
+ conns[i]->done();
- if( ! _cursors[i].get() || ( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) ){
- if( conns[i] ) conns[i].get()->done();
- continue;
- }
+ // Version is zero b/c this is deprecated codepath
+ staleConfigExs.push_back(
+ str::stream() << "stale config detected for "
+ << RecvStaleConfigException(_ns,
+ "ParallelCursor::_init",
+ ChunkVersion(0, 0, OID()),
+ ChunkVersion(0, 0, OID())).what()
+ << errLoc);
+ break;
+ }
- verify( conns[i] );
- retryQueries.erase( i );
+ LOG(5) << "ParallelSortClusteredCursor::init server:" << serverHost << " ns:" << _ns
+ << " query:" << _query << " fields:" << _fields << " options: " << _options;
+
+ if (!_cursors[i].get())
+ _cursors[i].reset(
+ new DBClientCursor(conns[i]->get(),
+ _ns,
+ _query,
+ 0, // nToReturn
+ 0, // nToSkip
+ _fields.isEmpty() ? 0 : &_fields, // fieldsToReturn
+ _options,
+ _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize
+ ),
+ NULL);
- bool retry = false;
+ try {
+ _cursors[i].get()->initLazy(!firstPass);
+ } catch (SocketException& e) {
+ socketExs.push_back(e.what() + errLoc);
+ _cursors[i].reset(NULL, NULL);
+ conns[i]->done();
+ if (!returnPartial)
+ break;
+ } catch (std::exception& e) {
+ otherExs.push_back(e.what() + errLoc);
+ _cursors[i].reset(NULL, NULL);
+ conns[i]->done();
+ break;
+ }
+ }
- try {
+ // Go through all the potentially started cursors and finish initializing them or log any errors and
+ // potentially retry
+ // TODO: Better error classification would make this easier, errors are indicated in all sorts of ways
+ // here that we need to trap.
+ for (size_t i = 0; i < num; i++) {
+ const string errLoc = " @ " + serverHosts[i];
- if( ! _cursors[i].get()->initLazyFinish( retry ) ) {
+ if (!_cursors[i].get() || (!firstPass && retryQueries.find(i) == retryQueries.end())) {
+ if (conns[i])
+ conns[i].get()->done();
+ continue;
+ }
- warning() << "invalid result from " << conns[i]->getHost() << ( retry ? ", retrying" : "" ) << endl;
- _cursors[i].reset( NULL, NULL );
+ verify(conns[i]);
+ retryQueries.erase(i);
- if( ! retry ){
- socketExs.push_back( str::stream() << "error querying server: " << servers[i] );
- conns[i]->done();
- }
- else {
- retryQueries.insert( i );
- }
+ bool retry = false;
- continue;
+ try {
+ if (!_cursors[i].get()->initLazyFinish(retry)) {
+ warning() << "invalid result from " << conns[i]->getHost()
+ << (retry ? ", retrying" : "") << endl;
+ _cursors[i].reset(NULL, NULL);
+
+ if (!retry) {
+ socketExs.push_back(str::stream()
+ << "error querying server: " << servers[i]);
+ conns[i]->done();
+ } else {
+ retryQueries.insert(i);
}
- }
- catch ( StaleConfigException& e ){
- // Our stored configuration data is actually stale, we need to reload it
- // when we throw our exception
- allConfigStale = true;
-
- staleConfigExs.push_back( (string)"stale config detected when receiving response for " + e.what() + errLoc );
- _cursors[i].reset( NULL, NULL );
- conns[i]->done();
- continue;
- }
- catch ( SocketException& e ) {
- socketExs.push_back( e.what() + errLoc );
- _cursors[i].reset( NULL, NULL );
- conns[i]->done();
- continue;
- }
- catch( std::exception& e ){
- otherExs.push_back( e.what() + errLoc );
- _cursors[i].reset( NULL, NULL );
- conns[i]->done();
- continue;
- }
-
- try {
- _cursors[i].get()->attach( conns[i].get() ); // this calls done on conn
- // Rethrow stale config or other errors
- throwCursorStale( _cursors[i].get() );
- throwCursorError( _cursors[i].get() );
- finishedQueries++;
- }
- catch ( StaleConfigException& e ){
-
- // Our stored configuration data is actually stale, we need to reload it
- // when we throw our exception
- allConfigStale = true;
-
- staleConfigExs.push_back( (string)"stale config detected for " + e.what() + errLoc );
- _cursors[i].reset( NULL, NULL );
- conns[i]->done();
- continue;
- }
- catch( std::exception& e ){
- otherExs.push_back( e.what() + errLoc );
- _cursors[i].reset( NULL, NULL );
- conns[i]->done();
continue;
}
+ } catch (StaleConfigException& e) {
+ // Our stored configuration data is actually stale, we need to reload it
+ // when we throw our exception
+ allConfigStale = true;
+
+ staleConfigExs.push_back(
+ (string) "stale config detected when receiving response for " + e.what() +
+ errLoc);
+ _cursors[i].reset(NULL, NULL);
+ conns[i]->done();
+ continue;
+ } catch (SocketException& e) {
+ socketExs.push_back(e.what() + errLoc);
+ _cursors[i].reset(NULL, NULL);
+ conns[i]->done();
+ continue;
+ } catch (std::exception& e) {
+ otherExs.push_back(e.what() + errLoc);
+ _cursors[i].reset(NULL, NULL);
+ conns[i]->done();
+ continue;
}
- // Don't exceed our max retries, should not happen
- verify( retries < 5 );
- }
- while( retryQueries.size() > 0 /* something to retry */ &&
- ( socketExs.size() == 0 || returnPartial ) /* no conn issues */ &&
- staleConfigExs.size() == 0 /* no config issues */ &&
- otherExs.size() == 0 /* no other issues */);
-
- // Assert that our conns are all closed!
- for( vector< shared_ptr<ShardConnection> >::iterator i = conns.begin(); i < conns.end(); ++i ){
- verify( ! (*i) || ! (*i)->ok() );
- }
-
- // Handle errors we got during initialization.
- // If we're returning partial results, we can ignore socketExs, but nothing else
- // Log a warning in any case, so we don't lose these messages
- bool throwException = ( socketExs.size() > 0 && ! returnPartial ) || staleConfigExs.size() > 0 || otherExs.size() > 0;
-
- if( socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0 ) {
-
- vector<string> errMsgs;
-
- errMsgs.insert( errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end() );
- errMsgs.insert( errMsgs.end(), otherExs.begin(), otherExs.end() );
- errMsgs.insert( errMsgs.end(), socketExs.begin(), socketExs.end() );
-
- stringstream errMsg;
- errMsg << "could not initialize cursor across all shards because : ";
- for( vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++ ){
- if( i != errMsgs.begin() ) errMsg << " :: and :: ";
- errMsg << *i;
- }
-
- if (throwException && staleConfigExs.size() > 0) {
- // Version is zero b/c this is deprecated codepath
- throw RecvStaleConfigException(_ns,
- errMsg.str(),
- ChunkVersion( 0, 0, OID() ),
- ChunkVersion( 0, 0, OID() ));
- }
- else if (throwException) {
- throw DBException(errMsg.str(), 14827);
- }
- else {
- warning() << errMsg.str() << endl;
+ try {
+ _cursors[i].get()->attach(conns[i].get()); // this calls done on conn
+ // Rethrow stale config or other errors
+ throwCursorStale(_cursors[i].get());
+ throwCursorError(_cursors[i].get());
+
+ finishedQueries++;
+ } catch (StaleConfigException& e) {
+ // Our stored configuration data is actually stale, we need to reload it
+ // when we throw our exception
+ allConfigStale = true;
+
+ staleConfigExs.push_back((string) "stale config detected for " + e.what() + errLoc);
+ _cursors[i].reset(NULL, NULL);
+ conns[i]->done();
+ continue;
+ } catch (std::exception& e) {
+ otherExs.push_back(e.what() + errLoc);
+ _cursors[i].reset(NULL, NULL);
+ conns[i]->done();
+ continue;
}
}
- if( retries > 0 )
- log() << "successfully finished parallel query after " << retries << " retries" << endl;
+ // Don't exceed our max retries, should not happen
+ verify(retries < 5);
+ } while (retryQueries.size() > 0 /* something to retry */ &&
+ (socketExs.size() == 0 || returnPartial) /* no conn issues */ &&
+ staleConfigExs.size() == 0 /* no config issues */ &&
+ otherExs.size() == 0 /* no other issues */);
+ // Assert that our conns are all closed!
+ for (vector<shared_ptr<ShardConnection>>::iterator i = conns.begin(); i < conns.end(); ++i) {
+ verify(!(*i) || !(*i)->ok());
}
- ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
-
- // WARNING: Commands (in particular M/R) connect via _oldInit() directly to shards
- bool isDirectShardCursor = _cursorMap.empty();
+ // Handle errors we got during initialization.
+ // If we're returning partial results, we can ignore socketExs, but nothing else
+ // Log a warning in any case, so we don't lose these messages
+ bool throwException = (socketExs.size() > 0 && !returnPartial) || staleConfigExs.size() > 0 ||
+ otherExs.size() > 0;
+
+ if (socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0) {
+ vector<string> errMsgs;
+
+ errMsgs.insert(errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end());
+ errMsgs.insert(errMsgs.end(), otherExs.begin(), otherExs.end());
+ errMsgs.insert(errMsgs.end(), socketExs.begin(), socketExs.end());
+
+ stringstream errMsg;
+ errMsg << "could not initialize cursor across all shards because : ";
+ for (vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++) {
+ if (i != errMsgs.begin())
+ errMsg << " :: and :: ";
+ errMsg << *i;
+ }
- // Non-direct shard cursors are owned by the _cursorMap, so we release
- // them in the array here. Direct shard cursors clean themselves.
- if (!isDirectShardCursor) {
- for( int i = 0; i < _numServers; i++ ) _cursors[i].release();
+ if (throwException && staleConfigExs.size() > 0) {
+ // Version is zero b/c this is deprecated codepath
+ throw RecvStaleConfigException(
+ _ns, errMsg.str(), ChunkVersion(0, 0, OID()), ChunkVersion(0, 0, OID()));
+ } else if (throwException) {
+ throw DBException(errMsg.str(), 14827);
+ } else {
+ warning() << errMsg.str() << endl;
}
+ }
- delete [] _cursors;
- _cursors = 0;
+ if (retries > 0)
+ log() << "successfully finished parallel query after " << retries << " retries" << endl;
+}
- // Clear out our metadata after removing legacy cursor data
- _cursorMap.clear();
+ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
+ // WARNING: Commands (in particular M/R) connect via _oldInit() directly to shards
+ bool isDirectShardCursor = _cursorMap.empty();
- // Just to be sure
- _done = true;
+ // Non-direct shard cursors are owned by the _cursorMap, so we release
+ // them in the array here. Direct shard cursors clean themselves.
+ if (!isDirectShardCursor) {
+ for (int i = 0; i < _numServers; i++)
+ _cursors[i].release();
}
- void ParallelSortClusteredCursor::setBatchSize(int newBatchSize) {
- for ( int i=0; i<_numServers; i++ ) {
- if (_cursors[i].get())
- _cursors[i].get()->setBatchSize(newBatchSize);
- }
- }
+ delete[] _cursors;
+ _cursors = 0;
- bool ParallelSortClusteredCursor::more() {
+ // Clear out our metadata after removing legacy cursor data
+ _cursorMap.clear();
- if ( _needToSkip > 0 ) {
- int n = _needToSkip;
- _needToSkip = 0;
+ // Just to be sure
+ _done = true;
+}
- while ( n > 0 && more() ) {
- next();
- n--;
- }
+void ParallelSortClusteredCursor::setBatchSize(int newBatchSize) {
+ for (int i = 0; i < _numServers; i++) {
+ if (_cursors[i].get())
+ _cursors[i].get()->setBatchSize(newBatchSize);
+ }
+}
- _needToSkip = n;
- }
+bool ParallelSortClusteredCursor::more() {
+ if (_needToSkip > 0) {
+ int n = _needToSkip;
+ _needToSkip = 0;
- for ( int i=0; i<_numServers; i++ ) {
- if (_cursors[i].get() && _cursors[i].get()->more())
- return true;
+ while (n > 0 && more()) {
+ next();
+ n--;
}
- return false;
- }
- BSONObj ParallelSortClusteredCursor::next() {
- BSONObj best = BSONObj();
- int bestFrom = -1;
+ _needToSkip = n;
+ }
- for( int j = 0; j < _numServers; j++ ){
+ for (int i = 0; i < _numServers; i++) {
+ if (_cursors[i].get() && _cursors[i].get()->more())
+ return true;
+ }
+ return false;
+}
- // Iterate _numServers times, starting one past the last server we used.
- // This means we actually start at server #1, not #0, but shouldn't matter
+BSONObj ParallelSortClusteredCursor::next() {
+ BSONObj best = BSONObj();
+ int bestFrom = -1;
- int i = ( j + _lastFrom + 1 ) % _numServers;
+ for (int j = 0; j < _numServers; j++) {
+ // Iterate _numServers times, starting one past the last server we used.
+ // This means we actually start at server #1, not #0, but shouldn't matter
- // Check to see if the cursor is finished
- if (!_cursors[i].get() || !_cursors[i].get()->more()) {
- if (_cursors[i].getMData())
- _cursors[i].getMData()->pcState->done = true;
- continue;
- }
-
- // We know we have at least one result in this cursor
- BSONObj me = _cursors[i].get()->peekFirst();
+ int i = (j + _lastFrom + 1) % _numServers;
- // If this is the first non-empty cursor, save the result as best
- if (bestFrom < 0) {
- best = me;
- bestFrom = i;
- if( _sortKey.isEmpty() ) break;
- continue;
- }
+ // Check to see if the cursor is finished
+ if (!_cursors[i].get() || !_cursors[i].get()->more()) {
+ if (_cursors[i].getMData())
+ _cursors[i].getMData()->pcState->done = true;
+ continue;
+ }
- // Otherwise compare the result to the current best result
- int comp = best.woSortOrder( me , _sortKey , true );
- if ( comp < 0 )
- continue;
+ // We know we have at least one result in this cursor
+ BSONObj me = _cursors[i].get()->peekFirst();
+ // If this is the first non-empty cursor, save the result as best
+ if (bestFrom < 0) {
best = me;
bestFrom = i;
+ if (_sortKey.isEmpty())
+ break;
+ continue;
}
- _lastFrom = bestFrom;
+ // Otherwise compare the result to the current best result
+ int comp = best.woSortOrder(me, _sortKey, true);
+ if (comp < 0)
+ continue;
- uassert(10019, "no more elements", bestFrom >= 0);
- _cursors[bestFrom].get()->next();
+ best = me;
+ bestFrom = i;
+ }
- // Make sure the result data won't go away after the next call to more()
- if (!_cursors[bestFrom].get()->moreInCurrentBatch()) {
- best = best.getOwned();
- }
+ _lastFrom = bestFrom;
- if (_cursors[bestFrom].getMData())
- _cursors[bestFrom].getMData()->pcState->count++;
+ uassert(10019, "no more elements", bestFrom >= 0);
+ _cursors[bestFrom].get()->next();
- return best;
+ // Make sure the result data won't go away after the next call to more()
+ if (!_cursors[bestFrom].get()->moreInCurrentBatch()) {
+ best = best.getOwned();
}
- void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ) {
-
- set<ShardId> shardIds;
- getQueryShardIds(shardIds);
+ if (_cursors[bestFrom].getMData())
+ _cursors[bestFrom].getMData()->pcState->count++;
- for (const ShardId& shardId : shardIds) {
- list<BSONObj>& l = out[shardId];
- l.push_back(getShardCursor(shardId)->peekFirst().getOwned());
- }
+ return best;
+}
- }
+void ParallelSortClusteredCursor::_explain(map<string, list<BSONObj>>& out) {
+ set<ShardId> shardIds;
+ getQueryShardIds(shardIds);
- // -----------------
- // ---- Future -----
- // -----------------
-
- Future::CommandResult::CommandResult( const string& server,
- const string& db,
- const BSONObj& cmd,
- int options,
- DBClientBase * conn,
- bool useShardedConn ):
- _server(server),
- _db(db),
- _options(options),
- _cmd(cmd),
- _conn(conn),
- _useShardConn(useShardedConn),
- _done(false)
- {
- init();
+ for (const ShardId& shardId : shardIds) {
+ list<BSONObj>& l = out[shardId];
+ l.push_back(getShardCursor(shardId)->peekFirst().getOwned());
}
+}
- void Future::CommandResult::init(){
- try {
- if ( ! _conn ){
- if ( _useShardConn) {
- _connHolder.reset(
- new ShardConnection(uassertStatusOK(ConnectionString::parse(_server)),
- "",
- NULL));
- }
- else {
- _connHolder.reset( new ScopedDbConnection( _server ) );
- }
+// -----------------
+// ---- Future -----
+// -----------------
+
+Future::CommandResult::CommandResult(const string& server,
+ const string& db,
+ const BSONObj& cmd,
+ int options,
+ DBClientBase* conn,
+ bool useShardedConn)
+ : _server(server),
+ _db(db),
+ _options(options),
+ _cmd(cmd),
+ _conn(conn),
+ _useShardConn(useShardedConn),
+ _done(false) {
+ init();
+}
- _conn = _connHolder->get();
+void Future::CommandResult::init() {
+ try {
+ if (!_conn) {
+ if (_useShardConn) {
+ _connHolder.reset(new ShardConnection(
+ uassertStatusOK(ConnectionString::parse(_server)), "", NULL));
+ } else {
+ _connHolder.reset(new ScopedDbConnection(_server));
}
- if ( _conn->lazySupported() ) {
- _cursor.reset( new DBClientCursor(_conn, _db + ".$cmd", _cmd,
- -1/*limit*/, 0, NULL, _options, 0));
- _cursor->initLazy();
- }
- else {
- _done = true; // we set _done first because even if there is an error we're done
- _ok = _conn->runCommand( _db , _cmd , _res , _options );
- }
+ _conn = _connHolder->get();
}
- catch ( std::exception& e ) {
- error() << "Future::spawnCommand (part 1) exception: " << e.what() << endl;
- _ok = false;
- _done = true;
+
+ if (_conn->lazySupported()) {
+ _cursor.reset(
+ new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1 /*limit*/, 0, NULL, _options, 0));
+ _cursor->initLazy();
+ } else {
+ _done = true; // we set _done first because even if there is an error we're done
+ _ok = _conn->runCommand(_db, _cmd, _res, _options);
}
+ } catch (std::exception& e) {
+ error() << "Future::spawnCommand (part 1) exception: " << e.what() << endl;
+ _ok = false;
+ _done = true;
}
+}
- bool Future::CommandResult::join( int maxRetries ) {
- if (_done)
- return _ok;
-
+bool Future::CommandResult::join(int maxRetries) {
+ if (_done)
+ return _ok;
- _ok = false;
- for( int i = 1; i <= maxRetries; i++ ){
- try {
- bool retry = false;
- bool finished = _cursor->initLazyFinish( retry );
+ _ok = false;
+ for (int i = 1; i <= maxRetries; i++) {
+ try {
+ bool retry = false;
+ bool finished = _cursor->initLazyFinish(retry);
- // Shouldn't need to communicate with server any more
- if ( _connHolder )
- _connHolder->done();
+ // Shouldn't need to communicate with server any more
+ if (_connHolder)
+ _connHolder->done();
- uassert(14812, str::stream() << "Error running command on server: " << _server, finished);
- massert(14813, "Command returned nothing", _cursor->more());
+ uassert(
+ 14812, str::stream() << "Error running command on server: " << _server, finished);
+ massert(14813, "Command returned nothing", _cursor->more());
- // Rethrow stale config errors stored in this cursor for correct handling
- throwCursorStale(_cursor.get());
+ // Rethrow stale config errors stored in this cursor for correct handling
+ throwCursorStale(_cursor.get());
- _res = _cursor->nextSafe();
- _ok = _res["ok"].trueValue();
+ _res = _cursor->nextSafe();
+ _ok = _res["ok"].trueValue();
- break;
- }
- catch ( RecvStaleConfigException& e ){
+ break;
+ } catch (RecvStaleConfigException& e) {
+ verify(versionManager.isVersionableCB(_conn));
- verify( versionManager.isVersionableCB( _conn ) );
+ // For legacy reasons, we may not always have a namespace :-(
+ string staleNS = e.getns();
+ if (staleNS.size() == 0)
+ staleNS = _db;
- // For legacy reasons, we may not always have a namespace :-(
- string staleNS = e.getns();
- if( staleNS.size() == 0 ) staleNS = _db;
+ if (i >= maxRetries) {
+ error() << "Future::spawnCommand (part 2) stale config exception" << causedBy(e)
+ << endl;
+ throw e;
+ }
- if( i >= maxRetries ){
- error() << "Future::spawnCommand (part 2) stale config exception" << causedBy( e ) << endl;
+ if (i >= maxRetries / 2) {
+ if (!versionManager.forceRemoteCheckShardVersionCB(staleNS)) {
+ error() << "Future::spawnCommand (part 2) no config detected" << causedBy(e)
+ << endl;
throw e;
}
-
- if( i >= maxRetries / 2 ){
- if( ! versionManager.forceRemoteCheckShardVersionCB( staleNS ) ){
- error() << "Future::spawnCommand (part 2) no config detected" << causedBy( e ) << endl;
- throw e;
- }
- }
-
- // We may not always have a collection, since we don't know from a generic command what collection
- // is supposed to be acted on, if any
- if( nsGetCollection( staleNS ).size() == 0 ){
- warning() << "no collection namespace in stale config exception "
- << "for lazy command " << _cmd << ", could not refresh "
- << staleNS << endl;
- }
- else {
- versionManager.checkShardVersionCB( _conn, staleNS, false, 1 );
- }
-
- LOG( i > 1 ? 0 : 1 ) << "retrying lazy command" << causedBy( e ) << endl;
-
- verify( _conn->lazySupported() );
- _done = false;
- init();
- continue;
}
- catch ( std::exception& e ) {
- error() << "Future::spawnCommand (part 2) exception: " << causedBy( e ) << endl;
- break;
+
+ // We may not always have a collection, since we don't know from a generic command what collection
+ // is supposed to be acted on, if any
+ if (nsGetCollection(staleNS).size() == 0) {
+ warning() << "no collection namespace in stale config exception "
+ << "for lazy command " << _cmd << ", could not refresh " << staleNS
+ << endl;
+ } else {
+ versionManager.checkShardVersionCB(_conn, staleNS, false, 1);
}
- }
+ LOG(i > 1 ? 0 : 1) << "retrying lazy command" << causedBy(e) << endl;
- _done = true;
- return _ok;
+ verify(_conn->lazySupported());
+ _done = false;
+ init();
+ continue;
+ } catch (std::exception& e) {
+ error() << "Future::spawnCommand (part 2) exception: " << causedBy(e) << endl;
+ break;
+ }
}
- shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server,
- const string& db,
- const BSONObj& cmd,
- int options,
- DBClientBase * conn,
- bool useShardConn ) {
- shared_ptr<Future::CommandResult> res (
- new Future::CommandResult( server,
- db,
- cmd,
- options,
- conn,
- useShardConn));
- return res;
- }
+ _done = true;
+ return _ok;
+}
+shared_ptr<Future::CommandResult> Future::spawnCommand(const string& server,
+ const string& db,
+ const BSONObj& cmd,
+ int options,
+ DBClientBase* conn,
+ bool useShardConn) {
+ shared_ptr<Future::CommandResult> res(
+ new Future::CommandResult(server, db, cmd, options, conn, useShardConn));
+ return res;
+}
}