diff options
author | Alberto Lerner <alerner@10gen.com> | 2013-09-01 14:43:08 -0400 |
---|---|---|
committer | Alberto Lerner <alerner@10gen.com> | 2013-09-12 10:51:48 -0400 |
commit | e82786a9f03cb18e924f72f787e184740576111b (patch) | |
tree | cdcc1b7922a994fd681e2cfb234fe645414822f9 /src | |
parent | 98c273d6a9a0aa62182e46ef5c45ed5549d7ce04 (diff) | |
download | mongo-e82786a9f03cb18e924f72f787e184740576111b.tar.gz |
SERVER-10521 Types for write commands.
Diffstat (limited to 'src')
25 files changed, 3408 insertions, 10 deletions
diff --git a/src/mongo/bson/bson_field.h b/src/mongo/bson/bson_field.h index c8fcfb1fe2f..d2a47509e40 100644 --- a/src/mongo/bson/bson_field.h +++ b/src/mongo/bson/bson_field.h @@ -78,7 +78,7 @@ namespace mongo { : _name(name), _defaultSet(false) {} BSONField(const std::string& name, const T& defaultVal) - : _name(name), _default(defaultVal), _defaultSet(true) {} + : _name(name), _default(defaultVal) , _defaultSet(true) {} BSONFieldValue<T> make(const T& t) const { return BSONFieldValue<T>(_name, t); @@ -93,6 +93,7 @@ namespace mongo { } const T& getDefault() const { + dassert(_defaultSet); return _default; } diff --git a/src/mongo/bson/bsonobjbuilder.h b/src/mongo/bson/bsonobjbuilder.h index c0c3d0ba657..23ea0551b97 100644 --- a/src/mongo/bson/bsonobjbuilder.h +++ b/src/mongo/bson/bsonobjbuilder.h @@ -803,6 +803,11 @@ namespace mongo { return *this; } + BSONArrayBuilder& appendTimestamp(unsigned long long ts) { + _b.appendTimestamp(num(), ts); + return *this; + } + BSONArrayBuilder& append(const StringData& s) { _b.append(num(), s); return *this; diff --git a/src/mongo/db/field_parser-inl.h b/src/mongo/db/field_parser-inl.h index 46222de8a83..2fad7dd0261 100644 --- a/src/mongo/db/field_parser-inl.h +++ b/src/mongo/db/field_parser-inl.h @@ -33,6 +33,71 @@ namespace mongo { using mongoutils::str::stream; + template<typename T> + FieldParser::FieldState FieldParser::extract(BSONObj doc, + const BSONField<T>& field, + T* out, + string* errMsg) + { + BSONElement elem = doc[field.name()]; + if (elem.eoo()) { + if (field.hasDefault()) { + field.getDefault().cloneTo(out); + return FIELD_DEFAULT; + } + else { + return FIELD_NONE; + } + } + + if (elem.type() != Object && elem.type() != Array) { + _genFieldErrMsg(doc, field, "Object/Array", errMsg); + return FIELD_INVALID; + } + + if (!out->parseBSON(elem.embeddedObject(), errMsg)) { + return FIELD_INVALID; + } + + return FIELD_SET; + } + + template<typename T> + FieldParser::FieldState FieldParser::extract(BSONObj doc, + const BSONField<T>& field, + T** out, + string* errMsg) + { + BSONElement elem = doc[field.name()]; + if (elem.eoo()) { + if (field.hasDefault()) { + *out = new T; + field.getDefault().cloneTo(*out); + return FIELD_DEFAULT; + } + else { + return FIELD_NONE; + } + } + + if (elem.type() != Object && elem.type() != Array) { + if (errMsg) { + *errMsg = stream() << "wrong type for '" << field() << "' field, expected " + << "vector or array" << ", found " + << doc[field.name()].toString(); + } + return FIELD_INVALID; + } + + auto_ptr<T> temp(new T); + if (!temp->parseBSON(elem.embeddedObject(), errMsg)) { + return FIELD_INVALID; + } + + *out = temp.release(); + return FIELD_SET; + } + // Extracts an array into a vector template<typename T> FieldParser::FieldState FieldParser::extract(BSONObj doc, @@ -89,6 +154,109 @@ namespace mongo { return FIELD_INVALID; } + template<typename T> + FieldParser::FieldState FieldParser::extract(BSONObj doc, + const BSONField<vector<T*> >& field, + vector<T*>* out, + string* errMsg) { + dassert(!field.hasDefault()); + + BSONElement elem = doc[field.name()]; + if (elem.eoo()) { + return FIELD_NONE; + } + + if (elem.type() != Array) { + if (errMsg) { + *errMsg = stream() << "wrong type for '" << field() << "' field, expected " + << "vector array" << ", found " << doc[field.name()].toString(); + } + return FIELD_INVALID; + } + + BSONArray arr = BSONArray(elem.embeddedObject()); + BSONObjIterator objIt(arr); + while (objIt.more()) { + + BSONElement next = objIt.next(); + + if (next.type() != Object) { + if (errMsg) { + *errMsg = stream() << "wrong type for '" << field() << "' field contents, " + << "expected object, found " << elem.type(); + } + return FIELD_INVALID; + } + + auto_ptr<T> toInsert(new T); + + if (!toInsert->parseBSON(next.embeddedObject(), errMsg)) { + return FIELD_INVALID; + } + + out->push_back(toInsert.release()); + } + + return FIELD_SET; + } + + template<typename T> + void FieldParser::clearOwnedVector(vector<T*>* vec) { + for (typename vector<T*>::iterator it = vec->begin(); it != vec->end(); ++it) { + delete (*it); + } + } + + template<typename T> + FieldParser::FieldState FieldParser::extract(BSONObj doc, + const BSONField<vector<T*> >& field, + vector<T*>** out, + string* errMsg) { + dassert(!field.hasDefault()); + + BSONElement elem = doc[field.name()]; + if (elem.eoo()) { + return FIELD_NONE; + } + + if (elem.type() != Array) { + if (errMsg) { + *errMsg = stream() << "wrong type for '" << field() << "' field, expected " + << "vector array" << ", found " << doc[field.name()].toString(); + } + return FIELD_INVALID; + } + + auto_ptr<vector<T*> > tempVector(new vector<T*>); + + BSONArray arr = BSONArray(elem.embeddedObject()); + BSONObjIterator objIt(arr); + while (objIt.more()) { + + BSONElement next = objIt.next(); + + if (next.type() != Object) { + if (errMsg) { + *errMsg = stream() << "wrong type for '" << field() << "' field contents, " + << "expected object, found " << elem.type(); + } + clearOwnedVector(tempVector.get()); + return FIELD_INVALID; + } + + auto_ptr<T> toInsert(new T); + if (!toInsert->parseBSON(next.embeddedObject(), errMsg)) { + clearOwnedVector(tempVector.get()); + return FIELD_INVALID; + } + + tempVector->push_back(toInsert.release()); + } + + *out = tempVector.release(); + return FIELD_SET; + } + // Extracts an object into a map template<typename K, typename T> FieldParser::FieldState FieldParser::extract(BSONObj doc, diff --git a/src/mongo/db/field_parser.cpp b/src/mongo/db/field_parser.cpp index 7b7d7521966..7c91c2004d3 100644 --- a/src/mongo/db/field_parser.cpp +++ b/src/mongo/db/field_parser.cpp @@ -220,9 +220,9 @@ namespace mongo { } FieldParser::FieldState FieldParser::extractNumber(BSONObj doc, - const BSONField<int>& field, - int* out, - string* errMsg) + const BSONField<int>& field, + int* out, + string* errMsg) { BSONElement elem = doc[field.name()]; if (elem.eoo()) { diff --git a/src/mongo/db/field_parser.h b/src/mongo/db/field_parser.h index 498c530a6d7..cbf27544ce8 100644 --- a/src/mongo/db/field_parser.h +++ b/src/mongo/db/field_parser.h @@ -32,6 +32,7 @@ #include "mongo/bson/bson_field.h" #include "mongo/db/jsobj.h" +#include "mongo/s/bson_serializable.h" #include "mongo/util/time_support.h" namespace mongo { @@ -121,6 +122,68 @@ namespace mongo { string* errMsg = NULL); /** + * Extracts a mandatory BSONSerializable structure 'field' from the object 'doc'. Write + * the extracted contents to '*out' if successful or fills '*errMsg', if exising, + * otherwise. This variant relies on T having a parseBSON, which all + * BSONSerializable's have. + * + * TODO: Tighten for BSONSerializable's only + */ + template<typename T> + static FieldState extract(BSONObj doc, + const BSONField<T>& field, + T* out, + string* errMsg = NULL); + + /** + * Similar to the mandatory 'extract' but on a optional field. '*out' would only be + * allocated if the field is present. The ownership of '*out' would be transferred to + * the caller, in that case. + * + * TODO: Tighten for BSONSerializable's only + */ + template<typename T> + static FieldState extract(BSONObj doc, + const BSONField<T>& field, + T** out, // alloc variation + string* errMsg = NULL); + + /** + * Extracts a mandatory repetition of BSONSerializable structures, 'field', from the + * object 'doc'. Write the extracted contents to '*out' if successful or fills + * '*errMsg', if exising, otherwise. This variant relies on T having a parseBSON, + * which all BSONSerializable's have. + * + * The vector owns the instances of T. + * + * TODO: Tighten for BSONSerializable's only + */ + template<typename T> + static FieldState extract(BSONObj doc, + const BSONField<vector<T*> >& field, + vector<T*>* out, + string* errMsg = NULL); + + /** + * Similar to the mandatory repetition' extract but on an optional field. '*out' would + * only be allocated if the field is present. The ownership of '*out' would be + * transferred to the caller, in that case. + * + * The vector owns the instances of T. + * + * TODO: Tighten for BSONSerializable's only + */ + template<typename T> + static FieldState extract(BSONObj doc, + const BSONField<vector<T*> >& field, + vector<T*>** out, + string* errMsg = NULL); + + // + // ==================== Below DEPRECATED; use types instead ==================== + // + + /** * The following extract methods are templatized to handle extraction of vectors and * maps of sub-objects. Keys in the map should be StringData compatible. * @@ -140,6 +203,10 @@ namespace mongo { const BSONField<map<K, T> >& field, map<K, T>* out, string* errMsg = NULL); + + private: + template<typename T> + static void clearOwnedVector(vector<T*>* vec); }; } // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index f8570035b04..166a31fdc9e 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -131,3 +131,68 @@ env.CppUnitTest('metadata_loader_test', '$BUILD_DIR/mongo/db/common'], NO_CRUTCH=True); +# +# Write Operations +# + +env.StaticLibrary( + target='write_ops', + source=[ + 'batched_command_response.cpp', + 'batched_delete_request.cpp', + 'batched_delete_document.cpp', + 'batched_error_detail.cpp', + 'batched_insert_request.cpp', + 'batched_update_request.cpp', + 'batched_update_document.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base/base', + '$BUILD_DIR/mongo/bson', + ], +) + +env.CppUnitTest( + target='batched_command_response_test', + source=[ + 'batched_command_response_test.cpp', + ], + LIBDEPS=[ + 'write_ops', + '$BUILD_DIR/mongo/db/common', + ] +) + +env.CppUnitTest( + target='batched_delete_request_test', + source=[ + 'batched_delete_request_test.cpp', + ], + LIBDEPS=[ + 'write_ops', + '$BUILD_DIR/mongo/db/common', + ] +) + +env.CppUnitTest( + target='batched_insert_request_test', + source=[ + 'batched_insert_request_test.cpp', + ], + LIBDEPS=[ + 'write_ops', + '$BUILD_DIR/mongo/db/common', + ] +) + +env.CppUnitTest( + target='batched_update_request_test', + source=[ + 'batched_update_request_test.cpp', + ], + LIBDEPS=[ + 'write_ops', + '$BUILD_DIR/mongo/db/common', + ] +) + diff --git a/src/mongo/s/batched_command_response.cpp b/src/mongo/s/batched_command_response.cpp new file mode 100644 index 00000000000..e622c608216 --- /dev/null +++ b/src/mongo/s/batched_command_response.cpp @@ -0,0 +1,383 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_command_response.h" + +#include "mongo/db/field_parser.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + using mongoutils::str::stream; + const BSONField<bool> BatchedCommandResponse::ok("ok"); + const BSONField<int> BatchedCommandResponse::errCode("errCode"); + const BSONField<BSONObj> BatchedCommandResponse::errInfo("errInfo"); + const BSONField<string> BatchedCommandResponse::errMessage("errMessage"); + const BSONField<long long> BatchedCommandResponse::n("n"); + const BSONField<long long> BatchedCommandResponse::upserted("upserted"); + const BSONField<Date_t> BatchedCommandResponse::lastOp("lastOP"); + const BSONField<std::vector<BatchedErrorDetail*> > BatchedCommandResponse::errDetails("errDetails"); + + BatchedCommandResponse::BatchedCommandResponse() { + clear(); + } + + BatchedCommandResponse::~BatchedCommandResponse() { + unsetErrDetails(); + } + + bool BatchedCommandResponse::isValid(std::string* errMsg) const { + std::string dummy; + if (errMsg == NULL) { + errMsg = &dummy; + } + + // All the mandatory fields must be present. + if (!_isOkSet) { + *errMsg = stream() << "missing " << ok.name() << " field"; + return false; + } + + if (!_isErrCodeSet) { + *errMsg = stream() << "missing " << errCode.name() << " field"; + return false; + } + + return true; + } + + BSONObj BatchedCommandResponse::toBSON() const { + BSONObjBuilder builder; + + if (_isOkSet) builder.append(ok(), _ok); + + if (_isErrCodeSet) builder.append(errCode(), _errCode); + + if (_isErrInfoSet) builder.append(errInfo(), _errInfo); + + if (_isErrMessageSet) builder.append(errMessage(), _errMessage); + + if (_isNSet) builder.append(n(), _n); + + if (_isUpsertedSet) builder.append(upserted(), _upserted); + + if (_isLastOpSet) builder.append(lastOp(), _lastOp); + + if (_errDetails.get()) { + BSONArrayBuilder errDetailsBuilder(builder.subarrayStart(errDetails())); + for (std::vector<BatchedErrorDetail*>::const_iterator it = _errDetails->begin(); + it != _errDetails->end(); + ++it) { + BSONObj errDetailsDocument = (*it)->toBSON(); + errDetailsBuilder.append(errDetailsDocument); + } + errDetailsBuilder.done(); + } + + return builder.obj(); + } + + bool BatchedCommandResponse::parseBSON(const BSONObj& source, string* errMsg) { + clear(); + + std::string dummy; + if (!errMsg) errMsg = &dummy; + + FieldParser::FieldState fieldState; + fieldState = FieldParser::extract(source, ok, &_ok, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isOkSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, errCode, &_errCode, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isErrCodeSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, errInfo, &_errInfo, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isErrInfoSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, errMessage, &_errMessage, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isErrMessageSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, n, &_n, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isNSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, upserted, &_upserted, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isUpsertedSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, lastOp, &_lastOp, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isLastOpSet = fieldState == FieldParser::FIELD_SET; + + std::vector<BatchedErrorDetail*>* tempErrDetails = NULL; + fieldState = FieldParser::extract(source, errDetails, &tempErrDetails, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + if (fieldState == FieldParser::FIELD_SET) _errDetails.reset(tempErrDetails); + + return true; + } + + void BatchedCommandResponse::clear() { + _ok = false; + _isOkSet = false; + + _errCode = 0; + _isErrCodeSet = false; + + _errInfo = BSONObj(); + _isErrInfoSet = false; + + _errMessage.clear(); + _isErrMessageSet = false; + + _n = 0; + _isNSet = false; + + _upserted = 0; + _isUpsertedSet = false; + + _lastOp = 0ULL; + _isLastOpSet = false; + + if (_errDetails.get()) { + for(std::vector<BatchedErrorDetail*>::const_iterator it = _errDetails->begin(); + it != _errDetails->end(); + ++it) { + delete *it; + }; + _errDetails.reset(); + } + } + + void BatchedCommandResponse::cloneTo(BatchedCommandResponse* other) const { + other->clear(); + + other->_ok = _ok; + other->_isOkSet = _isOkSet; + + other->_errCode = _errCode; + other->_isErrCodeSet = _isErrCodeSet; + + other->_errInfo = _errInfo; + other->_isErrInfoSet = _isErrInfoSet; + + other->_errMessage = _errMessage; + other->_isErrMessageSet = _isErrMessageSet; + + other->_n = _n; + other->_isNSet = _isNSet; + + other->_upserted = _upserted; + other->_isUpsertedSet = _isUpsertedSet; + + other->_lastOp = _lastOp; + other->_isLastOpSet = _isLastOpSet; + + other->unsetErrDetails(); + if (_errDetails.get()) { + for(std::vector<BatchedErrorDetail*>::const_iterator it = _errDetails->begin(); + it != _errDetails->end(); + ++it) { + BatchedErrorDetail* errDetailsItem = new BatchedErrorDetail; + (*it)->cloneTo(errDetailsItem); + other->addToErrDetails(errDetailsItem); + } + } + } + + std::string BatchedCommandResponse::toString() const { + return "implement me"; + } + + void BatchedCommandResponse::setOk(bool ok) { + _ok = ok; + _isOkSet = true; + } + + void BatchedCommandResponse::unsetOk() { + _isOkSet = false; + } + + bool BatchedCommandResponse::isOkSet() const { + return _isOkSet; + } + + bool BatchedCommandResponse::getOk() const { + dassert(_isOkSet); + return _ok; + } + + void BatchedCommandResponse::setErrCode(int errCode) { + _errCode = errCode; + _isErrCodeSet = true; + } + + void BatchedCommandResponse::unsetErrCode() { + _isErrCodeSet = false; + } + + bool BatchedCommandResponse::isErrCodeSet() const { + return _isErrCodeSet; + } + + int BatchedCommandResponse::getErrCode() const { + dassert(_isErrCodeSet); + return _errCode; + } + + void BatchedCommandResponse::setErrInfo(const BSONObj& errInfo) { + _errInfo = errInfo.getOwned(); + _isErrInfoSet = true; + } + + void BatchedCommandResponse::unsetErrInfo() { + _isErrInfoSet = false; + } + + bool BatchedCommandResponse::isErrInfoSet() const { + return _isErrInfoSet; + } + + const BSONObj& BatchedCommandResponse::getErrInfo() const { + dassert(_isErrInfoSet); + return _errInfo; + } + + void BatchedCommandResponse::setErrMessage(const StringData& errMessage) { + _errMessage = errMessage.toString(); + _isErrMessageSet = true; + } + + void BatchedCommandResponse::unsetErrMessage() { + _isErrMessageSet = false; + } + + bool BatchedCommandResponse::isErrMessageSet() const { + return _isErrMessageSet; + } + + const std::string& BatchedCommandResponse::getErrMessage() const { + dassert(_isErrMessageSet); + return _errMessage; + } + + void BatchedCommandResponse::setN(long long n) { + _n = n; + _isNSet = true; + } + + void BatchedCommandResponse::unsetN() { + _isNSet = false; + } + + bool BatchedCommandResponse::isNSet() const { + return _isNSet; + } + + long long BatchedCommandResponse::getN() const { + dassert(_isNSet); + return _n; + } + + void BatchedCommandResponse::setUpserted(long long upserted) { + _upserted = upserted; + _isUpsertedSet = true; + } + + void BatchedCommandResponse::unsetUpserted() { + _isUpsertedSet = false; + } + + bool BatchedCommandResponse::isUpsertedSet() const { + return _isUpsertedSet; + } + + long long BatchedCommandResponse::getUpserted() const { + dassert(_isUpsertedSet); + return _upserted; + } + + void BatchedCommandResponse::setLastOp(Date_t lastOp) { + _lastOp = lastOp; + _isLastOpSet = true; + } + + void BatchedCommandResponse::unsetLastOp() { + _isLastOpSet = false; + } + + bool BatchedCommandResponse::isLastOpSet() const { + return _isLastOpSet; + } + + Date_t BatchedCommandResponse::getLastOp() const { + dassert(_isLastOpSet); + return _lastOp; + } + + void BatchedCommandResponse::setErrDetails(const std::vector<BatchedErrorDetail*>& errDetails) { + unsetErrDetails(); + for (std::vector<BatchedErrorDetail*>::const_iterator it = errDetails.begin(); + it != errDetails.end(); + ++it) { + auto_ptr<BatchedErrorDetail> tempBatchErrorDetail(new BatchedErrorDetail); + (*it)->cloneTo(tempBatchErrorDetail.get()); + addToErrDetails(tempBatchErrorDetail.release()); + } + } + + void BatchedCommandResponse::addToErrDetails(BatchedErrorDetail* errDetails) { + if (_errDetails.get() == NULL) { + _errDetails.reset(new std::vector<BatchedErrorDetail*>); + } + _errDetails->push_back(errDetails); + } + + void BatchedCommandResponse::unsetErrDetails() { + if (_errDetails.get() != NULL) { + for(std::vector<BatchedErrorDetail*>::iterator it = _errDetails->begin(); + it != _errDetails->end(); + ++it) { + delete *it; + } + _errDetails.reset(); + } + } + + bool BatchedCommandResponse::isErrDetailsSet() const { + return _errDetails.get() != NULL; + } + + size_t BatchedCommandResponse::sizeErrDetails() const { + dassert(_errDetails.get()); + return _errDetails->size(); + } + + const std::vector<BatchedErrorDetail*>& BatchedCommandResponse::getErrDetails() const { + dassert(_errDetails.get()); + return *_errDetails; + } + + const BatchedErrorDetail* BatchedCommandResponse::getErrDetailsAt(size_t pos) const { + dassert(_errDetails.get()); + dassert(_errDetails->size() > pos); + return _errDetails->at(pos); + } + + } // namespace mongo diff --git a/src/mongo/s/batched_command_response.h b/src/mongo/s/batched_command_response.h new file mode 100644 index 00000000000..185c5f440be --- /dev/null +++ b/src/mongo/s/batched_command_response.h @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <boost/scoped_ptr.hpp> +#include <string> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/db/jsobj.h" +#include "mongo/s/batched_error_detail.h" +#include "mongo/s/bson_serializable.h" + +namespace mongo { + + /** + * This class represents the layout and content of a insert/update/delete runCommand, + * the response side. + */ + class BatchedCommandResponse : public BSONSerializable { + MONGO_DISALLOW_COPYING(BatchedCommandResponse); + public: + + // + // schema declarations + // + + static const BSONField<bool> ok; + static const BSONField<int> errCode; + static const BSONField<BSONObj> errInfo; + static const BSONField<string> errMessage; + static const BSONField<long long> n; + static const BSONField<long long> upserted; + static const BSONField<Date_t> lastOp; + static const BSONField<std::vector<BatchedErrorDetail*> > errDetails; + + // + // construction / destruction + // + + BatchedCommandResponse(); + virtual ~BatchedCommandResponse(); + + /** Copies all the fields present in 'this' to 'other'. */ + void cloneTo(BatchedCommandResponse* other) const; + + // + // bson serializable interface implementation + // + + virtual bool isValid(std::string* errMsg) const; + virtual BSONObj toBSON() const; + virtual bool parseBSON(const BSONObj& source, std::string* errMsg); + virtual void clear(); + virtual std::string toString() const; + + // + // individual field accessors + // + + void setOk(bool ok); + void unsetOk(); + bool isOkSet() const; + bool getOk() const; + + void setErrCode(int errCode); + void unsetErrCode(); + bool isErrCodeSet() const; + int getErrCode() const; + + void setErrInfo(const BSONObj& errInfo); + void unsetErrInfo(); + bool isErrInfoSet() const; + const BSONObj& getErrInfo() const; + + void setErrMessage(const StringData& errMessage); + void unsetErrMessage(); + bool isErrMessageSet() const; + const std::string& getErrMessage() const; + + void setN(long long n); + void unsetN(); + bool isNSet() const; + long long getN() const; + + void setUpserted(long long upserted); + void unsetUpserted(); + bool isUpsertedSet() const; + long long getUpserted() const; + + void setLastOp(Date_t lastOp); + void unsetLastOp(); + bool isLastOpSet() const; + Date_t getLastOp() const; + + void setErrDetails(const std::vector<BatchedErrorDetail*>& errDetails); + void addToErrDetails(BatchedErrorDetail* errDetails); + void unsetErrDetails(); + bool isErrDetailsSet() const; + size_t sizeErrDetails() const; + const std::vector<BatchedErrorDetail*>& getErrDetails() const; + const BatchedErrorDetail* getErrDetailsAt(size_t pos) const; + + private: + // Convention: (M)andatory, (O)ptional + + // (M) false if batch didn't get to be applied for any reason + bool _ok; + bool _isOkSet; + + // (M) whether all items in the batch applied correctly + int _errCode; + bool _isErrCodeSet; + + // (O) further details about the error + BSONObj _errInfo; + bool _isErrInfoSet; + + // (O) whether all items in the batch applied correctly + string _errMessage; + bool _isErrMessageSet; + + // (O) number of documents affected + long long _n; + bool _isNSet; + + // (O) in updates, number of ops that were upserts + long long _upserted; + bool _isUpsertedSet; + + // (O) XXX What is lastop? + Date_t _lastOp; + bool _isLastOpSet; + + // (O) Array of item-level error information + boost::scoped_ptr<std::vector<BatchedErrorDetail*> >_errDetails; + }; + +} // namespace mongo diff --git a/src/mongo/s/batched_command_response_test.cpp b/src/mongo/s/batched_command_response_test.cpp new file mode 100644 index 00000000000..1b85a80b039 --- /dev/null +++ b/src/mongo/s/batched_command_response_test.cpp @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_command_response.h" + +#include <string> + +#include "mongo/db/jsobj.h" +#include "mongo/s/batched_error_detail.h" +#include "mongo/unittest/unittest.h" + +namespace { + + using mongo::BSONArray; + using mongo::BSONObj; + using mongo::BatchedCommandResponse; + using mongo::BatchedErrorDetail; + using mongo::Date_t; + using std::string; + + TEST(RoundTrip, Normal) { + + BSONArray errDetailsArray = + BSON_ARRAY( + BSON(BatchedErrorDetail::index(0) << + BatchedErrorDetail::errCode(-2) << + BatchedErrorDetail::errInfo(BSON("more info" << 1)) << + BatchedErrorDetail::errMessage("index 0 failed") + ) << + BSON(BatchedErrorDetail::index(1) << + BatchedErrorDetail::errCode(-3) << + BatchedErrorDetail::errInfo(BSON("more info" << 1)) << + BatchedErrorDetail::errMessage("index 1 failed too") + ) + ); + + BSONObj origResponseObj = + BSON(BatchedCommandResponse::ok(false) << + BatchedCommandResponse::errCode(-1) << + BatchedCommandResponse::errInfo(BSON("moreInfo" << 1)) << + BatchedCommandResponse::errMessage("this batch didn't work") << + BatchedCommandResponse::n(0) << + BatchedCommandResponse::upserted(false) << + BatchedCommandResponse::lastOp(Date_t(1)) << + BatchedCommandResponse::errDetails() << errDetailsArray); + + string errMsg; + BatchedCommandResponse response; + bool ok = response.parseBSON(origResponseObj, &errMsg); + ASSERT_TRUE(ok); + + BSONObj genResponseObj = response.toBSON(); + ASSERT_EQUALS(0, genResponseObj.woCompare(origResponseObj)); + } + +} // unnamed namespace diff --git a/src/mongo/s/batched_delete_document.cpp b/src/mongo/s/batched_delete_document.cpp new file mode 100644 index 00000000000..0558af5bbca --- /dev/null +++ b/src/mongo/s/batched_delete_document.cpp @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_delete_document.h" + +#include "mongo/db/field_parser.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + using mongoutils::str::stream; + const BSONField<BSONObj> BatchedDeleteDocument::query("q"); + const BSONField<int> BatchedDeleteDocument::limit("limit", 1); + + BatchedDeleteDocument::BatchedDeleteDocument() { + clear(); + } + + BatchedDeleteDocument::~BatchedDeleteDocument() { + } + + bool BatchedDeleteDocument::isValid(std::string* errMsg) const { + std::string dummy; + if (errMsg == NULL) { + errMsg = &dummy; + } + + // All the mandatory fields must be present. + if (!_isQuerySet) { + *errMsg = stream() << "missing " << query.name() << " field"; + return false; + } + + return true; + } + + BSONObj BatchedDeleteDocument::toBSON() const { + BSONObjBuilder builder; + + if (_isQuerySet) builder.append(query(), _query); + + if (_isLimitSet) builder.append(limit(), _limit); + + return builder.obj(); + } + + bool BatchedDeleteDocument::parseBSON(const BSONObj& source, string* errMsg) { + clear(); + + std::string dummy; + if (!errMsg) errMsg = &dummy; + + FieldParser::FieldState fieldState; + fieldState = FieldParser::extract(source, query, &_query, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isQuerySet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, limit, &_limit, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isLimitSet = fieldState == FieldParser::FIELD_SET; + + return true; + } + + void BatchedDeleteDocument::clear() { + _query = BSONObj(); + _isQuerySet = false; + + _limit = 0; + _isLimitSet = false; + + } + + void BatchedDeleteDocument::cloneTo(BatchedDeleteDocument* other) const { + other->clear(); + + other->_query = _query; + other->_isQuerySet = _isQuerySet; + + other->_limit = _limit; + other->_isLimitSet = _isLimitSet; + } + + std::string BatchedDeleteDocument::toString() const { + return toBSON().toString(); + } + + void BatchedDeleteDocument::setQuery(const BSONObj& query) { + _query = query.getOwned(); + _isQuerySet = true; + } + + void BatchedDeleteDocument::unsetQuery() { + _isQuerySet = false; + } + + bool BatchedDeleteDocument::isQuerySet() const { + return _isQuerySet; + } + + const BSONObj& BatchedDeleteDocument::getQuery() const { + dassert(_isQuerySet); + return _query; + } + + void BatchedDeleteDocument::setLimit(int limit) { + _limit = limit; + _isLimitSet = true; + } + + void BatchedDeleteDocument::unsetLimit() { + _isLimitSet = false; + } + + bool BatchedDeleteDocument::isLimitSet() const { + return _isLimitSet; + } + + int BatchedDeleteDocument::getLimit() const { + dassert(_isLimitSet); + return _limit; + } + + } // namespace mongo diff --git a/src/mongo/s/batched_delete_document.h b/src/mongo/s/batched_delete_document.h new file mode 100644 index 00000000000..46e1f6a6da5 --- /dev/null +++ b/src/mongo/s/batched_delete_document.h @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <string> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/db/jsobj.h" +#include "mongo/s/bson_serializable.h" + +namespace mongo { + + /** + * This class represents the layout and content of a delete document runCommand, + * in the resquest side. + */ + class BatchedDeleteDocument : public BSONSerializable { + MONGO_DISALLOW_COPYING(BatchedDeleteDocument); + public: + + // + // schema declarations + // + + static const BSONField<BSONObj> query; + static const BSONField<int> limit; + + // + // construction / destruction + // + + BatchedDeleteDocument(); + virtual ~BatchedDeleteDocument(); + + /** Copies all the fields present in 'this' to 'other'. */ + void cloneTo(BatchedDeleteDocument* other) const; + + // + // bson serializable interface implementation + // + + virtual bool isValid(std::string* errMsg) const; + virtual BSONObj toBSON() const; + virtual bool parseBSON(const BSONObj& source, std::string* errMsg); + virtual void clear(); + virtual std::string toString() const; + + // + // individual field accessors + // + + void setQuery(const BSONObj& query); + void unsetQuery(); + bool isQuerySet() const; + const BSONObj& getQuery() const; + + void setLimit(int limit); + void unsetLimit(); + bool isLimitSet() const; + int getLimit() const; + + private: + // Convention: (M)andatory, (O)ptional + + // (M) query whose result the delete will remove + BSONObj _query; + bool _isQuerySet; + + // (O) cap the number of documents to be deleted + int _limit; + bool _isLimitSet; + }; + +} // namespace mongo diff --git a/src/mongo/s/batched_delete_request.cpp b/src/mongo/s/batched_delete_request.cpp new file mode 100644 index 00000000000..bf90abd7238 --- /dev/null +++ b/src/mongo/s/batched_delete_request.cpp @@ -0,0 +1,323 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_delete_request.h" + +#include "mongo/db/field_parser.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + using mongoutils::str::stream; + + const std::string BatchedDeleteRequest::BATCHED_DELETE_REQUEST = "delete"; + const BSONField<std::string> BatchedDeleteRequest::collName("insert"); + const BSONField<std::vector<BatchedDeleteDocument*> > BatchedDeleteRequest::deletes("deletes"); + const BSONField<BSONObj> BatchedDeleteRequest::writeConcern("writeConcern"); + const BSONField<bool> BatchedDeleteRequest::continueOnError("continueOnError", false); + const BSONField<ChunkVersion> BatchedDeleteRequest::shardVersion("shardVersion"); + const BSONField<long long> BatchedDeleteRequest::session("session"); + + BatchedDeleteRequest::BatchedDeleteRequest() { + clear(); + } + + BatchedDeleteRequest::~BatchedDeleteRequest() { + unsetDeletes(); + } + + bool BatchedDeleteRequest::isValid(std::string* errMsg) const { + std::string dummy; + if (errMsg == NULL) { + errMsg = &dummy; + } + + // All the mandatory fields must be present. + if (!_isCollNameSet) { + *errMsg = stream() << "missing " << collName.name() << " field"; + return false; + } + + if (!_isDeletesSet) { + *errMsg = stream() << "missing " << deletes.name() << " field"; + return false; + } + + if (!_isWriteConcernSet) { + *errMsg = stream() << "missing " << writeConcern.name() << " field"; + return false; + } + + if (!_isContinueOnErrorSet) { + *errMsg = stream() << "missing " << continueOnError.name() << " field"; + return false; + } + + return true; + } + + BSONObj BatchedDeleteRequest::toBSON() const { + BSONObjBuilder builder; + + if (_isCollNameSet) builder.append(collName(), _collName); + + if (_isDeletesSet) { + BSONArrayBuilder deletesBuilder(builder.subarrayStart(deletes())); + for (std::vector<BatchedDeleteDocument*>::const_iterator it = _deletes.begin(); + it != _deletes.end(); + ++it) { + BSONObj deleteDocument = (*it)->toBSON(); + deletesBuilder.append(deleteDocument); + } + deletesBuilder.done(); + } + + if (_isWriteConcernSet) builder.append(writeConcern(), _writeConcern); + + if (_isContinueOnErrorSet) builder.append(continueOnError(), _continueOnError); + + if (_shardVersion.get()) { + // ChunkVersion wants to be an array. + builder.append(shardVersion(), static_cast<BSONArray>(_shardVersion->toBSON())); + } + + if (_isSessionSet) builder.append(session(), _session); + + return builder.obj(); + } + + bool BatchedDeleteRequest::parseBSON(const BSONObj& source, string* errMsg) { + clear(); + + std::string dummy; + if (!errMsg) errMsg = &dummy; + + FieldParser::FieldState fieldState; + fieldState = FieldParser::extract(source, collName, &_collName, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isCollNameSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, deletes, &_deletes, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isDeletesSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, writeConcern, &_writeConcern, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isWriteConcernSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, continueOnError, &_continueOnError, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isContinueOnErrorSet = fieldState == FieldParser::FIELD_SET; + + ChunkVersion* tempChunkVersion = NULL; + fieldState = FieldParser::extract(source, shardVersion, &tempChunkVersion, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + if (fieldState == FieldParser::FIELD_SET) _shardVersion.reset(tempChunkVersion); + + fieldState = FieldParser::extract(source, session, &_session, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isSessionSet = fieldState == FieldParser::FIELD_SET; + + return true; + } + + void BatchedDeleteRequest::clear() { + _collName.clear(); + _isCollNameSet = false; + + unsetDeletes(); + + _writeConcern = BSONObj(); + _isWriteConcernSet = false; + + _continueOnError = false; + _isContinueOnErrorSet = false; + + _shardVersion.reset(); + + _session = 0; + _isSessionSet = false; + + } + + void BatchedDeleteRequest::cloneTo(BatchedDeleteRequest* other) const { + other->clear(); + + other->_collName = _collName; + other->_isCollNameSet = _isCollNameSet; + + for(std::vector<BatchedDeleteDocument*>::const_iterator it = _deletes.begin(); + it != _deletes.end(); + ++it) { + auto_ptr<BatchedDeleteDocument> tempBatchDeleteDocument(new BatchedDeleteDocument); + (*it)->cloneTo(tempBatchDeleteDocument.get()); + other->addToDeletes(*it); + } + other->_isDeletesSet = _isDeletesSet; + + other->_writeConcern = _writeConcern; + other->_isWriteConcernSet = _isWriteConcernSet; + + other->_continueOnError = _continueOnError; + other->_isContinueOnErrorSet = _isContinueOnErrorSet; + + if (other->_shardVersion.get()) _shardVersion->cloneTo(other->_shardVersion.get()); + + other->_session = _session; + other->_isSessionSet = _isSessionSet; + } + + std::string BatchedDeleteRequest::toString() const { + return toBSON().toString(); + } + + void BatchedDeleteRequest::setCollName(const StringData& collName) { + _collName = collName.toString(); + _isCollNameSet = true; + } + + void BatchedDeleteRequest::unsetCollName() { + _isCollNameSet = false; + } + + bool BatchedDeleteRequest::isCollNameSet() const { + return _isCollNameSet; + } + + const std::string& BatchedDeleteRequest::getCollName() const { + dassert(_isCollNameSet); + return _collName; + } + + void BatchedDeleteRequest::setDeletes(const std::vector<BatchedDeleteDocument*>& deletes) { + for (std::vector<BatchedDeleteDocument*>::const_iterator it = deletes.begin(); + it != deletes.end(); + ++it) { + auto_ptr<BatchedDeleteDocument> tempBatchDeleteDocument(new BatchedDeleteDocument); + (*it)->cloneTo(tempBatchDeleteDocument.get()); + addToDeletes(tempBatchDeleteDocument.release()); + } + _isDeletesSet = deletes.size() > 0; + } + + void BatchedDeleteRequest::addToDeletes(BatchedDeleteDocument* deletes) { + _deletes.push_back(deletes); + _isDeletesSet = true; + } + + void BatchedDeleteRequest::unsetDeletes() { + for(std::vector<BatchedDeleteDocument*>::iterator it = _deletes.begin(); + it != _deletes.end(); + ++it) { + delete *it; + } + _deletes.clear(); + _isDeletesSet = false; + } + + bool BatchedDeleteRequest::isDeletesSet() const { + return _isDeletesSet; + } + + size_t BatchedDeleteRequest::sizeDeletes() const { + return _deletes.size(); + } + + const std::vector<BatchedDeleteDocument*>& BatchedDeleteRequest::getDeletes() const { + dassert(_isDeletesSet); + return _deletes; + } + + const BatchedDeleteDocument* BatchedDeleteRequest::getDeletesAt(size_t pos) const { + dassert(_isDeletesSet); + dassert(_deletes.size() > pos); + return _deletes.at(pos); + } + + void BatchedDeleteRequest::setWriteConcern(const BSONObj& writeConcern) { + _writeConcern = writeConcern.getOwned(); + _isWriteConcernSet = true; + } + + void BatchedDeleteRequest::unsetWriteConcern() { + _isWriteConcernSet = false; + } + + bool BatchedDeleteRequest::isWriteConcernSet() const { + return _isWriteConcernSet; + } + + const BSONObj& BatchedDeleteRequest::getWriteConcern() const { + dassert(_isWriteConcernSet); + return _writeConcern; + } + + void BatchedDeleteRequest::setContinueOnError(bool continueOnError) { + _continueOnError = continueOnError; + _isContinueOnErrorSet = true; + } + + void BatchedDeleteRequest::unsetContinueOnError() { + _isContinueOnErrorSet = false; + } + + bool BatchedDeleteRequest::isContinueOnErrorSet() const { + return _isContinueOnErrorSet; + } + + bool BatchedDeleteRequest::getContinueOnError() const { + dassert(_isContinueOnErrorSet); + return _continueOnError; + } + + void BatchedDeleteRequest::setShardVersion(const ChunkVersion& shardVersion) { + auto_ptr<ChunkVersion> temp(new ChunkVersion); + shardVersion.cloneTo(temp.get()); + _shardVersion.reset(temp.release()); + } + + void BatchedDeleteRequest::unsetShardVersion() { + _shardVersion.reset(); + } + + bool BatchedDeleteRequest::isShardVersionSet() const { + return _shardVersion.get() != NULL; + } + + const ChunkVersion& BatchedDeleteRequest::getShardVersion() const { + dassert(_shardVersion.get()); + return *_shardVersion; + } + + void BatchedDeleteRequest::setSession(long long session) { + _session = session; + _isSessionSet = true; + } + + void BatchedDeleteRequest::unsetSession() { + _isSessionSet = false; + } + + bool BatchedDeleteRequest::isSessionSet() const { + return _isSessionSet; + } + + long long BatchedDeleteRequest::getSession() const { + dassert(_isSessionSet); + return _session; + } + + } // namespace mongo diff --git a/src/mongo/s/batched_delete_request.h b/src/mongo/s/batched_delete_request.h new file mode 100644 index 00000000000..67f795a6690 --- /dev/null +++ b/src/mongo/s/batched_delete_request.h @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without xbeven the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <boost/scoped_ptr.hpp> +#include <string> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/db/jsobj.h" +#include "mongo/s/batched_delete_document.h" +#include "mongo/s/bson_serializable.h" +#include "mongo/s/chunk_version.h" + +namespace mongo { + + /** + * This class represents the layout and content of a batched delete runCommand, + * the request side. + */ + class BatchedDeleteRequest : public BSONSerializable { + MONGO_DISALLOW_COPYING(BatchedDeleteRequest); + public: + + // + // schema declarations + // + + // Name used for the batched delete invocation. + static const std::string BATCHED_DELETE_REQUEST; + + // Field names and types in the batched delete command type. + static const BSONField<std::string> collName; + static const BSONField<std::vector<BatchedDeleteDocument*> > deletes; + static const BSONField<BSONObj> writeConcern; + static const BSONField<bool> continueOnError; + static const BSONField<ChunkVersion> shardVersion; + static const BSONField<long long> session; + + // + // construction / destruction + // + + BatchedDeleteRequest(); + virtual ~BatchedDeleteRequest(); + + /** Copies all the fields present in 'this' to 'other'. */ + void cloneTo(BatchedDeleteRequest* other) const; + + // + // bson serializable interface implementation + // + + virtual bool isValid(std::string* errMsg) const; + virtual BSONObj toBSON() const; + virtual bool parseBSON(const BSONObj& source, std::string* errMsg); + virtual void clear(); + virtual std::string toString() const; + + // + // individual field accessors + // + + void setCollName(const StringData& collName); + void unsetCollName(); + bool isCollNameSet() const; + const std::string& getCollName() const; + + void setDeletes(const std::vector<BatchedDeleteDocument*>& deletes); + void addToDeletes(BatchedDeleteDocument* deletes); + void unsetDeletes(); + bool isDeletesSet() const; + size_t sizeDeletes() const; + const std::vector<BatchedDeleteDocument*>& getDeletes() const; + const BatchedDeleteDocument* getDeletesAt(size_t pos) const; + + void setWriteConcern(const BSONObj& writeConcern); + void unsetWriteConcern(); + bool isWriteConcernSet() const; + const BSONObj& getWriteConcern() const; + + void setContinueOnError(bool continueOnError); + void unsetContinueOnError(); + bool isContinueOnErrorSet() const; + bool getContinueOnError() const; + + void setShardVersion(const ChunkVersion& shardVersion); + void unsetShardVersion(); + bool isShardVersionSet() const; + const ChunkVersion& getShardVersion() const; + + void setSession(long long session); + void unsetSession(); + bool isSessionSet() const; + long long getSession() const; + + private: + // Convention: (M)andatory, (O)ptional + + // (M) collection we're deleting from + std::string _collName; + bool _isCollNameSet; + + // (M) array of individual deletes + std::vector<BatchedDeleteDocument*> _deletes; + bool _isDeletesSet; + + // (M) to be issued after the batch applied + BSONObj _writeConcern; + bool _isWriteConcernSet; + + // (M) whether batch is issued in parallel or not + bool _continueOnError; + bool _isContinueOnErrorSet; + + // (O) version for this collection on a given shard + boost::scoped_ptr<ChunkVersion> _shardVersion; + + // (O) session number the inserts belong to + long long _session; + bool _isSessionSet; + }; + +} // namespace mongo diff --git a/src/mongo/s/batched_delete_request_test.cpp b/src/mongo/s/batched_delete_request_test.cpp new file mode 100644 index 00000000000..c85fa45cbfb --- /dev/null +++ b/src/mongo/s/batched_delete_request_test.cpp @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_delete_request.h" + +#include <string> + +#include "mongo/db/jsobj.h" +#include "mongo/s/batched_delete_document.h" +#include "mongo/unittest/unittest.h" + +namespace { + + using mongo::BSONArray; + using mongo::BSONObj; + using mongo::BatchedDeleteRequest; + using mongo::BatchedDeleteDocument; + using mongo::BSONArrayBuilder; + using mongo::OID; + using mongo::OpTime; + using std::string; + + + TEST(RoundTrip, Normal) { + BSONArray deleteArray = + BSON_ARRAY( + BSON(BatchedDeleteDocument::query(BSON("a" << 1)) << + BatchedDeleteDocument::limit(1) + ) << + BSON(BatchedDeleteDocument::query(BSON("b" << 1)) << + BatchedDeleteDocument::limit(1) + ) + ); + + BSONObj writeConcernObj = BSON("w" << 1); + + // The BSON_ARRAY macro doesn't support Timestamps. + BSONArrayBuilder arrBuilder; + arrBuilder.appendTimestamp(OpTime(1,1).asDate()); + arrBuilder.append(OID::gen()); + BSONArray shardVersionArray = arrBuilder.arr(); + + BSONObj origDeleteRequestObj = + BSON(BatchedDeleteRequest::collName("test") << + BatchedDeleteRequest::deletes() << deleteArray << + BatchedDeleteRequest::writeConcern(writeConcernObj) << + BatchedDeleteRequest::continueOnError(false) << + BatchedDeleteRequest::shardVersion() << shardVersionArray << + BatchedDeleteRequest::session(0)); + + string errMsg; + BatchedDeleteRequest request; + bool ok = request.parseBSON(origDeleteRequestObj, &errMsg); + ASSERT_TRUE(ok); + + BSONObj genDeleteRequestObj = request.toBSON(); + ASSERT_EQUALS(0, genDeleteRequestObj.woCompare(origDeleteRequestObj)); + } + +} // unnamed namespace diff --git a/src/mongo/s/batched_error_detail.cpp b/src/mongo/s/batched_error_detail.cpp new file mode 100644 index 00000000000..117f63868c5 --- /dev/null +++ b/src/mongo/s/batched_error_detail.cpp @@ -0,0 +1,204 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_error_detail.h" + +#include "mongo/db/field_parser.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + using mongoutils::str::stream; + const BSONField<int> BatchedErrorDetail::index("index"); + const BSONField<int> BatchedErrorDetail::errCode("errCode"); + const BSONField<BSONObj> BatchedErrorDetail::errInfo("errInfo"); + const BSONField<std::string> BatchedErrorDetail::errMessage("errMessage"); + + BatchedErrorDetail::BatchedErrorDetail() { + clear(); + } + + BatchedErrorDetail::~BatchedErrorDetail() { + } + + bool BatchedErrorDetail::isValid(std::string* errMsg) const { + std::string dummy; + if (errMsg == NULL) { + errMsg = &dummy; + } + + // All the mandatory fields must be present. + if (!_isIndexSet) { + *errMsg = stream() << "missing " << index.name() << " field"; + return false; + } + + if (!_isErrCodeSet) { + *errMsg = stream() << "missing " << errCode.name() << " field"; + return false; + } + + return true; + } + + BSONObj BatchedErrorDetail::toBSON() const { + BSONObjBuilder builder; + + if (_isIndexSet) builder.append(index(), _index); + + if (_isErrCodeSet) builder.append(errCode(), _errCode); + + if (_isErrInfoSet) builder.append(errInfo(), _errInfo); + + if (_isErrMessageSet) builder.append(errMessage(), _errMessage); + + return builder.obj(); + } + + bool BatchedErrorDetail::parseBSON(const BSONObj& source, string* errMsg) { + clear(); + + std::string dummy; + if (!errMsg) errMsg = &dummy; + + FieldParser::FieldState fieldState; + fieldState = FieldParser::extract(source, index, &_index, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isIndexSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, errCode, &_errCode, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isErrCodeSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, errInfo, &_errInfo, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isErrInfoSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, errMessage, &_errMessage, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isErrMessageSet = fieldState == FieldParser::FIELD_SET; + + return true; + } + + void BatchedErrorDetail::clear() { + _index = 0; + _isIndexSet = false; + + _errCode = 0; + _isErrCodeSet = false; + + _errInfo = BSONObj(); + _isErrInfoSet = false; + + _errMessage.clear(); + _isErrMessageSet = false; + + } + + void BatchedErrorDetail::cloneTo(BatchedErrorDetail* other) const { + other->clear(); + + other->_index = _index; + other->_isIndexSet = _isIndexSet; + + other->_errCode = _errCode; + other->_isErrCodeSet = _isErrCodeSet; + + other->_errInfo = _errInfo; + other->_isErrInfoSet = _isErrInfoSet; + + other->_errMessage = _errMessage; + other->_isErrMessageSet = _isErrMessageSet; + } + + std::string BatchedErrorDetail::toString() const { + return "implement me"; + } + + void BatchedErrorDetail::setIndex(int index) { + _index = index; + _isIndexSet = true; + } + + void BatchedErrorDetail::unsetIndex() { + _isIndexSet = false; + } + + bool BatchedErrorDetail::isIndexSet() const { + return _isIndexSet; + } + + int BatchedErrorDetail::getIndex() const { + dassert(_isIndexSet); + return _index; + } + + void BatchedErrorDetail::setErrCode(int errCode) { + _errCode = errCode; + _isErrCodeSet = true; + } + + void BatchedErrorDetail::unsetErrCode() { + _isErrCodeSet = false; + } + + bool BatchedErrorDetail::isErrCodeSet() const { + return _isErrCodeSet; + } + + int BatchedErrorDetail::getErrCode() const { + dassert(_isErrCodeSet); + return _errCode; + } + + void BatchedErrorDetail::setErrInfo(const BSONObj& errInfo) { + _errInfo = errInfo.getOwned(); + _isErrInfoSet = true; + } + + void BatchedErrorDetail::unsetErrInfo() { + _isErrInfoSet = false; + } + + bool BatchedErrorDetail::isErrInfoSet() const { + return _isErrInfoSet; + } + + const BSONObj& BatchedErrorDetail::getErrInfo() const { + dassert(_isErrInfoSet); + return _errInfo; + } + + void BatchedErrorDetail::setErrMessage(const StringData& errMessage) { + _errMessage = errMessage.toString(); + _isErrMessageSet = true; + } + + void BatchedErrorDetail::unsetErrMessage() { + _isErrMessageSet = false; + } + + bool BatchedErrorDetail::isErrMessageSet() const { + return _isErrMessageSet; + } + + const std::string& BatchedErrorDetail::getErrMessage() const { + dassert(_isErrMessageSet); + return _errMessage; + } + + } // namespace mongo diff --git a/src/mongo/s/batched_error_detail.h b/src/mongo/s/batched_error_detail.h new file mode 100644 index 00000000000..203d0c314d1 --- /dev/null +++ b/src/mongo/s/batched_error_detail.h @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <string> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/db/jsobj.h" +#include "mongo/s/bson_serializable.h" + +namespace mongo { + + /** + * This class represents the layout and content of a insert/update/delete runCommand, + * the response side. + */ + class BatchedErrorDetail : public BSONSerializable { + MONGO_DISALLOW_COPYING(BatchedErrorDetail); + public: + + // + // schema declarations + // + + static const BSONField<int> index; + static const BSONField<int> errCode; + static const BSONField<BSONObj> errInfo; + static const BSONField<std::string> errMessage; + + // + // construction / destruction + // + + BatchedErrorDetail(); + virtual ~BatchedErrorDetail(); + + /** Copies all the fields present in 'this' to 'other'. */ + void cloneTo(BatchedErrorDetail* other) const; + + // + // bson serializable interface implementation + // + + virtual bool isValid(std::string* errMsg) const; + virtual BSONObj toBSON() const; + virtual bool parseBSON(const BSONObj& source, std::string* errMsg); + virtual void clear(); + virtual std::string toString() const; + + // + // individual field accessors + // + + void setIndex(int index); + void unsetIndex(); + bool isIndexSet() const; + int getIndex() const; + + void setErrCode(int errCode); + void unsetErrCode(); + bool isErrCodeSet() const; + int getErrCode() const; + + void setErrInfo(const BSONObj& errInfo); + void unsetErrInfo(); + bool isErrInfoSet() const; + const BSONObj& getErrInfo() const; + + void setErrMessage(const StringData& errMessage); + void unsetErrMessage(); + bool isErrMessageSet() const; + const std::string& getErrMessage() const; + + private: + // Convention: (M)andatory, (O)ptional + + // (M) number of the batch item the error refers to + int _index; + bool _isIndexSet; + + // (M) whether all items in the batch applied correctly + int _errCode; + bool _isErrCodeSet; + + // (O) further details about the batch item error + BSONObj _errInfo; + bool _isErrInfoSet; + + // (O) user readable explanation about the batch item error + std::string _errMessage; + bool _isErrMessageSet; + }; + +} // namespace mongo diff --git a/src/mongo/s/batched_insert_request.cpp b/src/mongo/s/batched_insert_request.cpp new file mode 100644 index 00000000000..1485d6015b5 --- /dev/null +++ b/src/mongo/s/batched_insert_request.cpp @@ -0,0 +1,313 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_insert_request.h" + +#include "mongo/db/field_parser.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + using mongoutils::str::stream; + + const std::string BatchedInsertRequest::BATCHED_INSERT_REQUEST = "insert"; + const BSONField<std::string> BatchedInsertRequest::collName("insert"); + const BSONField<std::vector<BSONObj> > BatchedInsertRequest::documents("documents"); + const BSONField<BSONObj> BatchedInsertRequest::writeConcern("writeConcern"); + const BSONField<bool> BatchedInsertRequest::continueOnError("continueOnError", false); + const BSONField<ChunkVersion> BatchedInsertRequest::shardVersion("shardVersion"); + const BSONField<long long> BatchedInsertRequest::session("session"); + + BatchedInsertRequest::BatchedInsertRequest() { + clear(); + } + + BatchedInsertRequest::~BatchedInsertRequest() { + } + + bool BatchedInsertRequest::isValid(std::string* errMsg) const { + std::string dummy; + if (errMsg == NULL) { + errMsg = &dummy; + } + + // All the mandatory fields must be present. + if (!_isCollNameSet) { + *errMsg = stream() << "missing " << collName.name() << " field"; + return false; + } + + if (!_isDocumentsSet) { + *errMsg = stream() << "missing " << documents.name() << " field"; + return false; + } + + if (!_isWriteConcernSet) { + *errMsg = stream() << "missing " << writeConcern.name() << " field"; + return false; + } + + if (!_isContinueOnErrorSet) { + *errMsg = stream() << "missing " << continueOnError.name() << " field"; + return false; + } + + return true; + } + + BSONObj BatchedInsertRequest::toBSON() const { + BSONObjBuilder builder; + + if (_isCollNameSet) builder.append(collName(), _collName); + + if (_isDocumentsSet) { + BSONArrayBuilder documentsBuilder(builder.subarrayStart(documents())); + for (std::vector<BSONObj>::const_iterator it = _documents.begin(); + it != _documents.end(); + ++it) { + documentsBuilder.append(*it); + } + documentsBuilder.done(); + } + + if (_isWriteConcernSet) builder.append(writeConcern(), _writeConcern); + + if (_isContinueOnErrorSet) builder.append(continueOnError(), _continueOnError); + + if (_shardVersion.get()) { + // ChunkVersion wants to be an array. + builder.append(shardVersion(), static_cast<BSONArray>(_shardVersion->toBSON())); + } + + if (_isSessionSet) builder.append(session(), _session); + + return builder.obj(); + } + + bool BatchedInsertRequest::parseBSON(const BSONObj& source, string* errMsg) { + clear(); + + std::string dummy; + if (!errMsg) errMsg = &dummy; + + FieldParser::FieldState fieldState; + fieldState = FieldParser::extract(source, collName, &_collName, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isCollNameSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, documents, &_documents, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isDocumentsSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, writeConcern, &_writeConcern, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isWriteConcernSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, continueOnError, &_continueOnError, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isContinueOnErrorSet = fieldState == FieldParser::FIELD_SET; + + ChunkVersion* tempChunkVersion = NULL; + fieldState = FieldParser::extract(source, shardVersion, &tempChunkVersion, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + if (fieldState == FieldParser::FIELD_SET) _shardVersion.reset(tempChunkVersion); + + fieldState = FieldParser::extract(source, session, &_session, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isSessionSet = fieldState == FieldParser::FIELD_SET; + + return true; + } + + void BatchedInsertRequest::clear() { + _collName.clear(); + _isCollNameSet = false; + + _documents.clear(); + _isDocumentsSet =false; + + _writeConcern = BSONObj(); + _isWriteConcernSet = false; + + _continueOnError = false; + _isContinueOnErrorSet = false; + + _shardVersion.reset(); + + _session = 0; + _isSessionSet = false; + + } + + void BatchedInsertRequest::cloneTo(BatchedInsertRequest* other) const { + other->clear(); + + other->_collName = _collName; + other->_isCollNameSet = _isCollNameSet; + + for(std::vector<BSONObj>::const_iterator it = _documents.begin(); + it != _documents.end(); + ++it) { + other->addToDocuments(*it); + } + other->_isDocumentsSet = _isDocumentsSet; + + other->_writeConcern = _writeConcern; + other->_isWriteConcernSet = _isWriteConcernSet; + + other->_continueOnError = _continueOnError; + other->_isContinueOnErrorSet = _isContinueOnErrorSet; + + if (other->_shardVersion.get()) _shardVersion->cloneTo(other->_shardVersion.get()); + + other->_session = _session; + other->_isSessionSet = _isSessionSet; + } + + std::string BatchedInsertRequest::toString() const { + return toBSON().toString(); + } + + void BatchedInsertRequest::setCollName(const StringData& collName) { + _collName = collName.toString(); + _isCollNameSet = true; + } + + void BatchedInsertRequest::unsetCollName() { + _isCollNameSet = false; + } + + bool BatchedInsertRequest::isCollNameSet() const { + return _isCollNameSet; + } + + const std::string& BatchedInsertRequest::getCollName() const { + dassert(_isCollNameSet); + return _collName; + } + + void BatchedInsertRequest::setDocuments(const std::vector<BSONObj>& documents) { + for (std::vector<BSONObj>::const_iterator it = documents.begin(); + it != documents.end(); + ++it) { + addToDocuments((*it).getOwned()); + } + _isDocumentsSet = documents.size() > 0; + } + + void BatchedInsertRequest::addToDocuments(const BSONObj& documents) { + _documents.push_back(documents); + _isDocumentsSet = true; + } + + void BatchedInsertRequest::unsetDocuments() { + _documents.clear(); + _isDocumentsSet = false; + } + + bool BatchedInsertRequest::isDocumentsSet() const { + return _isDocumentsSet; + } + + size_t BatchedInsertRequest::sizeDocuments() const { + return _documents.size(); + } + + const std::vector<BSONObj>& BatchedInsertRequest::getDocuments() const { + dassert(_isDocumentsSet); + return _documents; + } + + const BSONObj& BatchedInsertRequest::getDocumentsAt(size_t pos) const { + dassert(_isDocumentsSet); + dassert(_documents.size() > pos); + return _documents.at(pos); + } + + void BatchedInsertRequest::setWriteConcern(const BSONObj& writeConcern) { + _writeConcern = writeConcern.getOwned(); + _isWriteConcernSet = true; + } + + void BatchedInsertRequest::unsetWriteConcern() { + _isWriteConcernSet = false; + } + + bool BatchedInsertRequest::isWriteConcernSet() const { + return _isWriteConcernSet; + } + + const BSONObj& BatchedInsertRequest::getWriteConcern() const { + dassert(_isWriteConcernSet); + return _writeConcern; + } + + void BatchedInsertRequest::setContinueOnError(bool continueOnError) { + _continueOnError = continueOnError; + _isContinueOnErrorSet = true; + } + + void BatchedInsertRequest::unsetContinueOnError() { + _isContinueOnErrorSet = false; + } + + bool BatchedInsertRequest::isContinueOnErrorSet() const { + return _isContinueOnErrorSet; + } + + bool BatchedInsertRequest::getContinueOnError() const { + dassert(_isContinueOnErrorSet); + return _continueOnError; + } + + void BatchedInsertRequest::setShardVersion(const ChunkVersion& shardVersion) { + auto_ptr<ChunkVersion> temp(new ChunkVersion); + shardVersion.cloneTo(temp.get()); + _shardVersion.reset(temp.release()); + } + + void BatchedInsertRequest::unsetShardVersion() { + _shardVersion.reset(); + } + + bool BatchedInsertRequest::isShardVersionSet() const { + return _shardVersion.get() != NULL; + } + + const ChunkVersion& BatchedInsertRequest::getShardVersion() const { + dassert(_shardVersion.get()); + return *_shardVersion; + } + + void BatchedInsertRequest::setSession(long long session) { + _session = session; + _isSessionSet = true; + } + + void BatchedInsertRequest::unsetSession() { + _isSessionSet = false; + } + + bool BatchedInsertRequest::isSessionSet() const { + return _isSessionSet; + } + + long long BatchedInsertRequest::getSession() const { + dassert(_isSessionSet); + return _session; + } + + } // namespace mongo diff --git a/src/mongo/s/batched_insert_request.h b/src/mongo/s/batched_insert_request.h new file mode 100644 index 00000000000..cba507a9901 --- /dev/null +++ b/src/mongo/s/batched_insert_request.h @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <boost/scoped_ptr.hpp> +#include <string> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/db/jsobj.h" +#include "mongo/s/bson_serializable.h" +#include "mongo/s/chunk_version.h" + +namespace mongo { + + /** + * This class represents the layout and content of a batched insert runCommand, + * the request side. + */ + class BatchedInsertRequest : public BSONSerializable { + MONGO_DISALLOW_COPYING(BatchedInsertRequest); + public: + + // + // schema declarations + // + + // Name used for the batched insert invocation. + static const std::string BATCHED_INSERT_REQUEST; + + // Field names and types in the batched insert command type. + static const BSONField<std::string> collName; + static const BSONField<std::vector<BSONObj> > documents; + static const BSONField<BSONObj> writeConcern; + static const BSONField<bool> continueOnError; + static const BSONField<ChunkVersion> shardVersion; + static const BSONField<long long> session; + + // + // construction / destruction + // + + BatchedInsertRequest(); + virtual ~BatchedInsertRequest(); + + /** Copies all the fields present in 'this' to 'other'. */ + void cloneTo(BatchedInsertRequest* other) const; + + // + // bson serializable interface implementation + // + + virtual bool isValid(std::string* errMsg) const; + virtual BSONObj toBSON() const; + virtual bool parseBSON(const BSONObj& source, std::string* errMsg); + virtual void clear(); + virtual std::string toString() const; + + // + // individual field accessors + // + + void setCollName(const StringData& collName); + void unsetCollName(); + bool isCollNameSet() const; + const std::string& getCollName() const; + + void setDocuments(const std::vector<BSONObj>& documents); + void addToDocuments(const BSONObj& documents); + void unsetDocuments(); + bool isDocumentsSet() const; + size_t sizeDocuments() const; + const std::vector<BSONObj>& getDocuments() const; + const BSONObj& getDocumentsAt(size_t pos) const; + + void setWriteConcern(const BSONObj& writeConcern); + void unsetWriteConcern(); + bool isWriteConcernSet() const; + const BSONObj& getWriteConcern() const; + + void setContinueOnError(bool continueOnError); + void unsetContinueOnError(); + bool isContinueOnErrorSet() const; + bool getContinueOnError() const; + + void setShardVersion(const ChunkVersion& shardVersion); + void unsetShardVersion(); + bool isShardVersionSet() const; + const ChunkVersion& getShardVersion() const; + + void setSession(long long session); + void unsetSession(); + bool isSessionSet() const; + long long getSession() const; + + private: + // Convention: (M)andatory, (O)ptional + + // (M) collection we're inserting on + std::string _collName; + bool _isCollNameSet; + + // (M) array of documents to be inserted + std::vector<BSONObj> _documents; + bool _isDocumentsSet; + + // (M) to be issued after the batch applied + BSONObj _writeConcern; + bool _isWriteConcernSet; + + // (M) whether batch is issued in parallel or not + bool _continueOnError; + bool _isContinueOnErrorSet; + + // (O) version for this collection on a given shard + boost::scoped_ptr<ChunkVersion> _shardVersion; + + // (O) session number the inserts belong to + long long _session; + bool _isSessionSet; + }; + +} // namespace mongo diff --git a/src/mongo/s/batched_insert_request_test.cpp b/src/mongo/s/batched_insert_request_test.cpp new file mode 100644 index 00000000000..fea8ee67f12 --- /dev/null +++ b/src/mongo/s/batched_insert_request_test.cpp @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_insert_request.h" + +#include <string> + +#include "mongo/db/jsobj.h" +#include "mongo/unittest/unittest.h" + +namespace { + + using mongo::BSONArray; + using mongo::BSONObj; + using mongo::BatchedInsertRequest; + using mongo::BSONArrayBuilder; + using mongo::OID; + using mongo::OpTime; + using std::string; + + + TEST(RoundTrip, Normal) { + BSONArray insertArray = BSON_ARRAY(BSON("a" << 1) << BSON("b" << 1)); + + BSONObj writeConcernObj = BSON("w" << 1); + + // The BSON_ARRAY macro doesn't support Timestamps. + BSONArrayBuilder arrBuilder; + arrBuilder.appendTimestamp(OpTime(1,1).asDate()); + arrBuilder.append(OID::gen()); + BSONArray shardVersionArray = arrBuilder.arr(); + + BSONObj origInsertRequestObj = + BSON(BatchedInsertRequest::collName("test") << + BatchedInsertRequest::documents() << insertArray << + BatchedInsertRequest::writeConcern(writeConcernObj) << + BatchedInsertRequest::continueOnError(false) << + BatchedInsertRequest::shardVersion() << shardVersionArray << + BatchedInsertRequest::session(0)); + + string errMsg; + BatchedInsertRequest request; + bool ok = request.parseBSON(origInsertRequestObj, &errMsg); + ASSERT_TRUE(ok); + + BSONObj genInsertRequestObj = request.toBSON(); + ASSERT_EQUALS(0, genInsertRequestObj.woCompare(origInsertRequestObj)); + } + +} // unnamed namespace diff --git a/src/mongo/s/batched_update_document.cpp b/src/mongo/s/batched_update_document.cpp new file mode 100644 index 00000000000..d59cb687d87 --- /dev/null +++ b/src/mongo/s/batched_update_document.cpp @@ -0,0 +1,205 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_update_document.h" + +#include "mongo/db/field_parser.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + using mongoutils::str::stream; + + const BSONField<BSONObj> BatchedUpdateDocument::query("q"); + const BSONField<BSONObj> BatchedUpdateDocument::updateExpr("u"); + const BSONField<bool> BatchedUpdateDocument::multi("multi"); + const BSONField<bool> BatchedUpdateDocument::upsert("upsert"); + + BatchedUpdateDocument::BatchedUpdateDocument() { + clear(); + } + + BatchedUpdateDocument::~BatchedUpdateDocument() { + } + + bool BatchedUpdateDocument::isValid(std::string* errMsg) const { + std::string dummy; + if (errMsg == NULL) { + errMsg = &dummy; + } + + // All the mandatory fields must be present. + if (!_isQuerySet) { + *errMsg = stream() << "missing " << query.name() << " field"; + return false; + } + + if (!_isUpdateExprSet) { + *errMsg = stream() << "missing " << updateExpr.name() << " field"; + return false; + } + + return true; + } + + BSONObj BatchedUpdateDocument::toBSON() const { + BSONObjBuilder builder; + + if (_isQuerySet) builder.append(query(), _query); + + if (_isUpdateExprSet) builder.append(updateExpr(), _updateExpr); + + if (_isMultiSet) builder.append(multi(), _multi); + + if (_isUpsertSet) builder.append(upsert(), _upsert); + + return builder.obj(); + } + + bool BatchedUpdateDocument::parseBSON(const BSONObj& source, string* errMsg) { + clear(); + + std::string dummy; + if (!errMsg) errMsg = &dummy; + + FieldParser::FieldState fieldState; + fieldState = FieldParser::extract(source, query, &_query, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isQuerySet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, updateExpr, &_updateExpr, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isUpdateExprSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, multi, &_multi, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isMultiSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, upsert, &_upsert, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isUpsertSet = fieldState == FieldParser::FIELD_SET; + + return true; + } + + void BatchedUpdateDocument::clear() { + _query = BSONObj(); + _isQuerySet = false; + + _updateExpr = BSONObj(); + _isUpdateExprSet = false; + + _multi = false; + _isMultiSet = false; + + _upsert = false; + _isUpsertSet = false; + + } + + void BatchedUpdateDocument::cloneTo(BatchedUpdateDocument* other) const { + other->clear(); + + other->_query = _query; + other->_isQuerySet = _isQuerySet; + + other->_updateExpr = _updateExpr; + other->_isUpdateExprSet = _isUpdateExprSet; + + other->_multi = _multi; + other->_isMultiSet = _isMultiSet; + + other->_upsert = _upsert; + other->_isUpsertSet = _isUpsertSet; + } + + std::string BatchedUpdateDocument::toString() const { + return toBSON().toString(); + } + + void BatchedUpdateDocument::setQuery(const BSONObj& query) { + _query = query.getOwned(); + _isQuerySet = true; + } + + void BatchedUpdateDocument::unsetQuery() { + _isQuerySet = false; + } + + bool BatchedUpdateDocument::isQuerySet() const { + return _isQuerySet; + } + + const BSONObj& BatchedUpdateDocument::getQuery() const { + dassert(_isQuerySet); + return _query; + } + + void BatchedUpdateDocument::setUpdateExpr(const BSONObj& updateExpr) { + _updateExpr = updateExpr.getOwned(); + _isUpdateExprSet = true; + } + + void BatchedUpdateDocument::unsetUpdateExpr() { + _isUpdateExprSet = false; + } + + bool BatchedUpdateDocument::isUpdateExprSet() const { + return _isUpdateExprSet; + } + + const BSONObj& BatchedUpdateDocument::getUpdateExpr() const { + dassert(_isUpdateExprSet); + return _updateExpr; + } + + void BatchedUpdateDocument::setMulti(bool multi) { + _multi = multi; + _isMultiSet = true; + } + + void BatchedUpdateDocument::unsetMulti() { + _isMultiSet = false; + } + + bool BatchedUpdateDocument::isMultiSet() const { + return _isMultiSet; + } + + bool BatchedUpdateDocument::getMulti() const { + dassert(_isMultiSet); + return _multi; + } + + void BatchedUpdateDocument::setUpsert(bool upsert) { + _upsert = upsert; + _isUpsertSet = true; + } + + void BatchedUpdateDocument::unsetUpsert() { + _isUpsertSet = false; + } + + bool BatchedUpdateDocument::isUpsertSet() const { + return _isUpsertSet; + } + + bool BatchedUpdateDocument::getUpsert() const { + dassert(_isUpsertSet); + return _upsert; + } + + } // namespace mongo diff --git a/src/mongo/s/batched_update_document.h b/src/mongo/s/batched_update_document.h new file mode 100644 index 00000000000..6b2de655c3d --- /dev/null +++ b/src/mongo/s/batched_update_document.h @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <string> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/db/jsobj.h" +#include "mongo/s/bson_serializable.h" + +namespace mongo { + + /** + * This class represents the layout and content of a update document runCommand, + * in the request side. + */ + class BatchedUpdateDocument : public BSONSerializable { + MONGO_DISALLOW_COPYING(BatchedUpdateDocument); + public: + + // + // schema declarations + // + + static const BSONField<BSONObj> query; + static const BSONField<BSONObj> updateExpr; + static const BSONField<bool> multi; + static const BSONField<bool> upsert; + + // + // construction / destruction + // + + BatchedUpdateDocument(); + virtual ~BatchedUpdateDocument(); + + /** Copies all the fields present in 'this' to 'other'. */ + void cloneTo(BatchedUpdateDocument* other) const; + + // + // bson serializable interface implementation + // + + virtual bool isValid(std::string* errMsg) const; + virtual BSONObj toBSON() const; + virtual bool parseBSON(const BSONObj& source, std::string* errMsg); + virtual void clear(); + virtual std::string toString() const; + + // + // individual field accessors + // + + void setQuery(const BSONObj& query); + void unsetQuery(); + bool isQuerySet() const; + const BSONObj& getQuery() const; + + void setUpdateExpr(const BSONObj& updateExpr); + void unsetUpdateExpr(); + bool isUpdateExprSet() const; + const BSONObj& getUpdateExpr() const; + + void setMulti(bool multi); + void unsetMulti(); + bool isMultiSet() const; + bool getMulti() const; + + void setUpsert(bool upsert); + void unsetUpsert(); + bool isUpsertSet() const; + bool getUpsert() const; + + private: + // Convention: (M)andatory, (O)ptional + + // (M) query whose result the update will manipulate + BSONObj _query; + bool _isQuerySet; + + // (M) the update expression itself + BSONObj _updateExpr; + bool _isUpdateExprSet; + + // (O) whether multiple documents are to be updated + bool _multi; + bool _isMultiSet; + + // (O) whether upserts are allowed + bool _upsert; + bool _isUpsertSet; + }; + +} // namespace mongo diff --git a/src/mongo/s/batched_update_request.cpp b/src/mongo/s/batched_update_request.cpp new file mode 100644 index 00000000000..7546df897ab --- /dev/null +++ b/src/mongo/s/batched_update_request.cpp @@ -0,0 +1,321 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_update_request.h" + +#include "mongo/db/field_parser.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + using mongoutils::str::stream; + + const std::string BatchedUpdateRequest::BATCHED_UPDATE_REQUEST = "update"; + const BSONField<std::string> BatchedUpdateRequest::collName("insert"); + const BSONField<std::vector<BatchedUpdateDocument*> > BatchedUpdateRequest::updates("updates"); + const BSONField<BSONObj> BatchedUpdateRequest::writeConcern("writeConcern"); + const BSONField<bool> BatchedUpdateRequest::continueOnError("continueOnError", false); + const BSONField<ChunkVersion> BatchedUpdateRequest::shardVersion("shardVersion"); + const BSONField<long long> BatchedUpdateRequest::session("session"); + + BatchedUpdateRequest::BatchedUpdateRequest() { + clear(); + } + + BatchedUpdateRequest::~BatchedUpdateRequest() { + unsetUpdates(); + } + + bool BatchedUpdateRequest::isValid(std::string* errMsg) const { + std::string dummy; + if (errMsg == NULL) { + errMsg = &dummy; + } + + // All the mandatory fields must be present. + if (!_isCollNameSet) { + *errMsg = stream() << "missing " << collName.name() << " field"; + return false; + } + if (!_isUpdatesSet) { + *errMsg = stream() << "missing " << updates.name() << " field"; + return false; + } + if (!_isWriteConcernSet) { + *errMsg = stream() << "missing " << writeConcern.name() << " field"; + return false; + } + if (!_isContinueOnErrorSet) { + *errMsg = stream() << "missing " << continueOnError.name() << " field"; + return false; + } + + return true; + } + + BSONObj BatchedUpdateRequest::toBSON() const { + BSONObjBuilder builder; + + if (_isCollNameSet) builder.append(collName(), _collName); + + if (_isUpdatesSet) { + BSONArrayBuilder updatesBuilder(builder.subarrayStart(updates())); + for (std::vector<BatchedUpdateDocument*>::const_iterator it = _updates.begin(); + it != _updates.end(); + ++it) { + BSONObj updateDocument = (*it)->toBSON(); + updatesBuilder.append(updateDocument); + } + updatesBuilder.done(); + } + + if (_isWriteConcernSet) builder.append(writeConcern(), _writeConcern); + + if (_isContinueOnErrorSet) builder.append(continueOnError(), _continueOnError); + + if (_shardVersion.get()) { + // ChunkVersion wants to be an array. + builder.append(shardVersion(), static_cast<BSONArray>(_shardVersion->toBSON())); + } + + if (_isSessionSet) builder.append(session(), _session); + + return builder.obj(); + } + + bool BatchedUpdateRequest::parseBSON(const BSONObj& source, string* errMsg) { + clear(); + + std::string dummy; + if (!errMsg) errMsg = &dummy; + + FieldParser::FieldState fieldState; + fieldState = FieldParser::extract(source, collName, &_collName, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isCollNameSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, updates, &_updates, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isUpdatesSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, writeConcern, &_writeConcern, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isWriteConcernSet = fieldState == FieldParser::FIELD_SET; + + fieldState = FieldParser::extract(source, continueOnError, &_continueOnError, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isContinueOnErrorSet = fieldState == FieldParser::FIELD_SET; + + ChunkVersion* tempChunkVersion = NULL; + fieldState = FieldParser::extract(source, shardVersion, &tempChunkVersion, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + if (fieldState == FieldParser::FIELD_SET) _shardVersion.reset(tempChunkVersion); + + fieldState = FieldParser::extract(source, session, &_session, errMsg); + if (fieldState == FieldParser::FIELD_INVALID) return false; + _isSessionSet = fieldState == FieldParser::FIELD_SET; + + return true; + } + + void BatchedUpdateRequest::clear() { + _collName.clear(); + _isCollNameSet = false; + + unsetUpdates(); + + _writeConcern = BSONObj(); + _isWriteConcernSet = false; + + _continueOnError = false; + _isContinueOnErrorSet = false; + + _shardVersion.reset(); + + _session = 0; + _isSessionSet = false; + + } + + void BatchedUpdateRequest::cloneTo(BatchedUpdateRequest* other) const { + other->clear(); + + other->_collName = _collName; + other->_isCollNameSet = _isCollNameSet; + + for(std::vector<BatchedUpdateDocument*>::const_iterator it = _updates.begin(); + it != _updates.end(); + ++it) { + auto_ptr<BatchedUpdateDocument> tempBatchUpdateDocument(new BatchedUpdateDocument); + (*it)->cloneTo(tempBatchUpdateDocument.get()); + other->addToUpdates(*it); + } + other->_isUpdatesSet = _isUpdatesSet; + + other->_writeConcern = _writeConcern; + other->_isWriteConcernSet = _isWriteConcernSet; + + other->_continueOnError = _continueOnError; + other->_isContinueOnErrorSet = _isContinueOnErrorSet; + + if (other->_shardVersion.get()) _shardVersion->cloneTo(other->_shardVersion.get()); + + other->_session = _session; + other->_isSessionSet = _isSessionSet; + } + + std::string BatchedUpdateRequest::toString() const { + return toBSON().toString(); + } + + void BatchedUpdateRequest::setCollName(const StringData& collName) { + _collName = collName.toString(); + _isCollNameSet = true; + } + + void BatchedUpdateRequest::unsetCollName() { + _isCollNameSet = false; + } + + bool BatchedUpdateRequest::isCollNameSet() const { + return _isCollNameSet; + } + + const std::string& BatchedUpdateRequest::getCollName() const { + dassert(_isCollNameSet); + return _collName; + } + + void BatchedUpdateRequest::setUpdates(const std::vector<BatchedUpdateDocument*>& updates) { + unsetUpdates(); + for (std::vector<BatchedUpdateDocument*>::const_iterator it = updates.begin(); + it != updates.end(); + ++it) { + auto_ptr<BatchedUpdateDocument> tempBatchUpdateDocument(new BatchedUpdateDocument); + (*it)->cloneTo(tempBatchUpdateDocument.get()); + addToUpdates(tempBatchUpdateDocument.release()); + } + _isUpdatesSet = updates.size() > 0; + } + + void BatchedUpdateRequest::addToUpdates(BatchedUpdateDocument* updates) { + _updates.push_back(updates); + _isUpdatesSet = true; + } + + void BatchedUpdateRequest::unsetUpdates() { + for(std::vector<BatchedUpdateDocument*>::iterator it = _updates.begin(); + it != _updates.end(); + ++it) { + delete *it; + } + _updates.clear(); + _isUpdatesSet = false; + } + + bool BatchedUpdateRequest::isUpdatesSet() const { + return _isUpdatesSet; + } + + size_t BatchedUpdateRequest::sizeUpdates() const { + return _updates.size(); + } + + const std::vector<BatchedUpdateDocument*>& BatchedUpdateRequest::getUpdates() const { + dassert(_isUpdatesSet); + return _updates; + } + + const BatchedUpdateDocument* BatchedUpdateRequest::getUpdatesAt(size_t pos) const { + dassert(_isUpdatesSet); + dassert(_updates.size() > pos); + return _updates.at(pos); + } + + void BatchedUpdateRequest::setWriteConcern(const BSONObj& writeConcern) { + _writeConcern = writeConcern.getOwned(); + _isWriteConcernSet = true; + } + + void BatchedUpdateRequest::unsetWriteConcern() { + _isWriteConcernSet = false; + } + + bool BatchedUpdateRequest::isWriteConcernSet() const { + return _isWriteConcernSet; + } + + const BSONObj& BatchedUpdateRequest::getWriteConcern() const { + dassert(_isWriteConcernSet); + return _writeConcern; + } + + void BatchedUpdateRequest::setContinueOnError(bool continueOnError) { + _continueOnError = continueOnError; + _isContinueOnErrorSet = true; + } + + void BatchedUpdateRequest::unsetContinueOnError() { + _isContinueOnErrorSet = false; + } + + bool BatchedUpdateRequest::isContinueOnErrorSet() const { + return _isContinueOnErrorSet; + } + + bool BatchedUpdateRequest::getContinueOnError() const { + dassert(_isContinueOnErrorSet); + return _continueOnError; + } + + void BatchedUpdateRequest::setShardVersion(const ChunkVersion& shardVersion) { + auto_ptr<ChunkVersion> temp(new ChunkVersion); + shardVersion.cloneTo(temp.get()); + _shardVersion.reset(temp.release()); + } + + void BatchedUpdateRequest::unsetShardVersion() { + _shardVersion.reset(); + } + + bool BatchedUpdateRequest::isShardVersionSet() const { + return _shardVersion.get() != NULL; + } + + const ChunkVersion& BatchedUpdateRequest::getShardVersion() const { + dassert(_shardVersion.get()); + return *_shardVersion; + } + + void BatchedUpdateRequest::setSession(long long session) { + _session = session; + _isSessionSet = true; + } + + void BatchedUpdateRequest::unsetSession() { + _isSessionSet = false; + } + + bool BatchedUpdateRequest::isSessionSet() const { + return _isSessionSet; + } + + long long BatchedUpdateRequest::getSession() const { + dassert(_isSessionSet); + return _session; + } + + } // namespace mongo diff --git a/src/mongo/s/batched_update_request.h b/src/mongo/s/batched_update_request.h new file mode 100644 index 00000000000..d78f022a4f1 --- /dev/null +++ b/src/mongo/s/batched_update_request.h @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <boost/scoped_ptr.hpp> +#include <string> +#include <vector> + +#include "mongo/base/string_data.h" +#include "mongo/db/jsobj.h" +#include "mongo/s/batched_update_document.h" +#include "mongo/s/bson_serializable.h" +#include "mongo/s/chunk_version.h" + +namespace mongo { + + /** + * This class represents the layout and content of a batched update runCommand, + * the request side. + */ + class BatchedUpdateRequest : public BSONSerializable { + MONGO_DISALLOW_COPYING(BatchedUpdateRequest); + public: + + // + // schema declarations + // + + // Name used for the batched update invocation. + static const std::string BATCHED_UPDATE_REQUEST; + + // Field names and types in the batched update command type. + static const BSONField<std::string> collName; + static const BSONField<std::vector<BatchedUpdateDocument*> > updates; + static const BSONField<BSONObj> writeConcern; + static const BSONField<bool> continueOnError; + static const BSONField<ChunkVersion> shardVersion; + static const BSONField<long long> session; + + // + // construction / destruction + // + + BatchedUpdateRequest(); + virtual ~BatchedUpdateRequest(); + + /** Copies all the fields present in 'this' to 'other'. */ + void cloneTo(BatchedUpdateRequest* other) const; + + // + // bson serializable interface implementation + // + + virtual bool isValid(std::string* errMsg) const; + virtual BSONObj toBSON() const; + virtual bool parseBSON(const BSONObj& source, std::string* errMsg); + virtual void clear(); + virtual std::string toString() const; + + // + // individual field accessors + // + + void setCollName(const StringData& collName); + void unsetCollName(); + bool isCollNameSet() const; + const std::string& getCollName() const; + + void setUpdates(const std::vector<BatchedUpdateDocument*>& updates); + void addToUpdates(BatchedUpdateDocument* updates); + void unsetUpdates(); + bool isUpdatesSet() const; + size_t sizeUpdates() const; + const std::vector<BatchedUpdateDocument*>& getUpdates() const; + const BatchedUpdateDocument* getUpdatesAt(size_t pos) const; + + void setWriteConcern(const BSONObj& writeConcern); + void unsetWriteConcern(); + bool isWriteConcernSet() const; + const BSONObj& getWriteConcern() const; + + void setContinueOnError(bool continueOnError); + void unsetContinueOnError(); + bool isContinueOnErrorSet() const; + bool getContinueOnError() const; + + void setShardVersion(const ChunkVersion& shardVersion); + void unsetShardVersion(); + bool isShardVersionSet() const; + const ChunkVersion& getShardVersion() const; + + void setSession(long long session); + void unsetSession(); + bool isSessionSet() const; + long long getSession() const; + + private: + // Convention: (M)andatory, (O)ptional + + // (M) collection we're updating from + std::string _collName; + bool _isCollNameSet; + + // (M) array of individual updates + std::vector<BatchedUpdateDocument*> _updates; + bool _isUpdatesSet; + + // (M) to be issued after the batch applied + BSONObj _writeConcern; + bool _isWriteConcernSet; + + // (M) whether batch is issued in parallel or not + bool _continueOnError; + bool _isContinueOnErrorSet; + + // (O) version for this collection on a given shard + boost::scoped_ptr<ChunkVersion> _shardVersion; + + // (O) session number the inserts belong to + long long _session; + bool _isSessionSet; + }; + +} // namespace mongo diff --git a/src/mongo/s/batched_update_request_test.cpp b/src/mongo/s/batched_update_request_test.cpp new file mode 100644 index 00000000000..daf4ec464f6 --- /dev/null +++ b/src/mongo/s/batched_update_request_test.cpp @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "mongo/s/batched_update_request.h" + +#include <string> + +#include "mongo/db/jsobj.h" +#include "mongo/s/batched_update_document.h" +#include "mongo/unittest/unittest.h" + +namespace { + + using std::string; + using mongo::BatchedUpdateDocument; + using mongo::BatchedUpdateRequest; + using mongo::BSONArray; + using mongo::BSONArrayBuilder; + using mongo::BSONObj; + using mongo::OID; + using mongo::OpTime; + + TEST(RoundTrip, Normal) { + BSONArray updateArray = + BSON_ARRAY( + BSON(BatchedUpdateDocument::query(BSON("a" << 1)) << + BatchedUpdateDocument::updateExpr(BSON("$set" << BSON("a" << 1))) << + BatchedUpdateDocument::multi(false) << + BatchedUpdateDocument::upsert(false) + ) << + BSON(BatchedUpdateDocument::query(BSON("b" << 1)) << + BatchedUpdateDocument::updateExpr(BSON("$set" << BSON("b" << 2))) << + BatchedUpdateDocument::multi(false) << + BatchedUpdateDocument::upsert(false) + ) + ); + + BSONObj writeConcernObj = BSON("w" << 1); + + // The BSON_ARRAY macro doesn't support Timestamps. + BSONArrayBuilder arrBuilder; + arrBuilder.appendTimestamp(OpTime(1,1).asDate()); + arrBuilder.append(OID::gen()); + BSONArray shardVersionArray = arrBuilder.arr(); + + BSONObj origUpdateRequestObj = + BSON(BatchedUpdateRequest::collName("test") << + BatchedUpdateRequest::updates() << updateArray << + BatchedUpdateRequest::writeConcern(writeConcernObj) << + BatchedUpdateRequest::continueOnError(false) << + BatchedUpdateRequest::shardVersion() << shardVersionArray << + BatchedUpdateRequest::session(0)); + + string errMsg; + BatchedUpdateRequest request; + bool ok = request.parseBSON(origUpdateRequestObj, &errMsg); + ASSERT_TRUE(ok); + + BSONObj genUpdateRequestObj = request.toBSON(); + ASSERT_EQUALS(0, genUpdateRequestObj.woCompare(origUpdateRequestObj)); + } + +} // unnamed namespace diff --git a/src/mongo/s/chunk_version.h b/src/mongo/s/chunk_version.h index 1047b3c3103..f6cac8a133a 100644 --- a/src/mongo/s/chunk_version.h +++ b/src/mongo/s/chunk_version.h @@ -29,13 +29,17 @@ #pragma once #include "mongo/db/jsobj.h" +#include "mongo/s/bson_serializable.h" namespace mongo { - // - // ChunkVersions consist of a major/minor version scoped to a version epoch - // - struct ChunkVersion { + /** + * ChunkVersions consist of a major/minor version scoped to a version epoch + * + * TODO: This is a "manual type" but, even so, still needs to comform to what's + * expected from types. + */ + struct ChunkVersion : public BSONSerializable { union { struct { int _minor; @@ -347,7 +351,7 @@ namespace mongo { // versions that know nothing about epochs. // - BSONObj toBSON( const string& prefixIn="" ) const { + BSONObj toBSONWithPrefix( const string& prefixIn ) const { BSONObjBuilder b; string prefix = prefixIn; @@ -359,13 +363,61 @@ namespace mongo { } void addToBSON( BSONObjBuilder& b, const string& prefix="" ) const { - b.appendElements( toBSON( prefix ) ); + b.appendElements( toBSONWithPrefix( prefix ) ); } void addEpochToBSON( BSONObjBuilder& b, const string& prefix="" ) const { b.append( prefix + "Epoch", _epoch ); } + // + // bson serializable interface implementation + // (toBSON and toString were implemented above) + // + + virtual bool isValid(std::string* errMsg) const { + // TODO is there any check we want to do here? + return true; + } + + virtual BSONObj toBSON() const { + // ChunkVersion wants to be an array. + BSONArrayBuilder b; + b.appendTimestamp(_combined); + b.append(_epoch); + return b.arr(); + } + + virtual bool parseBSON(const BSONObj& source, std::string* errMsg) { + // ChunkVersion wants to be an array. + BSONArray arrSource = static_cast<BSONArray>(source); + + bool canParse; + ChunkVersion version = fromBSON(arrSource, &canParse); + if (!canParse) { + *errMsg = "Could not parse version structure"; + return false; + } + + _minor = version._minor; + _major = version._major; + _epoch = version._epoch; + return true; + } + + virtual void clear() { + _minor = 0; + _major = 0; + _epoch = OID(); + } + + void cloneTo(ChunkVersion* other) const { + other->clear(); + other->_minor = _minor; + other->_major = _major; + other->_epoch = _epoch; + } + }; inline ostream& operator<<( ostream &s , const ChunkVersion& v) { |