From a7a593da31a944c90d7c5f0422eeee8264eb438d Mon Sep 17 00:00:00 2001 From: Mathias Stearn Date: Thu, 7 Apr 2016 18:31:21 -0400 Subject: SERVER-23128 Parsers that parse legacy and command writes into uniform objects --- src/mongo/db/SConscript | 1 + src/mongo/db/dbmessage.h | 2 +- src/mongo/db/instance.cpp | 126 +++++------ src/mongo/db/ops/SConscript | 20 ++ src/mongo/db/ops/write_ops.h | 85 ++++++++ src/mongo/db/ops/write_ops_parsers.cpp | 275 ++++++++++++++++++++++++ src/mongo/db/ops/write_ops_parsers.h | 54 +++++ src/mongo/db/ops/write_ops_parsers_test.cpp | 313 ++++++++++++++++++++++++++++ 8 files changed, 806 insertions(+), 70 deletions(-) create mode 100644 src/mongo/db/ops/write_ops.h create mode 100644 src/mongo/db/ops/write_ops_parsers.cpp create mode 100644 src/mongo/db/ops/write_ops_parsers.h create mode 100644 src/mongo/db/ops/write_ops_parsers_test.cpp (limited to 'src') diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 5c54eee62de..71e16230890 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -717,6 +717,7 @@ serveronlyLibdeps = [ "index/index_descriptor", "matcher/expressions_mongod_only", "ops/update_driver", + "ops/write_ops_parsers", "pipeline/document_source", "pipeline/pipeline", "query/query", diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h index 976e4b37de1..29629e552fd 100644 --- a/src/mongo/db/dbmessage.h +++ b/src/mongo/db/dbmessage.h @@ -237,7 +237,7 @@ public: /* for insert and update msgs */ bool moreJSObjs() const { - return _nextjsobj != 0; + return _nextjsobj != 0 && _nextjsobj != _theEnd; } BSONObj nextJsObj(); diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 021e2e6e6c2..c71fc2c814d 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -74,6 +74,7 @@ #include "mongo/db/ops/update_driver.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/update_request.h" +#include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/query/find.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" @@ -655,42 +656,41 @@ void receivedKillCursors(OperationContext* txn, Message& m) { } void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - DbMessage d(m); uassertStatusOK(userAllowedWriteNS(nsString)); - int flags = d.pullInt(); - BSONObj query = d.nextJsObj(); auto client = txn->getClient(); auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), &repl::ReplClientInfo::setLastOpToSystemLastOpTime, txn); - verify(d.moreJSObjs()); - verify(query.objsize() < m.header().dataLen()); - BSONObj toupdate = d.nextJsObj(); - uassert(10055, "update object too large", toupdate.objsize() <= BSONObjMaxUserSize); - verify(toupdate.objsize() < m.header().dataLen()); - verify(query.objsize() + toupdate.objsize() < m.header().dataLen()); - bool upsert = flags & UpdateOption_Upsert; - bool multi = flags & UpdateOption_Multi; + auto updateOp = parseLegacyUpdate(m); + auto& singleUpdate = updateOp.updates[0]; + + uassert(10055, "update object too large", singleUpdate.update.objsize() <= BSONObjMaxUserSize); - op.debug().query = query; + op.debug().query = singleUpdate.query; { stdx::lock_guard lk(*client); op.setNS_inlock(nsString.ns()); - op.setQuery_inlock(query); + op.setQuery_inlock(singleUpdate.query); } - Status status = - 
AuthorizationSession::get(client)->checkAuthForUpdate(nsString, query, toupdate, upsert); - audit::logUpdateAuthzCheck(client, nsString, query, toupdate, upsert, multi, status.code()); + Status status = AuthorizationSession::get(client)->checkAuthForUpdate( + nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert); + audit::logUpdateAuthzCheck(client, + nsString, + singleUpdate.query, + singleUpdate.update, + singleUpdate.upsert, + singleUpdate.multi, + status.code()); uassertStatusOK(status); UpdateRequest request(nsString); - request.setUpsert(upsert); - request.setMulti(multi); - request.setQuery(query); - request.setUpdates(toupdate); + request.setUpsert(singleUpdate.upsert); + request.setMulti(singleUpdate.multi); + request.setQuery(singleUpdate.query); + request.setUpdates(singleUpdate.update); UpdateLifecycleImpl updateLifecycle(nsString); request.setLifecycle(&updateLifecycle); @@ -716,7 +716,7 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess auto collection = ctx.db()->getCollection(nsString); // The common case: no implicit collection creation - if (!upsert || collection != NULL) { + if (!singleUpdate.upsert || collection != NULL) { unique_ptr exec = uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate)); @@ -750,7 +750,7 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess break; } catch (const WriteConflictException& dle) { op.debug().writeConflicts++; - if (multi) { + if (singleUpdate.multi) { log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting"; throw; } @@ -814,13 +814,10 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess } void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - DbMessage d(m); uassertStatusOK(userAllowedWriteNS(nsString)); - int flags = d.pullInt(); - bool justOne = flags & RemoveOption_JustOne; - verify(d.moreJSObjs()); - BSONObj pattern = d.nextJsObj(); + auto deleteOp = parseLegacyDelete(m); + auto& singleDelete = deleteOp.deletes[0]; auto client = txn->getClient(); auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); @@ -828,20 +825,21 @@ void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Mess &repl::ReplClientInfo::setLastOpToSystemLastOpTime, txn); - op.debug().query = pattern; + op.debug().query = singleDelete.query; { stdx::lock_guard lk(*client); - op.setQuery_inlock(pattern); + op.setQuery_inlock(singleDelete.query); op.setNS_inlock(nsString.ns()); } - Status status = AuthorizationSession::get(client)->checkAuthForDelete(nsString, pattern); - audit::logDeleteAuthzCheck(client, nsString, pattern, status.code()); + Status status = + AuthorizationSession::get(client)->checkAuthForDelete(nsString, singleDelete.query); + audit::logDeleteAuthzCheck(client, nsString, singleDelete.query, status.code()); uassertStatusOK(status); DeleteRequest request(nsString); - request.setQuery(pattern); - request.setMulti(!justOne); + request.setQuery(singleDelete.query); + request.setMulti(singleDelete.multi); request.setYieldPolicy(PlanExecutor::YIELD_AUTO); @@ -969,7 +967,7 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, void insertMultiSingletons(OperationContext* txn, OldClientContext& ctx, bool keepGoing, - const char* ns, + StringData ns, CurOp& op, vector::iterator begin, vector::iterator end) { @@ -1002,7 +1000,7 @@ void insertMultiSingletons(OperationContext* 
txn, void insertMultiVector(OperationContext* txn, OldClientContext& ctx, bool keepGoing, - const char* ns, + StringData ns, CurOp& op, vector::iterator begin, vector::iterator end) { @@ -1033,10 +1031,16 @@ void insertMultiVector(OperationContext* txn, NOINLINE_DECL void insertMulti(OperationContext* txn, OldClientContext& ctx, - bool keepGoing, - const char* ns, - vector& docs, + const InsertOp& insertOp, CurOp& op) { + std::vector docs; + docs.reserve(insertOp.documents.size()); + for (auto&& doc : insertOp.documents) { + // TODO don't fail yet on invalid documents. They should be treated like other errors. + BSONObj fixed = uassertStatusOK(fixDocumentForInsert(doc)); + docs.push_back(fixed.isEmpty() ? doc : std::move(fixed)); + } + vector::iterator chunkBegin = docs.begin(); int64_t chunkCount = 0; int64_t chunkSize = 0; @@ -1047,13 +1051,6 @@ NOINLINE_DECL void insertMulti(OperationContext* txn, &repl::ReplClientInfo::setLastOpToSystemLastOpTime, txn); - for (vector::iterator it = docs.begin(); it != docs.end(); it++) { - StatusWith fixed = fixDocumentForInsert(*it); - uassertStatusOK(fixed.getStatus()); - if (!fixed.getValue().isEmpty()) - *it = fixed.getValue(); - } - for (vector::iterator it = docs.begin(); it != docs.end(); it++) { chunkSize += (*it).objsize(); // Limit chunk size, actual size chosen is a tradeoff: larger sizes are more efficient, @@ -1061,16 +1058,19 @@ NOINLINE_DECL void insertMulti(OperationContext* txn, if ((++chunkCount >= internalQueryExecYieldIterations / 2) || (chunkSize >= insertVectorMaxBytes)) { if (it == chunkBegin) // there is only one doc to process, so avoid retry on failure - insertMultiSingletons(txn, ctx, keepGoing, ns, op, chunkBegin, it + 1); + insertMultiSingletons( + txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1); else - insertMultiVector(txn, ctx, keepGoing, ns, op, chunkBegin, it + 1); + insertMultiVector( + txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1); chunkBegin = it + 1; chunkCount = 0; chunkSize = 0; } } if (chunkBegin != docs.end()) - insertMultiVector(txn, ctx, keepGoing, ns, op, chunkBegin, docs.end()); + insertMultiVector( + txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, docs.end()); if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { // If this operation has already generated a new lastOp, don't bother setting it @@ -1159,9 +1159,7 @@ static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curO bool _receivedInsert(OperationContext* txn, const NamespaceString& nsString, - const char* ns, - vector& docs, - bool keepGoing, + const InsertOp& insertOp, CurOp& op, bool checkCollection) { // CONCURRENCY TODO: is being read locked in big log sufficient here? 
@@ -1169,16 +1167,14 @@ bool _receivedInsert(OperationContext* txn, uassert( 10058, "not master", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString)); - OldClientContext ctx(txn, ns); + OldClientContext ctx(txn, insertOp.ns.ns()); if (checkCollection && !ctx.db()->getCollection(nsString)) return false; - insertMulti(txn, ctx, keepGoing, ns, docs, op); + insertMulti(txn, ctx, insertOp, op); return true; } void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - DbMessage d(m); - const char* ns = d.getns(); { stdx::lock_guard(*txn->getClient()); CurOp::get(txn)->setNS_inlock(nsString.ns()); @@ -1186,29 +1182,21 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess uassertStatusOK(userAllowedWriteNS(nsString.ns())); if (nsString.isSystemDotIndexes()) { + DbMessage d(m); insertSystemIndexes(txn, d, op); return; } - if (!d.moreJSObjs()) { - // strange. should we complain? - return; - } - - vector multi; - while (d.moreJSObjs()) { - BSONObj obj = d.nextJsObj(); - multi.push_back(obj); + auto insertOp = parseLegacyInsert(m); - // Check auth for insert (also handles checking if this is an index build and checks - // for the proper privileges in that case). + for (const auto& obj : insertOp.documents) { + // Check auth for insert. Status status = AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj); audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code()); uassertStatusOK(status); } - const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError; { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); @@ -1216,7 +1204,7 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess // OldClientContext may implicitly create a database, so check existence if (dbHolder().get(txn, nsString.db()) != NULL) { - if (_receivedInsert(txn, nsString, ns, multi, keepGoing, op, true)) + if (_receivedInsert(txn, nsString, insertOp, op, true)) return; } } @@ -1225,7 +1213,7 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X); - _receivedInsert(txn, nsString, ns, multi, keepGoing, op, false); + _receivedInsert(txn, nsString, insertOp, op, false); } // ----- BEGIN Diaglog ----- diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index 779bd57740c..07d7b7dc40d 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -233,3 +233,23 @@ env.CppUnitTest( 'update_driver', ], ) + +env.Library( + target='write_ops_parsers', + source=[ + 'write_ops_parsers.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/dbmessage', + ], +) + +env.CppUnitTest( + target='write_ops_parsers_test', + source='write_ops_parsers_test.cpp', + LIBDEPS=[ + 'write_ops_parsers', + '$BUILD_DIR/mongo/client/clientdriver', + ], +) diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h new file mode 100644 index 00000000000..267693bc559 --- /dev/null +++ b/src/mongo/db/ops/write_ops.h @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include +#include + +#include "mongo/db/jsobj.h" +#include "mongo/db/namespace_string.h" + +namespace mongo { + +/** + * The base structure for all fields that are common for all write operations. + * + * Unlike ParsedUpdate and UpdateRequest (and the Delete counterparts), types deriving from this are + * intended to represent entire operations that may consist of multiple sub-operations. + */ +struct ParsedWriteOp { + NamespaceString ns; + boost::optional writeConcern; + bool bypassDocumentValidation = false; + bool continueOnError = false; +}; + +/** + * A parsed insert insert operation. + */ +struct InsertOp : ParsedWriteOp { + std::vector documents; +}; + +/** + * A parsed update operation. + */ +struct UpdateOp : ParsedWriteOp { + struct SingleUpdate { + BSONObj query; + BSONObj update; + bool multi = false; + bool upsert = false; + }; + + std::vector updates; +}; + +/** + * A parsed Delete operation. + */ +struct DeleteOp : ParsedWriteOp { + struct SingleDelete { + BSONObj query; + bool multi = true; + }; + + std::vector deletes; +}; + +} // namespace mongo diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp new file mode 100644 index 00000000000..8e01ee43eaa --- /dev/null +++ b/src/mongo/db/ops/write_ops_parsers.cpp @@ -0,0 +1,275 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/ops/write_ops_parsers.h" + +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/dbmessage.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace { + +// The specified limit to the number of operations that can be included in a single write command. +// This is an attempt to avoid a large number of errors resulting in a reply that exceeds 16MB. It +// doesn't fully ensure that goal, but it reduces the probability of it happening. This limit should +// not be used if the protocol changes to avoid the 16MB limit on reply size. +const size_t kMaxWriteBatchSize = 1000; + +void checkTypeInArray(BSONType expectedType, + const BSONElement& elem, + const BSONElement& arrayElem) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "Wrong type for " << arrayElem.fieldNameStringData() << '[' + << elem.fieldNameStringData() << "]. Expected a " + << typeName(expectedType) << ", got a " << typeName(elem.type()) << '.', + elem.type() == expectedType); +} + +void checkType(BSONType expectedType, const BSONElement& elem) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "Wrong type for '" << elem.fieldNameStringData() << "'. Expected a " + << typeName(expectedType) << ", got a " << typeName(elem.type()) << '.', + elem.type() == expectedType); +} + +void checkOpCountForCommand(size_t numOps) { + uassert(ErrorCodes::InvalidLength, + str::stream() << "Write batch sizes must be between 1 and " << kMaxWriteBatchSize + << ". Got " << numOps << " operations.", + numOps != 0 && numOps <= kMaxWriteBatchSize); +} + +/** + * Parses the fields common to all write commands and sets uniqueField to the element named + * uniqueFieldName. The uniqueField is the only top-level field that is unique to the specific type + * of write command. + */ +void parseWriteCommand(StringData dbName, + const BSONObj& cmd, + StringData uniqueFieldName, + BSONElement* uniqueField, + ParsedWriteOp* op) { + // Command dispatch wouldn't get here with an empty object because the first field indicates + // which command to run. 
+ invariant(!cmd.isEmpty()); + + bool haveUniqueField = false; + bool firstElement = true; + for (BSONElement field : cmd) { + if (firstElement) { + // The key is the command name and the value is the collection name + checkType(String, field); + op->ns = NamespaceString(dbName, field.valueStringData()); + firstElement = false; + continue; + } + + const StringData fieldName = field.fieldNameStringData(); + if (fieldName == "writeConcern") { + checkType(Object, field); + op->writeConcern = field.Obj(); + } else if (fieldName == "bypassDocumentValidation") { + checkType(Bool, field); + op->bypassDocumentValidation = field.Bool(); + } else if (fieldName == "ordered") { + checkType(Bool, field); + op->continueOnError = !field.Bool(); + } else if (fieldName == uniqueFieldName) { + haveUniqueField = true; + *uniqueField = field; + } else if (fieldName[0] != '$') { + std::initializer_list ignoredFields = {"maxTimeMS", "shardVersion"}; + uassert(ErrorCodes::FailedToParse, + str::stream() << "Unknown option to " << cmd.firstElementFieldName() + << " command: " << fieldName, + std::find(ignoredFields.begin(), ignoredFields.end(), fieldName) != + ignoredFields.end()); + } + } + + uassert(ErrorCodes::FailedToParse, + str::stream() << "The " << uniqueFieldName << " option is required to the " + << cmd.firstElementFieldName() << " command.", + haveUniqueField); +} +} + +InsertOp parseInsertCommand(StringData dbName, const BSONObj& cmd) { + BSONElement documents; + InsertOp op; + parseWriteCommand(dbName, cmd, "documents", &documents, &op); + checkType(Array, documents); + for (auto doc : documents.Obj()) { + checkTypeInArray(Object, doc, documents); + op.documents.push_back(doc.Obj()); + } + checkOpCountForCommand(op.documents.size()); + return op; +} + +UpdateOp parseUpdateCommand(StringData dbName, const BSONObj& cmd) { + BSONElement updates; + UpdateOp op; + parseWriteCommand(dbName, cmd, "updates", &updates, &op); + checkType(Array, updates); + for (auto doc : updates.Obj()) { + checkTypeInArray(Object, doc, updates); + op.updates.emplace_back(); + auto& update = op.updates.back(); + bool haveQ = false; + bool haveU = false; + for (auto field : doc.Obj()) { + const StringData fieldName = field.fieldNameStringData(); + if (fieldName == "q") { + haveQ = true; + checkType(Object, field); + update.query = field.Obj(); + } else if (fieldName == "u") { + haveU = true; + checkType(Object, field); + update.update = field.Obj(); + } else if (fieldName == "multi") { + checkType(Bool, field); + update.multi = field.Bool(); + } else if (fieldName == "upsert") { + checkType(Bool, field); + update.upsert = field.Bool(); + } else { + uasserted(ErrorCodes::FailedToParse, + str::stream() << "Unrecognized field in update operation: " << fieldName); + } + } + + uassert(ErrorCodes::FailedToParse, "The 'q' field is required for all updates", haveQ); + uassert(ErrorCodes::FailedToParse, "The 'u' field is required for all updates", haveU); + } + checkOpCountForCommand(op.updates.size()); + return op; +} + +DeleteOp parseDeleteCommand(StringData dbName, const BSONObj& cmd) { + BSONElement deletes; + DeleteOp op; + parseWriteCommand(dbName, cmd, "deletes", &deletes, &op); + checkType(Array, deletes); + for (auto doc : deletes.Obj()) { + checkTypeInArray(Object, doc, deletes); + op.deletes.emplace_back(); + auto& del = op.deletes.back(); // delete is a reserved word. 
+ bool haveQ = false; + bool haveLimit = false; + for (auto field : doc.Obj()) { + const StringData fieldName = field.fieldNameStringData(); + if (fieldName == "q") { + haveQ = true; + checkType(Object, field); + del.query = field.Obj(); + } else if (fieldName == "limit") { + haveLimit = true; + uassert(ErrorCodes::TypeMismatch, + str::stream() + << "The limit field in delete objects must be a number. Got a " + << typeName(field.type()), + field.isNumber()); + + // Using a double to avoid throwing away illegal fractional portion. Don't want to + // accept 0.5 here. + const double limit = field.numberDouble(); + uassert(ErrorCodes::FailedToParse, + str::stream() << "The limit field in delete objects must be 0 or 1. Got " + << limit, + limit == 0 || limit == 1); + del.multi = (limit == 0); + } else { + uasserted(ErrorCodes::FailedToParse, + str::stream() << "Unrecognized field in delete operation: " << fieldName); + } + } + uassert(ErrorCodes::FailedToParse, "The 'q' field is required for all deletes", haveQ); + uassert( + ErrorCodes::FailedToParse, "The 'limit' field is required for all deletes", haveLimit); + } + checkOpCountForCommand(op.deletes.size()); + return op; +} + +InsertOp parseLegacyInsert(const Message& msgRaw) { + DbMessage msg(msgRaw); + + InsertOp op; + op.ns = NamespaceString(msg.getns()); + op.continueOnError = msg.reservedField() & InsertOption_ContinueOnError; + uassert(ErrorCodes::InvalidLength, "Need at least one object to insert", msg.moreJSObjs()); + while (msg.moreJSObjs()) { + op.documents.push_back(msg.nextJsObj()); + } + // There is no limit on the number of inserts in a legacy batch. + + return op; +} + +UpdateOp parseLegacyUpdate(const Message& msgRaw) { + DbMessage msg(msgRaw); + + UpdateOp op; + op.ns = NamespaceString(msg.getns()); + + // Legacy updates only allowed one update per operation. Layout is flags, query, update. + op.updates.emplace_back(); + auto& singleUpdate = op.updates.back(); + const int flags = msg.pullInt(); + singleUpdate.upsert = flags & UpdateOption_Upsert; + singleUpdate.multi = flags & UpdateOption_Multi; + singleUpdate.query = msg.nextJsObj(); + singleUpdate.update = msg.nextJsObj(); + + return op; +} + +DeleteOp parseLegacyDelete(const Message& msgRaw) { + DbMessage msg(msgRaw); + + DeleteOp op; + op.ns = NamespaceString(msg.getns()); + + // Legacy deletes only allowed one delete per operation. Layout is flags, query. + op.deletes.emplace_back(); + auto& singleDelete = op.deletes.back(); + const int flags = msg.pullInt(); + singleDelete.multi = !(flags & RemoveOption_JustOne); + singleDelete.query = msg.nextJsObj(); + + return op; +} + +} // namespace mongo diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h new file mode 100644 index 00000000000..526703fadd3 --- /dev/null +++ b/src/mongo/db/ops/write_ops_parsers.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/jsobj.h" +#include "mongo/util/net/message.h" +#include "mongo/db/ops/write_ops.h" + +namespace mongo { + +/** + * This file contains functions to parse write operations from the user-facing wire format using + * either write commands or the legacy OP_INSERT/OP_UPDATE/OP_DELETE wire operations. Parse errors + * are reported by uasserting. + * + * These only parse and validate the operation structure. No attempt is made to parse or validate + * the objects to insert, or update and query operators. + */ + +InsertOp parseInsertCommand(StringData dbName, const BSONObj& cmd); +UpdateOp parseUpdateCommand(StringData dbName, const BSONObj& cmd); +DeleteOp parseDeleteCommand(StringData dbName, const BSONObj& cmd); + +InsertOp parseLegacyInsert(const Message& msg); +UpdateOp parseLegacyUpdate(const Message& msg); +DeleteOp parseLegacyDelete(const Message& msg); + +} // namespace mongo diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp new file mode 100644 index 00000000000..04b3982762e --- /dev/null +++ b/src/mongo/db/ops/write_ops_parsers_test.cpp @@ -0,0 +1,313 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/ops/write_ops_parsers.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +TEST(CommandWriteOpsParsers, CommonFields_WriteConcern) { + auto writeConcern = BSON("w" << 2); + auto cmd = BSON("insert" + << "bar" + << "documents" << BSON_ARRAY(BSONObj()) << "writeConcern" << writeConcern); + auto op = parseInsertCommand("foo", cmd); + ASSERT(bool(op.writeConcern)); + ASSERT_EQ(*op.writeConcern, writeConcern); +} + +TEST(CommandWriteOpsParsers, CommonFields_BypassDocumentValidation) { + for (bool bypassDocumentValidation : {true, false}) { + auto cmd = BSON("insert" + << "bar" + << "documents" << BSON_ARRAY(BSONObj()) << "bypassDocumentValidation" + << bypassDocumentValidation); + auto op = parseInsertCommand("foo", cmd); + ASSERT_EQ(op.bypassDocumentValidation, bypassDocumentValidation); + } +} + +TEST(CommandWriteOpsParsers, CommonFields_Ordered) { + for (bool ordered : {true, false}) { + auto cmd = BSON("insert" + << "bar" + << "documents" << BSON_ARRAY(BSONObj()) << "ordered" << ordered); + auto op = parseInsertCommand("foo", cmd); + ASSERT_EQ(op.continueOnError, !ordered); + } +} + +TEST(CommandWriteOpsParsers, CommonFields_IgnoredFields) { + // These flags are ignored, so there is nothing to check other than that this doesn't throw. + auto cmd = BSON("insert" + << "bar" + << "documents" << BSON_ARRAY(BSONObj()) << "maxTimeMS" << 1000 << "shardVersion" + << BSONObj()); + parseInsertCommand("foo", cmd); +} + +TEST(CommandWriteOpsParsers, GarbageFieldsAtTopLevel) { + auto cmd = BSON("insert" + << "bar" + << "documents" << BSON_ARRAY(BSONObj()) << "GARBAGE" << 1); + ASSERT_THROWS_CODE(parseInsertCommand("foo", cmd), UserException, ErrorCodes::FailedToParse); +} + +TEST(CommandWriteOpsParsers, GarbageFieldsInUpdateDoc) { + auto cmd = + BSON("update" + << "bar" + << "updates" << BSON_ARRAY("q" << BSONObj() << "u" << BSONObj() << "GARBAGE" << 1)); + ASSERT_THROWS_CODE(parseInsertCommand("foo", cmd), UserException, ErrorCodes::FailedToParse); +} + +TEST(CommandWriteOpsParsers, GarbageFieldsInDeleteDoc) { + auto cmd = BSON("delete" + << "bar" + << "deletes" << BSON_ARRAY("q" << BSONObj() << "limit" << 0 << "GARBAGE" << 1)); + ASSERT_THROWS_CODE(parseInsertCommand("foo", cmd), UserException, ErrorCodes::FailedToParse); +} + +TEST(CommandWriteOpsParsers, SingleInsert) { + const auto ns = NamespaceString("test", "foo"); + const BSONObj obj = BSON("x" << 1); + auto cmd = BSON("insert" << ns.coll() << "documents" << BSON_ARRAY(obj)); + const auto op = parseInsertCommand(ns.db(), cmd); + ASSERT_EQ(op.ns.ns(), ns.ns()); + ASSERT(!op.writeConcern); + ASSERT(!op.bypassDocumentValidation); + ASSERT(!op.continueOnError); + ASSERT_EQ(op.documents.size(), 1u); + ASSERT_EQ(op.documents[0], obj); +} + +TEST(CommandWriteOpsParsers, EmptyMultiInsertFails) { + const auto ns = NamespaceString("test", "foo"); + auto cmd = BSON("insert" << ns.coll() << "documents" << BSONArray()); + ASSERT_THROWS_CODE(parseInsertCommand(ns.db(), cmd), UserException, ErrorCodes::InvalidLength); +} + +TEST(CommandWriteOpsParsers, RealMultiInsert) { + const auto ns = NamespaceString("test", "foo"); + const BSONObj obj0 = BSON("x" << 0); + const BSONObj obj1 = BSON("x" << 1); + auto cmd = BSON("insert" << ns.coll() << "documents" << BSON_ARRAY(obj0 << obj1)); + const auto op = parseInsertCommand(ns.db(), cmd); + ASSERT_EQ(op.ns.ns(), ns.ns()); + ASSERT(!op.writeConcern); + ASSERT(!op.bypassDocumentValidation); + 
ASSERT(!op.continueOnError); + ASSERT_EQ(op.documents.size(), 2u); + ASSERT_EQ(op.documents[0], obj0); + ASSERT_EQ(op.documents[1], obj1); +} + +TEST(CommandWriteOpsParsers, Update) { + const auto ns = NamespaceString("test", "foo"); + const BSONObj query = BSON("x" << 1); + const BSONObj update = BSON("$inc" << BSON("x" << 1)); + for (bool upsert : {false, true}) { + for (bool multi : {false, true}) { + auto cmd = BSON("update" << ns.coll() << "updates" + << BSON_ARRAY(BSON("q" << query << "u" << update << "upsert" + << upsert << "multi" << multi))); + auto op = parseUpdateCommand(ns.db(), cmd); + ASSERT_EQ(op.ns.ns(), ns.ns()); + ASSERT(!op.writeConcern); + ASSERT(!op.bypassDocumentValidation); + ASSERT_EQ(op.continueOnError, false); + ASSERT_EQ(op.updates.size(), 1u); + ASSERT_EQ(op.updates[0].query, query); + ASSERT_EQ(op.updates[0].update, update); + ASSERT_EQ(op.updates[0].upsert, upsert); + ASSERT_EQ(op.updates[0].multi, multi); + } + } +} + +TEST(CommandWriteOpsParsers, Remove) { + const auto ns = NamespaceString("test", "foo"); + const BSONObj query = BSON("x" << 1); + for (bool multi : {false, true}) { + auto cmd = BSON("delete" << ns.coll() << "deletes" + << BSON_ARRAY(BSON("q" << query << "limit" << (multi ? 0 : 1)))); + auto op = parseDeleteCommand(ns.db(), cmd); + ASSERT_EQ(op.ns.ns(), ns.ns()); + ASSERT(!op.writeConcern); + ASSERT(!op.bypassDocumentValidation); + ASSERT_EQ(op.continueOnError, false); + ASSERT_EQ(op.deletes.size(), 1u); + ASSERT_EQ(op.deletes[0].query, query); + ASSERT_EQ(op.deletes[0].multi, multi); + } +} + +TEST(CommandWriteOpsParsers, RemoveErrorsWithBadLimit) { + // Only 1 and 0 should be accepted. + for (BSONElement limit : BSON_ARRAY(-1 << 2 << 0.5)) { + auto cmd = BSON("delete" + << "bar" + << "deletes" << BSON_ARRAY("q" << BSONObj() << "limit" << limit)); + ASSERT_THROWS_CODE( + parseInsertCommand("foo", cmd), UserException, ErrorCodes::FailedToParse); + } +} + +namespace { +/** + * A mock DBClient that just captures the Message that is sent for legacy writes. + */ +class MyMockDBClient final : public DBClientBase { +public: + Message message; // The last message sent. + + void say(Message& toSend, bool isRetry = false, std::string* actualServer = nullptr) { + message = std::move(toSend); + } + + // The rest of these are just filling out the pure-virtual parts of the interface. + bool lazySupported() const { + return false; + } + std::string getServerAddress() const { + return ""; + } + std::string toString() const { + return ""; + } + bool call(Message& toSend, Message& response, bool assertOk, std::string* actualServer) { + invariant(!"call() not implemented"); + } + virtual int getMinWireVersion() { + return 0; + } + virtual int getMaxWireVersion() { + return 0; + } + virtual bool isFailed() const { + return false; + } + virtual bool isStillConnected() { + return true; + } + virtual double getSoTimeout() const { + return 0; + } + virtual ConnectionString::ConnectionType type() const { + return ConnectionString::MASTER; + } +}; +} // namespace + +TEST(LegacyWriteOpsParsers, SingleInsert) { + const std::string ns = "test.foo"; + const BSONObj obj = BSON("x" << 1); + for (bool continueOnError : {false, true}) { + MyMockDBClient client; + client.insert(ns, obj, continueOnError ? 
InsertOption_ContinueOnError : 0); + const auto op = parseLegacyInsert(client.message); + ASSERT_EQ(op.ns.ns(), ns); + ASSERT(!op.writeConcern); + ASSERT(!op.bypassDocumentValidation); + ASSERT_EQ(op.continueOnError, continueOnError); + ASSERT_EQ(op.documents.size(), 1u); + ASSERT_EQ(op.documents[0], obj); + } +} + +TEST(LegacyWriteOpsParsers, EmptyMultiInsertFails) { + const std::string ns = "test.foo"; + for (bool continueOnError : {false, true}) { + MyMockDBClient client; + client.insert( + ns, std::vector{}, continueOnError ? InsertOption_ContinueOnError : 0); + ASSERT_THROWS_CODE( + parseLegacyInsert(client.message), UserException, ErrorCodes::InvalidLength); + } +} + +TEST(LegacyWriteOpsParsers, RealMultiInsert) { + const std::string ns = "test.foo"; + const BSONObj obj0 = BSON("x" << 0); + const BSONObj obj1 = BSON("x" << 1); + for (bool continueOnError : {false, true}) { + MyMockDBClient client; + client.insert(ns, {obj0, obj1}, continueOnError ? InsertOption_ContinueOnError : 0); + const auto op = parseLegacyInsert(client.message); + ASSERT_EQ(op.ns.ns(), ns); + ASSERT(!op.writeConcern); + ASSERT(!op.bypassDocumentValidation); + ASSERT_EQ(op.continueOnError, continueOnError); + ASSERT_EQ(op.documents.size(), 2u); + ASSERT_EQ(op.documents[0], obj0); + ASSERT_EQ(op.documents[1], obj1); + } +} + +TEST(LegacyWriteOpsParsers, Update) { + const std::string ns = "test.foo"; + const BSONObj query = BSON("x" << 1); + const BSONObj update = BSON("$inc" << BSON("x" << 1)); + for (bool upsert : {false, true}) { + for (bool multi : {false, true}) { + MyMockDBClient client; + client.update(ns, query, update, upsert, multi); + const auto op = parseLegacyUpdate(client.message); + ASSERT_EQ(op.ns.ns(), ns); + ASSERT(!op.writeConcern); + ASSERT(!op.bypassDocumentValidation); + ASSERT_EQ(op.continueOnError, false); + ASSERT_EQ(op.updates.size(), 1u); + ASSERT_EQ(op.updates[0].query, query); + ASSERT_EQ(op.updates[0].update, update); + ASSERT_EQ(op.updates[0].upsert, upsert); + ASSERT_EQ(op.updates[0].multi, multi); + } + } +} + +TEST(LegacyWriteOpsParsers, Remove) { + const std::string ns = "test.foo"; + const BSONObj query = BSON("x" << 1); + for (bool multi : {false, true}) { + MyMockDBClient client; + client.remove(ns, query, multi ? 0 : RemoveOption_JustOne); + const auto op = parseLegacyDelete(client.message); + ASSERT_EQ(op.ns.ns(), ns); + ASSERT(!op.writeConcern); + ASSERT(!op.bypassDocumentValidation); + ASSERT_EQ(op.continueOnError, false); + ASSERT_EQ(op.deletes.size(), 1u); + ASSERT_EQ(op.deletes[0].query, query); + ASSERT_EQ(op.deletes[0].multi, multi); + } +} +} -- cgit v1.2.1
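
For reference, outside the patch itself: a minimal sketch of how a write-command code path could consume the uniform InsertOp produced by the new parsers. The handler name, the error code, and the surrounding plumbing are hypothetical; only the parseInsertCommand() signature and the InsertOp fields introduced in this change are assumed.

    // Hypothetical caller, not part of this change. Malformed input (a missing
    // 'documents' array, a batch over the 1000-op limit, wrong BSON types)
    // makes parseInsertCommand() throw via uassert, as exercised in the tests.
    #include "mongo/db/jsobj.h"
    #include "mongo/db/ops/write_ops_parsers.h"
    #include "mongo/util/assert_util.h"

    namespace mongo {

    void runInsertCommandExample(StringData dbName, const BSONObj& cmdObj) {
        // Common options (ns, ordered/continueOnError, writeConcern,
        // bypassDocumentValidation) are normalized onto the op, so the caller
        // no longer cares whether the request arrived as a command or OP_INSERT.
        InsertOp op = parseInsertCommand(dbName, cmdObj);

        for (const BSONObj& doc : op.documents) {
            // The parsers validate operation structure only; per-document
            // checks like this size limit, auth, and the actual insert still
            // belong to the caller. 99999 is a placeholder error code.
            uassert(99999, "document too large", doc.objsize() <= BSONObjMaxUserSize);
        }
    }

    }  // namespace mongo

The same pattern applies to parseUpdateCommand()/parseDeleteCommand() and to the parseLegacy*() functions, which return the identical UpdateOp/DeleteOp structures from the legacy wire format.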