diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_write_cmd.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 357 |
1 files changed, 166 insertions, 191 deletions
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index a22dd2c7dc7..e499d708ec3 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -27,7 +27,7 @@ */ #include "mongo/platform/basic.h" - + #include "mongo/base/error_codes.h" #include "mongo/db/client.h" #include "mongo/db/client_basic.h" @@ -46,234 +46,209 @@ namespace mongo { - using std::string; - using std::stringstream; - using std::vector; +using std::string; +using std::stringstream; +using std::vector; namespace { - /** - * Base class for mongos write commands. Cluster write commands support batch writes and write - * concern, and return per-item error information. All cluster write commands use the entry - * point ClusterWriteCmd::run(). - * - * Batch execution (targeting and dispatching) is performed by the BatchWriteExec class. - */ - class ClusterWriteCmd : public Command { - public: - virtual ~ClusterWriteCmd() { - - } - - virtual bool slaveOk() const { - return false; - } - - virtual bool isWriteCommandForConfigServer() const { - return false; +/** + * Base class for mongos write commands. Cluster write commands support batch writes and write + * concern, and return per-item error information. All cluster write commands use the entry + * point ClusterWriteCmd::run(). + * + * Batch execution (targeting and dispatching) is performed by the BatchWriteExec class. + */ +class ClusterWriteCmd : public Command { +public: + virtual ~ClusterWriteCmd() {} + + virtual bool slaveOk() const { + return false; + } + + virtual bool isWriteCommandForConfigServer() const { + return false; + } + + virtual Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) { + Status status = auth::checkAuthForWriteCommand(AuthorizationSession::get(client), + _writeType, + NamespaceString(parseNs(dbname, cmdObj)), + cmdObj); + + // TODO: Remove this when we standardize GLE reporting from commands + if (!status.isOK()) { + LastError::get(client).setLastError(status.code(), status.reason()); } - virtual Status checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj) { + return status; + } - Status status = auth::checkAuthForWriteCommand(AuthorizationSession::get(client), - _writeType, - NamespaceString(parseNs(dbname, - cmdObj)), - cmdObj); + virtual Status explain(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + BSONObjBuilder* out) const { + BatchedCommandRequest request(_writeType); - // TODO: Remove this when we standardize GLE reporting from commands - if (!status.isOK()) { - LastError::get(client).setLastError(status.code(), status.reason()); - } - - return status; + string errMsg; + if (!request.parseBSON(cmdObj, &errMsg) || !request.isValid(&errMsg)) { + return Status(ErrorCodes::FailedToParse, errMsg); } - virtual Status explain(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - BSONObjBuilder* out) const { + // Fixup the namespace to be a full ns internally + const NamespaceString nss(dbname, request.getNS()); + request.setNS(nss.ns()); - BatchedCommandRequest request(_writeType); + // We can only explain write batches of size 1. + if (request.sizeWriteOps() != 1U) { + return Status(ErrorCodes::InvalidLength, "explained write batches must be of size 1"); + } - string errMsg; - if (!request.parseBSON(cmdObj, &errMsg) || !request.isValid(&errMsg)) { - return Status(ErrorCodes::FailedToParse, errMsg); - } + BSONObjBuilder explainCmdBob; + ClusterExplain::wrapAsExplain(cmdObj, verbosity, &explainCmdBob); - // Fixup the namespace to be a full ns internally - const NamespaceString nss(dbname, request.getNS()); - request.setNS(nss.ns()); + // We will time how long it takes to run the commands on the shards. + Timer timer; - // We can only explain write batches of size 1. - if (request.sizeWriteOps() != 1U) { - return Status(ErrorCodes::InvalidLength, - "explained write batches must be of size 1"); - } + // Target the command to the shards based on the singleton batch item. + BatchItemRef targetingBatchItem(&request, 0); + vector<Strategy::CommandResult> shardResults; + Status status = Strategy::commandOpWrite( + dbname, explainCmdBob.obj(), targetingBatchItem, &shardResults); + if (!status.isOK()) { + return status; + } - BSONObjBuilder explainCmdBob; - ClusterExplain::wrapAsExplain(cmdObj, verbosity, &explainCmdBob); - - // We will time how long it takes to run the commands on the shards. - Timer timer; - - // Target the command to the shards based on the singleton batch item. - BatchItemRef targetingBatchItem(&request, 0); - vector<Strategy::CommandResult> shardResults; - Status status = Strategy::commandOpWrite(dbname, - explainCmdBob.obj(), - targetingBatchItem, - &shardResults); - if (!status.isOK()) { - return status; + return ClusterExplain::buildExplainResult( + shardResults, ClusterExplain::kWriteOnShards, timer.millis(), out); + } + + virtual bool run(OperationContext* txn, + const string& dbname, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result) { + BatchedCommandRequest request(_writeType); + BatchedCommandResponse response; + + ClusterWriter writer(true, 0); + + LastError* cmdLastError = &LastError::get(cc()); + + { + // Disable the last error object for the duration of the write + LastError::Disabled disableLastError(cmdLastError); + + // TODO: if we do namespace parsing, push this to the type + if (!request.parseBSON(cmdObj, &errmsg) || !request.isValid(&errmsg)) { + // Batch parse failure + response.setOk(false); + response.setErrCode(ErrorCodes::FailedToParse); + response.setErrMessage(errmsg); + } else { + // Fixup the namespace to be a full ns internally + const NamespaceString nss(dbname, request.getNS()); + request.setNSS(nss); + + writer.write(request, &response); } - return ClusterExplain::buildExplainResult(shardResults, - ClusterExplain::kWriteOnShards, - timer.millis(), - out); + dassert(response.isValid(NULL)); } - virtual bool run(OperationContext* txn, - const string& dbname, - BSONObj& cmdObj, - int options, - string& errmsg, - BSONObjBuilder& result) { - - BatchedCommandRequest request(_writeType); - BatchedCommandResponse response; - - ClusterWriter writer(true, 0); - - LastError* cmdLastError = &LastError::get(cc()); - - { - // Disable the last error object for the duration of the write - LastError::Disabled disableLastError(cmdLastError); + // Populate the lastError object based on the write response + cmdLastError->reset(); + batchErrorToLastError(request, response, cmdLastError); - // TODO: if we do namespace parsing, push this to the type - if (!request.parseBSON(cmdObj, &errmsg) || !request.isValid(&errmsg)) { - // Batch parse failure - response.setOk(false); - response.setErrCode(ErrorCodes::FailedToParse); - response.setErrMessage(errmsg); - } - else { - // Fixup the namespace to be a full ns internally - const NamespaceString nss(dbname, request.getNS()); - request.setNSS(nss); + size_t numAttempts; - writer.write(request, &response); - } - - dassert(response.isValid(NULL)); - } - - // Populate the lastError object based on the write response - cmdLastError->reset(); - batchErrorToLastError(request, response, cmdLastError); - - size_t numAttempts; - - if (!response.getOk()) { - numAttempts = 0; - } - else if (request.getOrdered() && response.isErrDetailsSet()) { - // Add one failed attempt - numAttempts = response.getErrDetailsAt(0)->getIndex() + 1; - } - else { - numAttempts = request.sizeWriteOps(); - } + if (!response.getOk()) { + numAttempts = 0; + } else if (request.getOrdered() && response.isErrDetailsSet()) { + // Add one failed attempt + numAttempts = response.getErrDetailsAt(0)->getIndex() + 1; + } else { + numAttempts = request.sizeWriteOps(); + } - // TODO: increase opcounters by more than one - if (_writeType == BatchedCommandRequest::BatchType_Insert) { - for (size_t i = 0; i < numAttempts; ++i) { - globalOpCounters.gotInsert(); - } + // TODO: increase opcounters by more than one + if (_writeType == BatchedCommandRequest::BatchType_Insert) { + for (size_t i = 0; i < numAttempts; ++i) { + globalOpCounters.gotInsert(); } - else if (_writeType == BatchedCommandRequest::BatchType_Update) { - for (size_t i = 0; i < numAttempts; ++i) { - globalOpCounters.gotUpdate(); - } + } else if (_writeType == BatchedCommandRequest::BatchType_Update) { + for (size_t i = 0; i < numAttempts; ++i) { + globalOpCounters.gotUpdate(); } - else if (_writeType == BatchedCommandRequest::BatchType_Delete) { - for (size_t i = 0; i < numAttempts; ++i) { - globalOpCounters.gotDelete(); - } - } - - // Save the last opTimes written on each shard for this client, to allow GLE to work - if (haveClient() && writer.getStats().hasShardStats()) { - ClusterLastErrorInfo::get(cc()).addHostOpTimes( - writer.getStats().getShardStats().getWriteOpTimes()); + } else if (_writeType == BatchedCommandRequest::BatchType_Delete) { + for (size_t i = 0; i < numAttempts; ++i) { + globalOpCounters.gotDelete(); } - - // TODO - // There's a pending issue about how to report response here. If we use - // the command infra-structure, we should reuse the 'errmsg' field. But - // we have already filed that message inside the BatchCommandResponse. - // return response.getOk(); - result.appendElements(response.toBSON()); - return true; } - protected: - /** - * Instantiates a command that can be invoked by "name", which will be capable of issuing - * write batches of type "writeType", and will require privilege "action" to run. - */ - ClusterWriteCmd(StringData name, BatchedCommandRequest::BatchType writeType) - : Command(name), - _writeType(writeType) { - + // Save the last opTimes written on each shard for this client, to allow GLE to work + if (haveClient() && writer.getStats().hasShardStats()) { + ClusterLastErrorInfo::get(cc()) + .addHostOpTimes(writer.getStats().getShardStats().getWriteOpTimes()); } - private: - // Type of batch (e.g. insert, update). - const BatchedCommandRequest::BatchType _writeType; - }; + // TODO + // There's a pending issue about how to report response here. If we use + // the command infra-structure, we should reuse the 'errmsg' field. But + // we have already filed that message inside the BatchCommandResponse. + // return response.getOk(); + result.appendElements(response.toBSON()); + return true; + } +protected: + /** + * Instantiates a command that can be invoked by "name", which will be capable of issuing + * write batches of type "writeType", and will require privilege "action" to run. + */ + ClusterWriteCmd(StringData name, BatchedCommandRequest::BatchType writeType) + : Command(name), _writeType(writeType) {} - class ClusterCmdInsert : public ClusterWriteCmd { - public: - ClusterCmdInsert() : ClusterWriteCmd("insert", BatchedCommandRequest::BatchType_Insert) { +private: + // Type of batch (e.g. insert, update). + const BatchedCommandRequest::BatchType _writeType; +}; - } - void help(stringstream& help) const { - help << "insert documents"; - } +class ClusterCmdInsert : public ClusterWriteCmd { +public: + ClusterCmdInsert() : ClusterWriteCmd("insert", BatchedCommandRequest::BatchType_Insert) {} - } clusterInsertCmd; + void help(stringstream& help) const { + help << "insert documents"; + } - class ClusterCmdUpdate : public ClusterWriteCmd { - public: - ClusterCmdUpdate() : ClusterWriteCmd("update", BatchedCommandRequest::BatchType_Update) { +} clusterInsertCmd; - } +class ClusterCmdUpdate : public ClusterWriteCmd { +public: + ClusterCmdUpdate() : ClusterWriteCmd("update", BatchedCommandRequest::BatchType_Update) {} - void help( stringstream& help ) const { - help << "update documents"; - } + void help(stringstream& help) const { + help << "update documents"; + } - } clusterUpdateCmd; +} clusterUpdateCmd; - class ClusterCmdDelete : public ClusterWriteCmd { - public: - ClusterCmdDelete() : ClusterWriteCmd("delete", BatchedCommandRequest::BatchType_Delete) { +class ClusterCmdDelete : public ClusterWriteCmd { +public: + ClusterCmdDelete() : ClusterWriteCmd("delete", BatchedCommandRequest::BatchType_Delete) {} - } - - void help(stringstream& help) const { - help << "delete documents"; - } + void help(stringstream& help) const { + help << "delete documents"; + } - } clusterDeleteCmd; +} clusterDeleteCmd; -} // namespace -} // namespace mongo +} // namespace +} // namespace mongo |