summaryrefslogtreecommitdiff
path: root/src/mongo/s/cluster_write.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/cluster_write.cpp')
-rw-r--r--src/mongo/s/cluster_write.cpp161
1 files changed, 94 insertions, 67 deletions
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp
index f54718395ee..6297eab2d1d 100644
--- a/src/mongo/s/cluster_write.cpp
+++ b/src/mongo/s/cluster_write.cpp
@@ -160,43 +160,73 @@ namespace mongo {
return configHosts;
}
- static void shardWrite( const BatchedCommandRequest& request,
- BatchedCommandResponse* response,
- bool autoSplit ) {
+ void clusterInsert( const string& ns,
+ const BSONObj& doc,
+ BatchedCommandResponse* response ) {
+ auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() );
+ insert->addToDocuments( doc );
- ChunkManagerTargeter targeter;
- Status targetInitStatus = targeter.init( NamespaceString( request.getTargetingNS() ) );
+ BatchedCommandRequest request( insert.release() );
+ request.setNS( ns );
- if ( !targetInitStatus.isOK() ) {
+ clusterWrite( request, response, false );
+ }
- warning() << "could not initialize targeter for"
- << ( request.isInsertIndexRequest() ? " index" : "" )
- << " write op in collection " << request.getTargetingNS() << endl;
+ void clusterUpdate( const string& ns,
+ const BSONObj& query,
+ const BSONObj& update,
+ bool upsert,
+ bool multi,
+ BatchedCommandResponse* response ) {
+ auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() );
+ updateDoc->setQuery( query );
+ updateDoc->setUpdateExpr( update );
+ updateDoc->setUpsert( upsert );
+ updateDoc->setMulti( multi );
- // Errors will be reported in response if we are unable to target
- }
+ auto_ptr<BatchedUpdateRequest> updateRequest( new BatchedUpdateRequest() );
+ updateRequest->addToUpdates( updateDoc.release() );
- DBClientShardResolver resolver;
- DBClientMultiCommand dispatcher;
- BatchWriteExec exec( &targeter, &resolver, &dispatcher );
- exec.executeBatch( request, response );
+ BatchedCommandRequest request( updateRequest.release() );
+ request.setNS( ns );
- if ( autoSplit ) splitIfNeeded( request.getNS(), *targeter.getStats() );
+ clusterWrite( request, response, false );
}
- static void configWrite( const BatchedCommandRequest& request,
- BatchedCommandResponse* response,
- bool fsyncCheck ) {
+ void clusterDelete( const string& ns,
+ const BSONObj& query,
+ int limit,
+ BatchedCommandResponse* response ) {
+ auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument );
+ deleteDoc->setQuery( query );
+ deleteDoc->setLimit( limit );
- DBClientMultiCommand dispatcher;
- dispatcher.setTimeoutMillis( ConfigOpTimeoutMillis );
- ConfigCoordinator exec( &dispatcher, getConfigHosts() );
- exec.executeBatch( request, response, fsyncCheck );
+ auto_ptr<BatchedDeleteRequest> deleteRequest( new BatchedDeleteRequest() );
+ deleteRequest->addToDeletes( deleteDoc.release() );
+
+ BatchedCommandRequest request( deleteRequest.release() );
+ request.setNS( ns );
+
+ clusterWrite( request, response, false );
+ }
+
+ void clusterCreateIndex( const string& ns,
+ BSONObj keys,
+ bool unique,
+ BatchedCommandResponse* response) {
+ clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(),
+ createIndexDoc( ns, keys, unique ), response );
}
void clusterWrite( const BatchedCommandRequest& request,
BatchedCommandResponse* response,
bool autoSplit ) {
+ ClusterWriter writer( autoSplit, 0 );
+ writer.write( request, response );
+ }
+
+ void ClusterWriter::write( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response ) {
// App-level validation of a create index insert
if ( request.isInsertIndexRequest() ) {
@@ -255,66 +285,63 @@ namespace mongo {
configWrite( request, response, verboseWC );
}
else {
- shardWrite( request, response, autoSplit );
+ shardWrite( request, response );
}
}
- void clusterInsert( const string& ns,
- const BSONObj& doc,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() );
- insert->addToDocuments( doc );
-
- BatchedCommandRequest request( insert.release() );
- request.setNS( ns );
+ ClusterWriter::ClusterWriter( bool autoSplit, int timeoutMillis ) :
+ _autoSplit( autoSplit ), _timeoutMillis( timeoutMillis ), _stats( new ClusterWriterStats ) {
+ }
- clusterWrite( request, response, false );
+ const ClusterWriterStats& ClusterWriter::getStats() {
+ return *_stats;
}
- void clusterUpdate( const string& ns,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert,
- bool multi,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() );
- updateDoc->setQuery( query );
- updateDoc->setUpdateExpr( update );
- updateDoc->setUpsert( upsert );
- updateDoc->setMulti( multi );
+ void ClusterWriter::shardWrite( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response ) {
- auto_ptr<BatchedUpdateRequest> updateRequest( new BatchedUpdateRequest() );
- updateRequest->addToUpdates( updateDoc.release() );
+ ChunkManagerTargeter targeter;
+ Status targetInitStatus = targeter.init( NamespaceString( request.getTargetingNS() ) );
- BatchedCommandRequest request( updateRequest.release() );
- request.setNS( ns );
+ if ( !targetInitStatus.isOK() ) {
- clusterWrite( request, response, false );
+ warning() << "could not initialize targeter for"
+ << ( request.isInsertIndexRequest() ? " index" : "" )
+ << " write op in collection " << request.getTargetingNS() << endl;
+
+ // Errors will be reported in response if we are unable to target
+ }
+
+ DBClientShardResolver resolver;
+ DBClientMultiCommand dispatcher;
+ BatchWriteExec exec( &targeter, &resolver, &dispatcher );
+ exec.executeBatch( request, response );
+
+ if ( _autoSplit )
+ splitIfNeeded( request.getNS(), *targeter.getStats() );
+
+ _stats->setShardStats( exec.releaseStats() );
}
- void clusterDelete( const string& ns,
- const BSONObj& query,
- int limit,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument );
- deleteDoc->setQuery( query );
- deleteDoc->setLimit( limit );
+ void ClusterWriter::configWrite( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response,
+ bool fsyncCheck ) {
- auto_ptr<BatchedDeleteRequest> deleteRequest( new BatchedDeleteRequest() );
- deleteRequest->addToDeletes( deleteDoc.release() );
+ DBClientMultiCommand dispatcher;
+ ConfigCoordinator exec( &dispatcher, getConfigHosts() );
+ exec.executeBatch( request, response, fsyncCheck );
+ }
- BatchedCommandRequest request( deleteRequest.release() );
- request.setNS( ns );
+ void ClusterWriterStats::setShardStats( BatchWriteExecStats* shardStats ) {
+ _shardStats.reset( shardStats );
+ }
- clusterWrite( request, response, false );
+ bool ClusterWriterStats::hasShardStats() const {
+ return NULL != _shardStats.get();
}
- void clusterCreateIndex( const string& ns,
- BSONObj keys,
- bool unique,
- BatchedCommandResponse* response) {
- clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(),
- createIndexDoc( ns, keys, unique ), response );
+ const BatchWriteExecStats& ClusterWriterStats::getShardStats() const {
+ return *_shardStats;
}
} // namespace mongo