diff options
Diffstat (limited to 'src/mongo/s/cluster_write.cpp')
-rw-r--r-- | src/mongo/s/cluster_write.cpp | 399 |
1 files changed, 194 insertions, 205 deletions
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp index 4d6e5b878b5..45b3b0abb52 100644 --- a/src/mongo/s/cluster_write.cpp +++ b/src/mongo/s/cluster_write.cpp @@ -51,250 +51,239 @@ namespace mongo { - using std::shared_ptr; - using std::unique_ptr; - using std::vector; - using std::map; - using std::string; - using std::stringstream; - - const int ConfigOpTimeoutMillis = 30 * 1000; - - namespace { - - /** - * Constructs the BSON specification document for the given namespace, index key - * and options. - */ - BSONObj createIndexDoc(const string& ns, const BSONObj& keys, bool unique) { - BSONObjBuilder indexDoc; - indexDoc.append("ns", ns); - indexDoc.append("key", keys); - - stringstream indexName; - - bool isFirstKey = true; - for (BSONObjIterator keyIter(keys); keyIter.more();) { - BSONElement currentKey = keyIter.next(); - - if (isFirstKey) { - isFirstKey = false; - } - else { - indexName << "_"; - } - - indexName << currentKey.fieldName() << "_"; - if (currentKey.isNumber()) { - indexName << currentKey.numberInt(); - } - else { - indexName << currentKey.str(); //this should match up with shell command - } - } - - indexDoc.append("name", indexName.str()); - - if (unique) { - indexDoc.appendBool("unique", unique); - } - - return indexDoc.obj(); - } +using std::shared_ptr; +using std::unique_ptr; +using std::vector; +using std::map; +using std::string; +using std::stringstream; + +const int ConfigOpTimeoutMillis = 30 * 1000; + +namespace { + +/** + * Constructs the BSON specification document for the given namespace, index key + * and options. + */ +BSONObj createIndexDoc(const string& ns, const BSONObj& keys, bool unique) { + BSONObjBuilder indexDoc; + indexDoc.append("ns", ns); + indexDoc.append("key", keys); + + stringstream indexName; - void toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setErrCode(status.code()); - response->setErrMessage(status.reason()); - response->setOk(false); - dassert(response->isValid(NULL)); + bool isFirstKey = true; + for (BSONObjIterator keyIter(keys); keyIter.more();) { + BSONElement currentKey = keyIter.next(); + + if (isFirstKey) { + isFirstKey = false; + } else { + indexName << "_"; } - /** - * Splits the chunks touched based from the targeter stats if needed. - */ - void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) { - if (!Chunk::ShouldAutoSplit) { - return; - } - - auto status = grid.catalogCache()->getDatabase(nss.db().toString()); - if (!status.isOK()) { - warning() << "failed to get database config for " << nss - << " while checking for auto-split: " << status.getStatus(); - return; - } - - shared_ptr<DBConfig> config = status.getValue(); - - ChunkManagerPtr chunkManager; - ShardPtr dummyShard; - config->getChunkManagerOrPrimary(nss, chunkManager, dummyShard); - - if (!chunkManager) { - return; - } - - for (map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin(); - it != stats.chunkSizeDelta.end(); - ++it) { - - ChunkPtr chunk; - try { - chunk = chunkManager->findIntersectingChunk(it->first); - } - catch (const AssertionException& ex) { - warning() << "could not find chunk while checking for auto-split: " - << causedBy(ex); - return; - } - - chunk->splitIfShould(it->second); - } + indexName << currentKey.fieldName() << "_"; + if (currentKey.isNumber()) { + indexName << currentKey.numberInt(); + } else { + indexName << currentKey.str(); // this should match up with shell command } + } - } // namespace + indexDoc.append("name", indexName.str()); - Status clusterCreateIndex( const string& ns, - BSONObj keys, - bool unique, - BatchedCommandResponse* response ) { + if (unique) { + indexDoc.appendBool("unique", unique); + } - const NamespaceString nss(ns); - const std::string dbName = nss.db().toString(); + return indexDoc.obj(); +} - BSONObj indexDoc = createIndexDoc(ns, keys, unique); +void toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); + dassert(response->isValid(NULL)); +} - // Go through the shard insert path - std::unique_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest()); - insert->addToDocuments(indexDoc); +/** + * Splits the chunks touched based from the targeter stats if needed. + */ +void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) { + if (!Chunk::ShouldAutoSplit) { + return; + } - BatchedCommandRequest request(insert.release()); - request.setNS(nss.getSystemIndexesCollection()); - request.setWriteConcern(WriteConcernOptions::Acknowledged); + auto status = grid.catalogCache()->getDatabase(nss.db().toString()); + if (!status.isOK()) { + warning() << "failed to get database config for " << nss + << " while checking for auto-split: " << status.getStatus(); + return; + } - BatchedCommandResponse dummyResponse; - if (response == NULL) { - response = &dummyResponse; - } + shared_ptr<DBConfig> config = status.getValue(); + + ChunkManagerPtr chunkManager; + ShardPtr dummyShard; + config->getChunkManagerOrPrimary(nss, chunkManager, dummyShard); - ClusterWriter writer(false, 0); - writer.write(request, response); + if (!chunkManager) { + return; + } - if (response->getOk() != 1) { - return Status(static_cast<ErrorCodes::Error>(response->getErrCode()), - response->getErrMessage()); + for (map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin(); + it != stats.chunkSizeDelta.end(); + ++it) { + ChunkPtr chunk; + try { + chunk = chunkManager->findIntersectingChunk(it->first); + } catch (const AssertionException& ex) { + warning() << "could not find chunk while checking for auto-split: " << causedBy(ex); + return; } - if (response->isErrDetailsSet()) { - const WriteErrorDetail* errDetail = response->getErrDetails().front(); + chunk->splitIfShould(it->second); + } +} + +} // namespace - return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()), - errDetail->getErrMessage()); - } +Status clusterCreateIndex(const string& ns, + BSONObj keys, + bool unique, + BatchedCommandResponse* response) { + const NamespaceString nss(ns); + const std::string dbName = nss.db().toString(); - if (response->isWriteConcernErrorSet()) { - const WCErrorDetail* errDetail = response->getWriteConcernError(); + BSONObj indexDoc = createIndexDoc(ns, keys, unique); - return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()), - errDetail->getErrMessage()); - } + // Go through the shard insert path + std::unique_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest()); + insert->addToDocuments(indexDoc); - return Status::OK(); + BatchedCommandRequest request(insert.release()); + request.setNS(nss.getSystemIndexesCollection()); + request.setWriteConcern(WriteConcernOptions::Acknowledged); + + BatchedCommandResponse dummyResponse; + if (response == NULL) { + response = &dummyResponse; } + ClusterWriter writer(false, 0); + writer.write(request, response); - void ClusterWriter::write( const BatchedCommandRequest& origRequest, - BatchedCommandResponse* response ) { + if (response->getOk() != 1) { + return Status(static_cast<ErrorCodes::Error>(response->getErrCode()), + response->getErrMessage()); + } - // Add _ids to insert request if req'd - unique_ptr<BatchedCommandRequest> idRequest(BatchedCommandRequest::cloneWithIds(origRequest)); - const BatchedCommandRequest& request = NULL != idRequest.get() ? *idRequest : origRequest; + if (response->isErrDetailsSet()) { + const WriteErrorDetail* errDetail = response->getErrDetails().front(); - const NamespaceString& nss = request.getNSS(); - if ( !nss.isValid() ) { - toBatchError( Status( ErrorCodes::InvalidNamespace, - nss.ns() + " is not a valid namespace" ), - response ); - return; - } + return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()), + errDetail->getErrMessage()); + } - if ( !NamespaceString::validCollectionName( nss.coll() ) ) { - toBatchError( Status( ErrorCodes::BadValue, - str::stream() << "invalid collection name " << nss.coll() ), - response ); - return; - } + if (response->isWriteConcernErrorSet()) { + const WCErrorDetail* errDetail = response->getWriteConcernError(); - if ( request.sizeWriteOps() == 0u ) { - toBatchError( Status( ErrorCodes::InvalidLength, - "no write ops were included in the batch" ), - response ); - return; - } + return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()), + errDetail->getErrMessage()); + } - if ( request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize ) { - toBatchError( Status( ErrorCodes::InvalidLength, - str::stream() << "exceeded maximum write batch size of " - << BatchedCommandRequest::kMaxWriteBatchSize ), - response ); - return; - } + return Status::OK(); +} - string errMsg; - if ( request.isInsertIndexRequest() && !request.isValidIndexRequest( &errMsg ) ) { - toBatchError( Status( ErrorCodes::InvalidOptions, errMsg ), response ); - return; - } - // Config writes and shard writes are done differently - const string dbName = nss.db().toString(); +void ClusterWriter::write(const BatchedCommandRequest& origRequest, + BatchedCommandResponse* response) { + // Add _ids to insert request if req'd + unique_ptr<BatchedCommandRequest> idRequest(BatchedCommandRequest::cloneWithIds(origRequest)); + const BatchedCommandRequest& request = NULL != idRequest.get() ? *idRequest : origRequest; - if (dbName == "config" || dbName == "admin") { - grid.catalogManager()->writeConfigServerDirect(request, response); - } - else { - ChunkManagerTargeter targeter(request.getTargetingNSS()); - Status targetInitStatus = targeter.init(); - - if (!targetInitStatus.isOK()) { - // Errors will be reported in response if we are unable to target - warning() << "could not initialize targeter for" - << (request.isInsertIndexRequest() ? " index" : "") - << " write op in collection " << request.getTargetingNS(); - } - - DBClientShardResolver resolver; - DBClientMultiCommand dispatcher; - BatchWriteExec exec(&targeter, &resolver, &dispatcher); - exec.executeBatch(request, response); - - if (_autoSplit) { - splitIfNeeded(request.getNSS(), *targeter.getStats()); - } - - _stats->setShardStats(exec.releaseStats()); - } + const NamespaceString& nss = request.getNSS(); + if (!nss.isValid()) { + toBatchError(Status(ErrorCodes::InvalidNamespace, nss.ns() + " is not a valid namespace"), + response); + return; } - ClusterWriter::ClusterWriter( bool autoSplit, int timeoutMillis ) : - _autoSplit( autoSplit ), _timeoutMillis( timeoutMillis ), _stats( new ClusterWriterStats ) { + if (!NamespaceString::validCollectionName(nss.coll())) { + toBatchError( + Status(ErrorCodes::BadValue, str::stream() << "invalid collection name " << nss.coll()), + response); + return; } - const ClusterWriterStats& ClusterWriter::getStats() { - return *_stats; + if (request.sizeWriteOps() == 0u) { + toBatchError(Status(ErrorCodes::InvalidLength, "no write ops were included in the batch"), + response); + return; } - void ClusterWriterStats::setShardStats( BatchWriteExecStats* shardStats ) { - _shardStats.reset( shardStats ); + if (request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize) { + toBatchError(Status(ErrorCodes::InvalidLength, + str::stream() << "exceeded maximum write batch size of " + << BatchedCommandRequest::kMaxWriteBatchSize), + response); + return; } - bool ClusterWriterStats::hasShardStats() const { - return NULL != _shardStats.get(); + string errMsg; + if (request.isInsertIndexRequest() && !request.isValidIndexRequest(&errMsg)) { + toBatchError(Status(ErrorCodes::InvalidOptions, errMsg), response); + return; } - const BatchWriteExecStats& ClusterWriterStats::getShardStats() const { - return *_shardStats; + // Config writes and shard writes are done differently + const string dbName = nss.db().toString(); + + if (dbName == "config" || dbName == "admin") { + grid.catalogManager()->writeConfigServerDirect(request, response); + } else { + ChunkManagerTargeter targeter(request.getTargetingNSS()); + Status targetInitStatus = targeter.init(); + + if (!targetInitStatus.isOK()) { + // Errors will be reported in response if we are unable to target + warning() << "could not initialize targeter for" + << (request.isInsertIndexRequest() ? " index" : "") + << " write op in collection " << request.getTargetingNS(); + } + + DBClientShardResolver resolver; + DBClientMultiCommand dispatcher; + BatchWriteExec exec(&targeter, &resolver, &dispatcher); + exec.executeBatch(request, response); + + if (_autoSplit) { + splitIfNeeded(request.getNSS(), *targeter.getStats()); + } + + _stats->setShardStats(exec.releaseStats()); } +} + +ClusterWriter::ClusterWriter(bool autoSplit, int timeoutMillis) + : _autoSplit(autoSplit), _timeoutMillis(timeoutMillis), _stats(new ClusterWriterStats) {} + +const ClusterWriterStats& ClusterWriter::getStats() { + return *_stats; +} + +void ClusterWriterStats::setShardStats(BatchWriteExecStats* shardStats) { + _shardStats.reset(shardStats); +} + +bool ClusterWriterStats::hasShardStats() const { + return NULL != _shardStats.get(); +} + +const BatchWriteExecStats& ClusterWriterStats::getShardStats() const { + return *_shardStats; +} -} // namespace mongo +} // namespace mongo |