diff options
Diffstat (limited to 'src/mongo/s/cluster_write.cpp')
-rw-r--r-- | src/mongo/s/cluster_write.cpp | 161 |
1 files changed, 94 insertions, 67 deletions
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp index f54718395ee..6297eab2d1d 100644 --- a/src/mongo/s/cluster_write.cpp +++ b/src/mongo/s/cluster_write.cpp @@ -160,43 +160,73 @@ namespace mongo { return configHosts; } - static void shardWrite( const BatchedCommandRequest& request, - BatchedCommandResponse* response, - bool autoSplit ) { + void clusterInsert( const string& ns, + const BSONObj& doc, + BatchedCommandResponse* response ) { + auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() ); + insert->addToDocuments( doc ); - ChunkManagerTargeter targeter; - Status targetInitStatus = targeter.init( NamespaceString( request.getTargetingNS() ) ); + BatchedCommandRequest request( insert.release() ); + request.setNS( ns ); - if ( !targetInitStatus.isOK() ) { + clusterWrite( request, response, false ); + } - warning() << "could not initialize targeter for" - << ( request.isInsertIndexRequest() ? " index" : "" ) - << " write op in collection " << request.getTargetingNS() << endl; + void clusterUpdate( const string& ns, + const BSONObj& query, + const BSONObj& update, + bool upsert, + bool multi, + BatchedCommandResponse* response ) { + auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() ); + updateDoc->setQuery( query ); + updateDoc->setUpdateExpr( update ); + updateDoc->setUpsert( upsert ); + updateDoc->setMulti( multi ); - // Errors will be reported in response if we are unable to target - } + auto_ptr<BatchedUpdateRequest> updateRequest( new BatchedUpdateRequest() ); + updateRequest->addToUpdates( updateDoc.release() ); - DBClientShardResolver resolver; - DBClientMultiCommand dispatcher; - BatchWriteExec exec( &targeter, &resolver, &dispatcher ); - exec.executeBatch( request, response ); + BatchedCommandRequest request( updateRequest.release() ); + request.setNS( ns ); - if ( autoSplit ) splitIfNeeded( request.getNS(), *targeter.getStats() ); + clusterWrite( request, response, false ); } - static void configWrite( const BatchedCommandRequest& request, - BatchedCommandResponse* response, - bool fsyncCheck ) { + void clusterDelete( const string& ns, + const BSONObj& query, + int limit, + BatchedCommandResponse* response ) { + auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument ); + deleteDoc->setQuery( query ); + deleteDoc->setLimit( limit ); - DBClientMultiCommand dispatcher; - dispatcher.setTimeoutMillis( ConfigOpTimeoutMillis ); - ConfigCoordinator exec( &dispatcher, getConfigHosts() ); - exec.executeBatch( request, response, fsyncCheck ); + auto_ptr<BatchedDeleteRequest> deleteRequest( new BatchedDeleteRequest() ); + deleteRequest->addToDeletes( deleteDoc.release() ); + + BatchedCommandRequest request( deleteRequest.release() ); + request.setNS( ns ); + + clusterWrite( request, response, false ); + } + + void clusterCreateIndex( const string& ns, + BSONObj keys, + bool unique, + BatchedCommandResponse* response) { + clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(), + createIndexDoc( ns, keys, unique ), response ); } void clusterWrite( const BatchedCommandRequest& request, BatchedCommandResponse* response, bool autoSplit ) { + ClusterWriter writer( autoSplit, 0 ); + writer.write( request, response ); + } + + void ClusterWriter::write( const BatchedCommandRequest& request, + BatchedCommandResponse* response ) { // App-level validation of a create index insert if ( request.isInsertIndexRequest() ) { @@ -255,66 +285,63 @@ namespace mongo { configWrite( request, response, verboseWC ); } else { - shardWrite( request, response, autoSplit ); + shardWrite( request, response ); } } - void clusterInsert( const string& ns, - const BSONObj& doc, - BatchedCommandResponse* response ) { - auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() ); - insert->addToDocuments( doc ); - - BatchedCommandRequest request( insert.release() ); - request.setNS( ns ); + ClusterWriter::ClusterWriter( bool autoSplit, int timeoutMillis ) : + _autoSplit( autoSplit ), _timeoutMillis( timeoutMillis ), _stats( new ClusterWriterStats ) { + } - clusterWrite( request, response, false ); + const ClusterWriterStats& ClusterWriter::getStats() { + return *_stats; } - void clusterUpdate( const string& ns, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool multi, - BatchedCommandResponse* response ) { - auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() ); - updateDoc->setQuery( query ); - updateDoc->setUpdateExpr( update ); - updateDoc->setUpsert( upsert ); - updateDoc->setMulti( multi ); + void ClusterWriter::shardWrite( const BatchedCommandRequest& request, + BatchedCommandResponse* response ) { - auto_ptr<BatchedUpdateRequest> updateRequest( new BatchedUpdateRequest() ); - updateRequest->addToUpdates( updateDoc.release() ); + ChunkManagerTargeter targeter; + Status targetInitStatus = targeter.init( NamespaceString( request.getTargetingNS() ) ); - BatchedCommandRequest request( updateRequest.release() ); - request.setNS( ns ); + if ( !targetInitStatus.isOK() ) { - clusterWrite( request, response, false ); + warning() << "could not initialize targeter for" + << ( request.isInsertIndexRequest() ? " index" : "" ) + << " write op in collection " << request.getTargetingNS() << endl; + + // Errors will be reported in response if we are unable to target + } + + DBClientShardResolver resolver; + DBClientMultiCommand dispatcher; + BatchWriteExec exec( &targeter, &resolver, &dispatcher ); + exec.executeBatch( request, response ); + + if ( _autoSplit ) + splitIfNeeded( request.getNS(), *targeter.getStats() ); + + _stats->setShardStats( exec.releaseStats() ); } - void clusterDelete( const string& ns, - const BSONObj& query, - int limit, - BatchedCommandResponse* response ) { - auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument ); - deleteDoc->setQuery( query ); - deleteDoc->setLimit( limit ); + void ClusterWriter::configWrite( const BatchedCommandRequest& request, + BatchedCommandResponse* response, + bool fsyncCheck ) { - auto_ptr<BatchedDeleteRequest> deleteRequest( new BatchedDeleteRequest() ); - deleteRequest->addToDeletes( deleteDoc.release() ); + DBClientMultiCommand dispatcher; + ConfigCoordinator exec( &dispatcher, getConfigHosts() ); + exec.executeBatch( request, response, fsyncCheck ); + } - BatchedCommandRequest request( deleteRequest.release() ); - request.setNS( ns ); + void ClusterWriterStats::setShardStats( BatchWriteExecStats* shardStats ) { + _shardStats.reset( shardStats ); + } - clusterWrite( request, response, false ); + bool ClusterWriterStats::hasShardStats() const { + return NULL != _shardStats.get(); } - void clusterCreateIndex( const string& ns, - BSONObj keys, - bool unique, - BatchedCommandResponse* response) { - clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(), - createIndexDoc( ns, keys, unique ), response ); + const BatchWriteExecStats& ClusterWriterStats::getShardStats() const { + return *_shardStats; } } // namespace mongo |