Diffstat (limited to 'src/mongo/s/cluster_write.cpp')
-rw-r--r--  src/mongo/s/cluster_write.cpp | 399
1 file changed, 194 insertions(+), 205 deletions(-)
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp
index 4d6e5b878b5..45b3b0abb52 100644
--- a/src/mongo/s/cluster_write.cpp
+++ b/src/mongo/s/cluster_write.cpp
@@ -51,250 +51,239 @@
namespace mongo {
- using std::shared_ptr;
- using std::unique_ptr;
- using std::vector;
- using std::map;
- using std::string;
- using std::stringstream;
-
- const int ConfigOpTimeoutMillis = 30 * 1000;
-
- namespace {
-
- /**
- * Constructs the BSON specification document for the given namespace, index key
- * and options.
- */
- BSONObj createIndexDoc(const string& ns, const BSONObj& keys, bool unique) {
- BSONObjBuilder indexDoc;
- indexDoc.append("ns", ns);
- indexDoc.append("key", keys);
-
- stringstream indexName;
-
- bool isFirstKey = true;
- for (BSONObjIterator keyIter(keys); keyIter.more();) {
- BSONElement currentKey = keyIter.next();
-
- if (isFirstKey) {
- isFirstKey = false;
- }
- else {
- indexName << "_";
- }
-
- indexName << currentKey.fieldName() << "_";
- if (currentKey.isNumber()) {
- indexName << currentKey.numberInt();
- }
- else {
- indexName << currentKey.str(); //this should match up with shell command
- }
- }
-
- indexDoc.append("name", indexName.str());
-
- if (unique) {
- indexDoc.appendBool("unique", unique);
- }
-
- return indexDoc.obj();
- }
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+using std::map;
+using std::string;
+using std::stringstream;
+
+const int ConfigOpTimeoutMillis = 30 * 1000;
+
+namespace {
+
+/**
+ * Constructs the BSON specification document for the given namespace, index key
+ * and options.
+ */
+BSONObj createIndexDoc(const string& ns, const BSONObj& keys, bool unique) {
+ BSONObjBuilder indexDoc;
+ indexDoc.append("ns", ns);
+ indexDoc.append("key", keys);
+
+ stringstream indexName;
- void toBatchError(const Status& status, BatchedCommandResponse* response) {
- response->clear();
- response->setErrCode(status.code());
- response->setErrMessage(status.reason());
- response->setOk(false);
- dassert(response->isValid(NULL));
+ bool isFirstKey = true;
+ for (BSONObjIterator keyIter(keys); keyIter.more();) {
+ BSONElement currentKey = keyIter.next();
+
+ if (isFirstKey) {
+ isFirstKey = false;
+ } else {
+ indexName << "_";
}
-        /**
-         * Splits the chunks touched by the write, if needed, based on the targeter stats.
-         */
- void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) {
- if (!Chunk::ShouldAutoSplit) {
- return;
- }
-
- auto status = grid.catalogCache()->getDatabase(nss.db().toString());
- if (!status.isOK()) {
- warning() << "failed to get database config for " << nss
- << " while checking for auto-split: " << status.getStatus();
- return;
- }
-
- shared_ptr<DBConfig> config = status.getValue();
-
- ChunkManagerPtr chunkManager;
- ShardPtr dummyShard;
- config->getChunkManagerOrPrimary(nss, chunkManager, dummyShard);
-
- if (!chunkManager) {
- return;
- }
-
- for (map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin();
- it != stats.chunkSizeDelta.end();
- ++it) {
-
- ChunkPtr chunk;
- try {
- chunk = chunkManager->findIntersectingChunk(it->first);
- }
- catch (const AssertionException& ex) {
- warning() << "could not find chunk while checking for auto-split: "
- << causedBy(ex);
- return;
- }
-
- chunk->splitIfShould(it->second);
- }
+ indexName << currentKey.fieldName() << "_";
+ if (currentKey.isNumber()) {
+ indexName << currentKey.numberInt();
+ } else {
+ indexName << currentKey.str(); // this should match up with shell command
}
+ }
- } // namespace
+ indexDoc.append("name", indexName.str());
- Status clusterCreateIndex( const string& ns,
- BSONObj keys,
- bool unique,
- BatchedCommandResponse* response ) {
+ if (unique) {
+ indexDoc.appendBool("unique", unique);
+ }
- const NamespaceString nss(ns);
- const std::string dbName = nss.db().toString();
+ return indexDoc.obj();
+}
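
For context: createIndexDoc above derives the index name by joining each key field with its rendered value, so a key pattern of {a: 1, b: "hashed"} yields the name "a_1_b_hashed". A minimal standalone sketch of that naming rule follows; deriveIndexName is a hypothetical helper (not part of this file), and pre-rendered value strings stand in for the BSONElement rendering:

    #include <sstream>
    #include <string>
    #include <utility>
    #include <vector>

    // Hypothetical stand-in for the naming loop in createIndexDoc; values
    // such as "1", "-1", or "hashed" stand in for the rendered key values.
    std::string deriveIndexName(
        const std::vector<std::pair<std::string, std::string>>& keys) {
        std::stringstream name;
        bool isFirstKey = true;
        for (const auto& kv : keys) {
            if (!isFirstKey) {
                name << "_";
            }
            isFirstKey = false;
            name << kv.first << "_" << kv.second;  // e.g. "a_1" or "b_hashed"
        }
        return name.str();
    }

    // deriveIndexName({{"a", "1"}, {"b", "hashed"}}) == "a_1_b_hashed",
    // matching the shell's index-naming convention.
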
- BSONObj indexDoc = createIndexDoc(ns, keys, unique);
+void toBatchError(const Status& status, BatchedCommandResponse* response) {
+ response->clear();
+ response->setErrCode(status.code());
+ response->setErrMessage(status.reason());
+ response->setOk(false);
+ dassert(response->isValid(NULL));
+}
- // Go through the shard insert path
- std::unique_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest());
- insert->addToDocuments(indexDoc);
+/**
+ * Splits the chunks touched by the write, if needed, based on the targeter stats.
+ */
+void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) {
+ if (!Chunk::ShouldAutoSplit) {
+ return;
+ }
- BatchedCommandRequest request(insert.release());
- request.setNS(nss.getSystemIndexesCollection());
- request.setWriteConcern(WriteConcernOptions::Acknowledged);
+ auto status = grid.catalogCache()->getDatabase(nss.db().toString());
+ if (!status.isOK()) {
+ warning() << "failed to get database config for " << nss
+ << " while checking for auto-split: " << status.getStatus();
+ return;
+ }
- BatchedCommandResponse dummyResponse;
- if (response == NULL) {
- response = &dummyResponse;
- }
+ shared_ptr<DBConfig> config = status.getValue();
+
+ ChunkManagerPtr chunkManager;
+ ShardPtr dummyShard;
+ config->getChunkManagerOrPrimary(nss, chunkManager, dummyShard);
- ClusterWriter writer(false, 0);
- writer.write(request, response);
+ if (!chunkManager) {
+ return;
+ }
- if (response->getOk() != 1) {
- return Status(static_cast<ErrorCodes::Error>(response->getErrCode()),
- response->getErrMessage());
+ for (map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin();
+ it != stats.chunkSizeDelta.end();
+ ++it) {
+ ChunkPtr chunk;
+ try {
+ chunk = chunkManager->findIntersectingChunk(it->first);
+ } catch (const AssertionException& ex) {
+ warning() << "could not find chunk while checking for auto-split: " << causedBy(ex);
+ return;
}
- if (response->isErrDetailsSet()) {
- const WriteErrorDetail* errDetail = response->getErrDetails().front();
+ chunk->splitIfShould(it->second);
+ }
+}
+
+} // namespace
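
For context: splitIfNeeded is the post-write auto-split hook. Guarded by Chunk::ShouldAutoSplit, it resolves the ChunkManager for the namespace and then, for every (min key, bytes written) pair the targeter recorded, finds the owning chunk and calls splitIfShould. A hedged outline of that walk, with hypothetical stand-ins (ChunkLike and replaySizeDeltas are not names from this code):

    #include <map>
    #include <string>

    // Hypothetical stand-in for Chunk; the real lookup goes through
    // ChunkManager::findIntersectingChunk with BSONObj shard keys.
    struct ChunkLike {
        void splitIfShould(int bytesWritten) { /* split once past a size threshold */ }
    };

    void replaySizeDeltas(std::map<std::string, ChunkLike>& chunksByMinKey,
                          const std::map<std::string, int>& chunkSizeDelta) {
        for (const auto& entry : chunkSizeDelta) {
            auto it = chunksByMinKey.find(entry.first);
            if (it == chunksByMinKey.end()) {
                return;  // the real code warns and bails out on a failed lookup
            }
            it->second.splitIfShould(entry.second);  // bytes written to that chunk
        }
    }
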
- return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
- errDetail->getErrMessage());
- }
+Status clusterCreateIndex(const string& ns,
+ BSONObj keys,
+ bool unique,
+ BatchedCommandResponse* response) {
+ const NamespaceString nss(ns);
+ const std::string dbName = nss.db().toString();
- if (response->isWriteConcernErrorSet()) {
- const WCErrorDetail* errDetail = response->getWriteConcernError();
+ BSONObj indexDoc = createIndexDoc(ns, keys, unique);
- return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
- errDetail->getErrMessage());
- }
+ // Go through the shard insert path
+ std::unique_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest());
+ insert->addToDocuments(indexDoc);
- return Status::OK();
+ BatchedCommandRequest request(insert.release());
+ request.setNS(nss.getSystemIndexesCollection());
+ request.setWriteConcern(WriteConcernOptions::Acknowledged);
+
+ BatchedCommandResponse dummyResponse;
+ if (response == NULL) {
+ response = &dummyResponse;
}
+ ClusterWriter writer(false, 0);
+ writer.write(request, response);
- void ClusterWriter::write( const BatchedCommandRequest& origRequest,
- BatchedCommandResponse* response ) {
+ if (response->getOk() != 1) {
+ return Status(static_cast<ErrorCodes::Error>(response->getErrCode()),
+ response->getErrMessage());
+ }
- // Add _ids to insert request if req'd
- unique_ptr<BatchedCommandRequest> idRequest(BatchedCommandRequest::cloneWithIds(origRequest));
- const BatchedCommandRequest& request = NULL != idRequest.get() ? *idRequest : origRequest;
+ if (response->isErrDetailsSet()) {
+ const WriteErrorDetail* errDetail = response->getErrDetails().front();
- const NamespaceString& nss = request.getNSS();
- if ( !nss.isValid() ) {
- toBatchError( Status( ErrorCodes::InvalidNamespace,
- nss.ns() + " is not a valid namespace" ),
- response );
- return;
- }
+ return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
+ errDetail->getErrMessage());
+ }
- if ( !NamespaceString::validCollectionName( nss.coll() ) ) {
- toBatchError( Status( ErrorCodes::BadValue,
- str::stream() << "invalid collection name " << nss.coll() ),
- response );
- return;
- }
+ if (response->isWriteConcernErrorSet()) {
+ const WCErrorDetail* errDetail = response->getWriteConcernError();
- if ( request.sizeWriteOps() == 0u ) {
- toBatchError( Status( ErrorCodes::InvalidLength,
- "no write ops were included in the batch" ),
- response );
- return;
- }
+ return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
+ errDetail->getErrMessage());
+ }
- if ( request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize ) {
- toBatchError( Status( ErrorCodes::InvalidLength,
- str::stream() << "exceeded maximum write batch size of "
- << BatchedCommandRequest::kMaxWriteBatchSize ),
- response );
- return;
- }
+ return Status::OK();
+}
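
For context: clusterCreateIndex routes index creation through the ordinary shard insert path. It builds the spec with createIndexDoc, inserts it into the database's system.indexes collection with an acknowledged write concern, and folds the batch response back into a Status, preferring the top-level error, then the first write error detail, then the write concern error. A hedged call-site sketch; the namespace and key pattern are examples, and a running mongos context is assumed:

    BatchedCommandResponse response;
    Status status = clusterCreateIndex("test.users",
                                       BSON("email" << 1),  // index key pattern
                                       true,                // unique
                                       &response);
    if (!status.isOK()) {
        // status carries whichever error the batch response reported
    }
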
- string errMsg;
- if ( request.isInsertIndexRequest() && !request.isValidIndexRequest( &errMsg ) ) {
- toBatchError( Status( ErrorCodes::InvalidOptions, errMsg ), response );
- return;
- }
- // Config writes and shard writes are done differently
- const string dbName = nss.db().toString();
+void ClusterWriter::write(const BatchedCommandRequest& origRequest,
+ BatchedCommandResponse* response) {
+ // Add _ids to insert request if req'd
+ unique_ptr<BatchedCommandRequest> idRequest(BatchedCommandRequest::cloneWithIds(origRequest));
+ const BatchedCommandRequest& request = NULL != idRequest.get() ? *idRequest : origRequest;
- if (dbName == "config" || dbName == "admin") {
- grid.catalogManager()->writeConfigServerDirect(request, response);
- }
- else {
- ChunkManagerTargeter targeter(request.getTargetingNSS());
- Status targetInitStatus = targeter.init();
-
- if (!targetInitStatus.isOK()) {
- // Errors will be reported in response if we are unable to target
- warning() << "could not initialize targeter for"
- << (request.isInsertIndexRequest() ? " index" : "")
- << " write op in collection " << request.getTargetingNS();
- }
-
- DBClientShardResolver resolver;
- DBClientMultiCommand dispatcher;
- BatchWriteExec exec(&targeter, &resolver, &dispatcher);
- exec.executeBatch(request, response);
-
- if (_autoSplit) {
- splitIfNeeded(request.getNSS(), *targeter.getStats());
- }
-
- _stats->setShardStats(exec.releaseStats());
- }
+ const NamespaceString& nss = request.getNSS();
+ if (!nss.isValid()) {
+ toBatchError(Status(ErrorCodes::InvalidNamespace, nss.ns() + " is not a valid namespace"),
+ response);
+ return;
}
- ClusterWriter::ClusterWriter( bool autoSplit, int timeoutMillis ) :
- _autoSplit( autoSplit ), _timeoutMillis( timeoutMillis ), _stats( new ClusterWriterStats ) {
+ if (!NamespaceString::validCollectionName(nss.coll())) {
+ toBatchError(
+ Status(ErrorCodes::BadValue, str::stream() << "invalid collection name " << nss.coll()),
+ response);
+ return;
}
- const ClusterWriterStats& ClusterWriter::getStats() {
- return *_stats;
+ if (request.sizeWriteOps() == 0u) {
+ toBatchError(Status(ErrorCodes::InvalidLength, "no write ops were included in the batch"),
+ response);
+ return;
}
- void ClusterWriterStats::setShardStats( BatchWriteExecStats* shardStats ) {
- _shardStats.reset( shardStats );
+ if (request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize) {
+ toBatchError(Status(ErrorCodes::InvalidLength,
+ str::stream() << "exceeded maximum write batch size of "
+ << BatchedCommandRequest::kMaxWriteBatchSize),
+ response);
+ return;
}
- bool ClusterWriterStats::hasShardStats() const {
- return NULL != _shardStats.get();
+ string errMsg;
+ if (request.isInsertIndexRequest() && !request.isValidIndexRequest(&errMsg)) {
+ toBatchError(Status(ErrorCodes::InvalidOptions, errMsg), response);
+ return;
}
- const BatchWriteExecStats& ClusterWriterStats::getShardStats() const {
- return *_shardStats;
+ // Config writes and shard writes are done differently
+ const string dbName = nss.db().toString();
+
+ if (dbName == "config" || dbName == "admin") {
+ grid.catalogManager()->writeConfigServerDirect(request, response);
+ } else {
+ ChunkManagerTargeter targeter(request.getTargetingNSS());
+ Status targetInitStatus = targeter.init();
+
+ if (!targetInitStatus.isOK()) {
+ // Errors will be reported in response if we are unable to target
+ warning() << "could not initialize targeter for"
+ << (request.isInsertIndexRequest() ? " index" : "")
+ << " write op in collection " << request.getTargetingNS();
+ }
+
+ DBClientShardResolver resolver;
+ DBClientMultiCommand dispatcher;
+ BatchWriteExec exec(&targeter, &resolver, &dispatcher);
+ exec.executeBatch(request, response);
+
+ if (_autoSplit) {
+ splitIfNeeded(request.getNSS(), *targeter.getStats());
+ }
+
+ _stats->setShardStats(exec.releaseStats());
}
+}
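
For context: ClusterWriter::write first validates the batch (valid namespace and collection name, non-empty, within kMaxWriteBatchSize, valid index-request options) and then branches on the database name: "config" and "admin" writes go directly to the config servers via writeConfigServerDirect, while everything else is targeted at shards through ChunkManagerTargeter and BatchWriteExec, with an optional auto-split pass afterwards. A minimal sketch of that routing predicate; goesToConfigServers is a hypothetical helper:

    #include <string>

    // Hypothetical helper mirroring the branch in ClusterWriter::write.
    bool goesToConfigServers(const std::string& dbName) {
        return dbName == "config" || dbName == "admin";
    }
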
+
+ClusterWriter::ClusterWriter(bool autoSplit, int timeoutMillis)
+ : _autoSplit(autoSplit), _timeoutMillis(timeoutMillis), _stats(new ClusterWriterStats) {}
+
+const ClusterWriterStats& ClusterWriter::getStats() {
+ return *_stats;
+}
+
+void ClusterWriterStats::setShardStats(BatchWriteExecStats* shardStats) {
+ _shardStats.reset(shardStats);
+}
+
+bool ClusterWriterStats::hasShardStats() const {
+ return NULL != _shardStats.get();
+}
+
+const BatchWriteExecStats& ClusterWriterStats::getShardStats() const {
+ return *_shardStats;
+}
-} // namespace mongo
+} // namespace mongo