diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-03-13 14:13:31 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-03-20 15:20:02 -0400 |
commit | 25476fbd6a359e638ab8a98acbacea56cc55f7ab (patch) | |
tree | 61c212de77a3add064a4dd4f3404a1c34c14cea0 /src/mongo/s/cluster_write.cpp | |
parent | 39e01c76694ed4b9cdd8315dab56ecf19f099af1 (diff) | |
download | mongo-25476fbd6a359e638ab8a98acbacea56cc55f7ab.tar.gz |
SERVER-17637 Sharding catalog manager interface
The beginnings of a sharding catalog manager interface to abstract catalog
operations.
Diffstat (limited to 'src/mongo/s/cluster_write.cpp')
-rw-r--r-- | src/mongo/s/cluster_write.cpp | 355 |
1 file changed, 99 insertions, 256 deletions
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp index 308138e95c8..e16d977e94b 100644 --- a/src/mongo/s/cluster_write.cpp +++ b/src/mongo/s/cluster_write.cpp @@ -37,6 +37,7 @@ #include "mongo/base/status.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_manager_targeter.h" #include "mongo/s/client/dbclient_multi_command.h" @@ -44,16 +45,13 @@ #include "mongo/s/dbclient_shard_resolver.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batch_write_exec.h" -#include "mongo/s/write_ops/config_coordinator.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" -#include "mongo/util/net/hostandport.h" namespace mongo { using std::auto_ptr; using std::vector; - using std::endl; using std::map; using std::string; using std::stringstream; @@ -61,23 +59,23 @@ namespace mongo { const int ConfigOpTimeoutMillis = 30 * 1000; namespace { - // TODO: consider writing a type for index instead + /** * Constructs the BSON specification document for the given namespace, index key * and options. 
*/ - BSONObj createIndexDoc( const string& ns, const BSONObj& keys, bool unique ) { + BSONObj createIndexDoc(const string& ns, const BSONObj& keys, bool unique) { BSONObjBuilder indexDoc; - indexDoc.append( "ns" , ns ); - indexDoc.append( "key" , keys ); + indexDoc.append("ns", ns); + indexDoc.append("key", keys); stringstream indexName; bool isFirstKey = true; - for ( BSONObjIterator keyIter(keys); keyIter.more(); ) { + for (BSONObjIterator keyIter(keys); keyIter.more();) { BSONElement currentKey = keyIter.next(); - if ( isFirstKey ) { + if (isFirstKey) { isFirstKey = false; } else { @@ -85,7 +83,7 @@ namespace mongo { } indexName << currentKey.fieldName() << "_"; - if ( currentKey.isNumber() ) { + if (currentKey.isNumber()) { indexName << currentKey.numberInt(); } else { @@ -93,223 +91,118 @@ namespace mongo { } } - indexDoc.append( "name", indexName.str() ); + indexDoc.append("name", indexName.str()); - if ( unique ) { - indexDoc.appendBool( "unique", unique ); + if (unique) { + indexDoc.appendBool("unique", unique); } return indexDoc.obj(); } - void clusterWrite(const BatchedCommandRequest& request, - BatchedCommandResponse* response, - bool autoSplit) { - - ClusterWriter writer(autoSplit, 0); - writer.write(request, response); - } - } - - /** - * Splits the chunks touched based from the targeter stats if needed. 
- */ - static void splitIfNeeded( const string& ns, const TargeterStats& stats ) { - if ( !Chunk::ShouldAutoSplit ) { - return; - } - - DBConfigPtr config; - - try { - config = grid.getDBConfig( ns ); + void toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); + dassert(response->isValid(NULL)); } - catch ( const DBException& ex ) { - warning() << "failed to get database config for " << ns - << " while checking for auto-split: " << causedBy( ex ) << endl; - return; - } - - ChunkManagerPtr chunkManager; - ShardPtr dummyShard; - config->getChunkManagerOrPrimary( ns, chunkManager, dummyShard ); - if ( !chunkManager ) { - return; - } + /** + * Splits the chunks touched based from the targeter stats if needed. + */ + void splitIfNeeded(const string& ns, const TargeterStats& stats) { + if (!Chunk::ShouldAutoSplit) { + return; + } - for ( map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin(); - it != stats.chunkSizeDelta.end(); ++it ) { + DBConfigPtr config; - ChunkPtr chunk; try { - chunk = chunkManager->findIntersectingChunk( it->first ); + config = grid.getDBConfig(ns); } - catch ( const AssertionException& ex ) { - warning() << "could not find chunk while checking for auto-split: " - << causedBy( ex ) << endl; + catch (const DBException& ex) { + warning() << "failed to get database config for " << ns + << " while checking for auto-split: " << causedBy(ex); return; } - chunk->splitIfShould( it->second ); - } - } - - /** - * Returns the currently-set config hosts for a cluster - */ - static vector<ConnectionString> getConfigHosts() { + ChunkManagerPtr chunkManager; + ShardPtr dummyShard; + config->getChunkManagerOrPrimary(ns, chunkManager, dummyShard); - vector<ConnectionString> configHosts; - ConnectionString configHostOrHosts = configServer.getConnectionString(); - if ( configHostOrHosts.type() == 
ConnectionString::MASTER ) { - configHosts.push_back( configHostOrHosts ); - } - else if ( configHostOrHosts.type() == ConnectionString::SYNC ) { - vector<HostAndPort> configHPs = configHostOrHosts.getServers(); - for ( vector<HostAndPort>::iterator it = configHPs.begin(); it != configHPs.end(); - ++it ) { - configHosts.push_back( ConnectionString( *it ) ); + if (!chunkManager) { + return; } - } - else { - // This is only for tests. - dassert( configHostOrHosts.type() == ConnectionString::CUSTOM ); - configHosts.push_back( configHostOrHosts ); - } - return configHosts; - } - - static Status getStatus( const BatchedCommandResponse& response ) { - if ( response.getOk() != 1 ) { - return Status( static_cast<ErrorCodes::Error>(response.getErrCode()), - response.getErrMessage() ); - } + for (map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin(); + it != stats.chunkSizeDelta.end(); + ++it) { - if ( response.isErrDetailsSet() ) { - const WriteErrorDetail* errDetail = response.getErrDetails().front(); - return Status( static_cast<ErrorCodes::Error>(errDetail->getErrCode()), - errDetail->getErrMessage() ); - } + ChunkPtr chunk; + try { + chunk = chunkManager->findIntersectingChunk(it->first); + } + catch (const AssertionException& ex) { + warning() << "could not find chunk while checking for auto-split: " + << causedBy(ex); + return; + } - if ( response.isWriteConcernErrorSet() ) { - const WCErrorDetail* errDetail = response.getWriteConcernError(); - return Status( static_cast<ErrorCodes::Error>(errDetail->getErrCode()), - errDetail->getErrMessage() ); + chunk->splitIfShould(it->second); + } } - return Status::OK(); - } + } // namespace - Status clusterInsert( const string& ns, - const BSONObj& doc, - BatchedCommandResponse* response ) { - auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() ); - insert->addToDocuments( doc ); + Status clusterCreateIndex( const string& ns, + BSONObj keys, + bool unique, + BatchedCommandResponse* response ) { - 
BatchedCommandRequest request( insert.release() ); - request.setNS( ns ); - request.setWriteConcern(WriteConcernOptions::Acknowledged); + const NamespaceString nss(ns); + const std::string dbName = nss.db().toString(); - BatchedCommandResponse dummyResponse; + BSONObj indexDoc = createIndexDoc(ns, keys, unique); - if ( response == NULL ) { - response = &dummyResponse; - } + // Go through the shard insert path + std::auto_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest()); + insert->addToDocuments(indexDoc); - clusterWrite( request, response, false ); - return getStatus( *response ); - } - - Status clusterUpdate( const string& ns, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool multi, - BatchedCommandResponse* response ) { - auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() ); - updateDoc->setQuery( query ); - updateDoc->setUpdateExpr( update ); - updateDoc->setUpsert( upsert ); - updateDoc->setMulti( multi ); - - auto_ptr<BatchedUpdateRequest> updateRequest( new BatchedUpdateRequest() ); - updateRequest->addToUpdates( updateDoc.release() ); - updateRequest->setWriteConcern(WriteConcernOptions::Acknowledged); - - BatchedCommandRequest request( updateRequest.release() ); - request.setNS( ns ); + BatchedCommandRequest request(insert.release()); + request.setNS(nss.getSystemIndexesCollection()); + request.setWriteConcern(WriteConcernOptions::Acknowledged); BatchedCommandResponse dummyResponse; - - if ( response == NULL ) { + if (response == NULL) { response = &dummyResponse; } - clusterWrite( request, response, false ); - return getStatus( *response ); - } - - Status clusterDelete( const string& ns, - const BSONObj& query, - int limit, - BatchedCommandResponse* response ) { - auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument ); - deleteDoc->setQuery( query ); - deleteDoc->setLimit( limit ); - - auto_ptr<BatchedDeleteRequest> deleteRequest( new BatchedDeleteRequest() ); - 
deleteRequest->addToDeletes( deleteDoc.release() ); - deleteRequest->setWriteConcern(WriteConcernOptions::Acknowledged); - - BatchedCommandRequest request( deleteRequest.release() ); - request.setNS( ns ); - - BatchedCommandResponse dummyResponse; + ClusterWriter writer(false, 0); + writer.write(request, response); - if ( response == NULL ) { - response = &dummyResponse; + if (response->getOk() != 1) { + return Status(static_cast<ErrorCodes::Error>(response->getErrCode()), + response->getErrMessage()); } - clusterWrite( request, response, false ); - return getStatus( *response ); - } + if (response->isErrDetailsSet()) { + const WriteErrorDetail* errDetail = response->getErrDetails().front(); - Status clusterCreateIndex( const string& ns, - BSONObj keys, - bool unique, - BatchedCommandResponse* response ) { - return clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(), - createIndexDoc( ns, keys, unique ), - response ); - } - - bool validConfigWC( const BSONObj& writeConcern ) { - BSONElement elem(writeConcern["w"]); - - if ( elem.eoo() ) { - return true; + return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()), + errDetail->getErrMessage()); } - if ( elem.isNumber() && elem.numberInt() <= 1 ) { - return true; - } + if (response->isWriteConcernErrorSet()) { + const WCErrorDetail* errDetail = response->getWriteConcernError(); - if ( elem.type() == String && elem.str() == "majority" ) { - return true; + return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()), + errDetail->getErrMessage()); } - return false; + return Status::OK(); } - static void toBatchError( const Status& status, BatchedCommandResponse* response ) { - response->clear(); - response->setErrCode( status.code() ); - response->setErrMessage( status.reason() ); - response->setOk( false ); - dassert( response->isValid(NULL) ); - } void ClusterWriter::write( const BatchedCommandRequest& origRequest, BatchedCommandResponse* response ) { @@ -355,34 +248,32 @@ namespace 
mongo { } // Config writes and shard writes are done differently - string dbName = nss.db().toString(); - if ( dbName == "config" || dbName == "admin" ) { - // We only support batch sizes of one for config writes - if ( request.sizeWriteOps() != 1 ) { - toBatchError( Status( ErrorCodes::InvalidOptions, - mongoutils::str::stream() << "Writes to config servers must " - "have batch size of 1, found " - << request.sizeWriteOps() ), - response ); - return; + const string dbName = nss.db().toString(); + + if (dbName == "config" || dbName == "admin") { + grid.catalogManager()->writeConfigServerDirect(request, response); + } + else { + ChunkManagerTargeter targeter(request.getTargetingNSS()); + Status targetInitStatus = targeter.init(); + + if (!targetInitStatus.isOK()) { + // Errors will be reported in response if we are unable to target + warning() << "could not initialize targeter for" + << (request.isInsertIndexRequest() ? " index" : "") + << " write op in collection " << request.getTargetingNS(); } - // We only support {w: 0}, {w: 1}, and {w: 'majority'} write concern for config writes - if ( request.isWriteConcernSet() && !validConfigWC( request.getWriteConcern() )) { - toBatchError( Status( ErrorCodes::InvalidOptions, - mongoutils::str::stream() << "Invalid write concern for write" - " to config servers: " << request.getWriteConcern() ), - response ); - return; + DBClientShardResolver resolver; + DBClientMultiCommand dispatcher; + BatchWriteExec exec(&targeter, &resolver, &dispatcher); + exec.executeBatch(request, response); + + if (_autoSplit) { + splitIfNeeded(request.getNS(), *targeter.getStats()); } - // We need to support "best-effort" writes for pings to the config server. - // {w:0} (!verbose) writes are interpreted as best-effort in this case - they may still - // error, but do not do the initial fsync check. 
- configWrite(request, response); - } - else { - shardWrite( request, response ); + _stats->setShardStats(exec.releaseStats()); } } @@ -394,54 +285,6 @@ namespace mongo { return *_stats; } - void ClusterWriter::shardWrite( const BatchedCommandRequest& request, - BatchedCommandResponse* response ) { - - ChunkManagerTargeter targeter(request.getTargetingNSS()); - Status targetInitStatus = targeter.init(); - - if ( !targetInitStatus.isOK() ) { - - warning() << "could not initialize targeter for" - << ( request.isInsertIndexRequest() ? " index" : "" ) - << " write op in collection " << request.getTargetingNS() << endl; - - // Errors will be reported in response if we are unable to target - } - - DBClientShardResolver resolver; - DBClientMultiCommand dispatcher; - BatchWriteExec exec( &targeter, &resolver, &dispatcher ); - exec.executeBatch( request, response ); - - if ( _autoSplit ) - splitIfNeeded( request.getNS(), *targeter.getStats() ); - - _stats->setShardStats( exec.releaseStats() ); - } - - void ClusterWriter::configWrite(const BatchedCommandRequest& request, - BatchedCommandResponse* response) { - - DBClientMultiCommand dispatcher; - vector<ConnectionString> configHosts = getConfigHosts(); - - if (configHosts.size() > 1) { - - // We can't support no-_id upserts to multiple config servers - the _ids will differ - if (BatchedCommandRequest::containsNoIDUpsert(request)) { - toBatchError(Status(ErrorCodes::InvalidOptions, - mongoutils::str::stream() << "upserts to multiple config servers must" - " include _id"), - response); - return; - } - } - - ConfigCoordinator exec( &dispatcher, configHosts ); - exec.executeBatch(request, response); - } - void ClusterWriterStats::setShardStats( BatchWriteExecStats* shardStats ) { _shardStats.reset( shardStats ); } |