author    Mathias Stearn <mathias@10gen.com>  2016-04-07 18:31:21 -0400
committer Mathias Stearn <mathias@10gen.com>  2016-04-21 18:38:56 -0400
commit    a7a593da31a944c90d7c5f0422eeee8264eb438d (patch)
tree      58bb8fed4985c08b4b7cdf11793e19bd2c5621a7 /src
parent    98ba7f26e13edbd221afca2d119e844896397752 (diff)
SERVER-23128 Parsers that parse legacy and command writes into uniform objects
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/SConscript                     |   1
-rw-r--r--  src/mongo/db/dbmessage.h                    |   2
-rw-r--r--  src/mongo/db/instance.cpp                   | 126
-rw-r--r--  src/mongo/db/ops/SConscript                 |  20
-rw-r--r--  src/mongo/db/ops/write_ops.h                |  85
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.cpp      | 275
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.h        |  54
-rw-r--r--  src/mongo/db/ops/write_ops_parsers_test.cpp | 313
8 files changed, 806 insertions, 70 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 5c54eee62de..71e16230890 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -717,6 +717,7 @@ serveronlyLibdeps = [
"index/index_descriptor",
"matcher/expressions_mongod_only",
"ops/update_driver",
+ "ops/write_ops_parsers",
"pipeline/document_source",
"pipeline/pipeline",
"query/query",
diff --git a/src/mongo/db/dbmessage.h b/src/mongo/db/dbmessage.h
index 976e4b37de1..29629e552fd 100644
--- a/src/mongo/db/dbmessage.h
+++ b/src/mongo/db/dbmessage.h
@@ -237,7 +237,7 @@ public:
/* for insert and update msgs */
bool moreJSObjs() const {
- return _nextjsobj != 0;
+ return _nextjsobj != 0 && _nextjsobj != _theEnd;
}
BSONObj nextJsObj();
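Note on the dbmessage.h hunk above: the old check kept reporting "more objects" even after the read cursor had advanced exactly to the end of the message, so a caller's loop could attempt one nextJsObj() call too many. Comparing against _theEnd makes the exhausted state observable. A minimal sketch of the caller pattern this protects, assuming "mongo/db/dbmessage.h"; handleDocument is a hypothetical stand-in and not part of this commit:

    DbMessage d(m);                   // m is a legacy write message carrying BSON documents
    while (d.moreJSObjs()) {          // now returns false once the cursor reaches _theEnd
        BSONObj obj = d.nextJsObj();  // therefore never called on an exhausted buffer
        handleDocument(obj);          // hypothetical per-document callback
    }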
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index 021e2e6e6c2..c71fc2c814d 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -74,6 +74,7 @@
#include "mongo/db/ops/update_driver.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/query/find.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
@@ -655,42 +656,41 @@ void receivedKillCursors(OperationContext* txn, Message& m) {
}
void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- DbMessage d(m);
uassertStatusOK(userAllowedWriteNS(nsString));
- int flags = d.pullInt();
- BSONObj query = d.nextJsObj();
auto client = txn->getClient();
auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
&repl::ReplClientInfo::setLastOpToSystemLastOpTime,
txn);
- verify(d.moreJSObjs());
- verify(query.objsize() < m.header().dataLen());
- BSONObj toupdate = d.nextJsObj();
- uassert(10055, "update object too large", toupdate.objsize() <= BSONObjMaxUserSize);
- verify(toupdate.objsize() < m.header().dataLen());
- verify(query.objsize() + toupdate.objsize() < m.header().dataLen());
- bool upsert = flags & UpdateOption_Upsert;
- bool multi = flags & UpdateOption_Multi;
+ auto updateOp = parseLegacyUpdate(m);
+ auto& singleUpdate = updateOp.updates[0];
+
+ uassert(10055, "update object too large", singleUpdate.update.objsize() <= BSONObjMaxUserSize);
- op.debug().query = query;
+ op.debug().query = singleUpdate.query;
{
stdx::lock_guard<Client> lk(*client);
op.setNS_inlock(nsString.ns());
- op.setQuery_inlock(query);
+ op.setQuery_inlock(singleUpdate.query);
}
- Status status =
- AuthorizationSession::get(client)->checkAuthForUpdate(nsString, query, toupdate, upsert);
- audit::logUpdateAuthzCheck(client, nsString, query, toupdate, upsert, multi, status.code());
+ Status status = AuthorizationSession::get(client)->checkAuthForUpdate(
+ nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert);
+ audit::logUpdateAuthzCheck(client,
+ nsString,
+ singleUpdate.query,
+ singleUpdate.update,
+ singleUpdate.upsert,
+ singleUpdate.multi,
+ status.code());
uassertStatusOK(status);
UpdateRequest request(nsString);
- request.setUpsert(upsert);
- request.setMulti(multi);
- request.setQuery(query);
- request.setUpdates(toupdate);
+ request.setUpsert(singleUpdate.upsert);
+ request.setMulti(singleUpdate.multi);
+ request.setQuery(singleUpdate.query);
+ request.setUpdates(singleUpdate.update);
UpdateLifecycleImpl updateLifecycle(nsString);
request.setLifecycle(&updateLifecycle);
@@ -716,7 +716,7 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess
auto collection = ctx.db()->getCollection(nsString);
// The common case: no implicit collection creation
- if (!upsert || collection != NULL) {
+ if (!singleUpdate.upsert || collection != NULL) {
unique_ptr<PlanExecutor> exec =
uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate));
@@ -750,7 +750,7 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess
break;
} catch (const WriteConflictException& dle) {
op.debug().writeConflicts++;
- if (multi) {
+ if (singleUpdate.multi) {
log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting";
throw;
}
@@ -814,13 +814,10 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess
}
void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- DbMessage d(m);
uassertStatusOK(userAllowedWriteNS(nsString));
- int flags = d.pullInt();
- bool justOne = flags & RemoveOption_JustOne;
- verify(d.moreJSObjs());
- BSONObj pattern = d.nextJsObj();
+ auto deleteOp = parseLegacyDelete(m);
+ auto& singleDelete = deleteOp.deletes[0];
auto client = txn->getClient();
auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
@@ -828,20 +825,21 @@ void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Mess
&repl::ReplClientInfo::setLastOpToSystemLastOpTime,
txn);
- op.debug().query = pattern;
+ op.debug().query = singleDelete.query;
{
stdx::lock_guard<Client> lk(*client);
- op.setQuery_inlock(pattern);
+ op.setQuery_inlock(singleDelete.query);
op.setNS_inlock(nsString.ns());
}
- Status status = AuthorizationSession::get(client)->checkAuthForDelete(nsString, pattern);
- audit::logDeleteAuthzCheck(client, nsString, pattern, status.code());
+ Status status =
+ AuthorizationSession::get(client)->checkAuthForDelete(nsString, singleDelete.query);
+ audit::logDeleteAuthzCheck(client, nsString, singleDelete.query, status.code());
uassertStatusOK(status);
DeleteRequest request(nsString);
- request.setQuery(pattern);
- request.setMulti(!justOne);
+ request.setQuery(singleDelete.query);
+ request.setMulti(singleDelete.multi);
request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
@@ -969,7 +967,7 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m,
void insertMultiSingletons(OperationContext* txn,
OldClientContext& ctx,
bool keepGoing,
- const char* ns,
+ StringData ns,
CurOp& op,
vector<BSONObj>::iterator begin,
vector<BSONObj>::iterator end) {
@@ -1002,7 +1000,7 @@ void insertMultiSingletons(OperationContext* txn,
void insertMultiVector(OperationContext* txn,
OldClientContext& ctx,
bool keepGoing,
- const char* ns,
+ StringData ns,
CurOp& op,
vector<BSONObj>::iterator begin,
vector<BSONObj>::iterator end) {
@@ -1033,10 +1031,16 @@ void insertMultiVector(OperationContext* txn,
NOINLINE_DECL void insertMulti(OperationContext* txn,
OldClientContext& ctx,
- bool keepGoing,
- const char* ns,
- vector<BSONObj>& docs,
+ const InsertOp& insertOp,
CurOp& op) {
+ std::vector<BSONObj> docs;
+ docs.reserve(insertOp.documents.size());
+ for (auto&& doc : insertOp.documents) {
+ // TODO don't fail yet on invalid documents. They should be treated like other errors.
+ BSONObj fixed = uassertStatusOK(fixDocumentForInsert(doc));
+ docs.push_back(fixed.isEmpty() ? doc : std::move(fixed));
+ }
+
vector<BSONObj>::iterator chunkBegin = docs.begin();
int64_t chunkCount = 0;
int64_t chunkSize = 0;
@@ -1048,29 +1052,25 @@ NOINLINE_DECL void insertMulti(OperationContext* txn,
txn);
for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) {
- StatusWith<BSONObj> fixed = fixDocumentForInsert(*it);
- uassertStatusOK(fixed.getStatus());
- if (!fixed.getValue().isEmpty())
- *it = fixed.getValue();
- }
-
- for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) {
chunkSize += (*it).objsize();
// Limit chunk size, actual size chosen is a tradeoff: larger sizes are more efficient,
// but smaller chunk sizes allow yielding to other threads and lower chance of WCEs
if ((++chunkCount >= internalQueryExecYieldIterations / 2) ||
(chunkSize >= insertVectorMaxBytes)) {
if (it == chunkBegin) // there is only one doc to process, so avoid retry on failure
- insertMultiSingletons(txn, ctx, keepGoing, ns, op, chunkBegin, it + 1);
+ insertMultiSingletons(
+ txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1);
else
- insertMultiVector(txn, ctx, keepGoing, ns, op, chunkBegin, it + 1);
+ insertMultiVector(
+ txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1);
chunkBegin = it + 1;
chunkCount = 0;
chunkSize = 0;
}
}
if (chunkBegin != docs.end())
- insertMultiVector(txn, ctx, keepGoing, ns, op, chunkBegin, docs.end());
+ insertMultiVector(
+ txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, docs.end());
if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
// If this operation has already generated a new lastOp, don't bother setting it
@@ -1159,9 +1159,7 @@ static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curO
bool _receivedInsert(OperationContext* txn,
const NamespaceString& nsString,
- const char* ns,
- vector<BSONObj>& docs,
- bool keepGoing,
+ const InsertOp& insertOp,
CurOp& op,
bool checkCollection) {
// CONCURRENCY TODO: is being read locked in big log sufficient here?
@@ -1169,16 +1167,14 @@ bool _receivedInsert(OperationContext* txn,
uassert(
10058, "not master", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
- OldClientContext ctx(txn, ns);
+ OldClientContext ctx(txn, insertOp.ns.ns());
if (checkCollection && !ctx.db()->getCollection(nsString))
return false;
- insertMulti(txn, ctx, keepGoing, ns, docs, op);
+ insertMulti(txn, ctx, insertOp, op);
return true;
}
void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- DbMessage d(m);
- const char* ns = d.getns();
{
stdx::lock_guard<Client>(*txn->getClient());
CurOp::get(txn)->setNS_inlock(nsString.ns());
@@ -1186,29 +1182,21 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess
uassertStatusOK(userAllowedWriteNS(nsString.ns()));
if (nsString.isSystemDotIndexes()) {
+ DbMessage d(m);
insertSystemIndexes(txn, d, op);
return;
}
- if (!d.moreJSObjs()) {
- // strange. should we complain?
- return;
- }
-
- vector<BSONObj> multi;
- while (d.moreJSObjs()) {
- BSONObj obj = d.nextJsObj();
- multi.push_back(obj);
+ auto insertOp = parseLegacyInsert(m);
- // Check auth for insert (also handles checking if this is an index build and checks
- // for the proper privileges in that case).
+ for (const auto& obj : insertOp.documents) {
+ // Check auth for insert.
Status status =
AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj);
audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code());
uassertStatusOK(status);
}
- const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
{
ScopedTransaction transaction(txn, MODE_IX);
Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
@@ -1216,7 +1204,7 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess
// OldClientContext may implicitly create a database, so check existence
if (dbHolder().get(txn, nsString.db()) != NULL) {
- if (_receivedInsert(txn, nsString, ns, multi, keepGoing, op, true))
+ if (_receivedInsert(txn, nsString, insertOp, op, true))
return;
}
}
@@ -1225,7 +1213,7 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess
ScopedTransaction transaction(txn, MODE_IX);
Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
- _receivedInsert(txn, nsString, ns, multi, keepGoing, op, false);
+ _receivedInsert(txn, nsString, insertOp, op, false);
}
// ----- BEGIN Diaglog -----
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index 779bd57740c..07d7b7dc40d 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -233,3 +233,23 @@ env.CppUnitTest(
'update_driver',
],
)
+
+env.Library(
+ target='write_ops_parsers',
+ source=[
+ 'write_ops_parsers.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/dbmessage',
+ ],
+)
+
+env.CppUnitTest(
+ target='write_ops_parsers_test',
+ source='write_ops_parsers_test.cpp',
+ LIBDEPS=[
+ 'write_ops_parsers',
+ '$BUILD_DIR/mongo/client/clientdriver',
+ ],
+)
diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h
new file mode 100644
index 00000000000..267693bc559
--- /dev/null
+++ b/src/mongo/db/ops/write_ops.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/db/jsobj.h"
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+
+/**
+ * The base structure for all fields that are common for all write operations.
+ *
+ * Unlike ParsedUpdate and UpdateRequest (and the Delete counterparts), types deriving from this are
+ * intended to represent entire operations that may consist of multiple sub-operations.
+ */
+struct ParsedWriteOp {
+ NamespaceString ns;
+ boost::optional<BSONObj> writeConcern;
+ bool bypassDocumentValidation = false;
+ bool continueOnError = false;
+};
+
+/**
+ * A parsed insert operation.
+ */
+struct InsertOp : ParsedWriteOp {
+ std::vector<BSONObj> documents;
+};
+
+/**
+ * A parsed update operation.
+ */
+struct UpdateOp : ParsedWriteOp {
+ struct SingleUpdate {
+ BSONObj query;
+ BSONObj update;
+ bool multi = false;
+ bool upsert = false;
+ };
+
+ std::vector<SingleUpdate> updates;
+};
+
+/**
+ * A parsed delete operation.
+ */
+struct DeleteOp : ParsedWriteOp {
+ struct SingleDelete {
+ BSONObj query;
+ bool multi = true;
+ };
+
+ std::vector<SingleDelete> deletes;
+};
+
+} // namespace mongo
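For orientation, the structs above are plain aggregates that the parsers below fill in, so a parsed {update: ...} command with a single upsert corresponds roughly to the following hand-built value. This is an illustrative sketch only (assuming "mongo/db/ops/write_ops.h"), not code from this commit:

    UpdateOp op;
    op.ns = NamespaceString("test.foo");
    op.continueOnError = false;  // the command was ordered (the default)

    UpdateOp::SingleUpdate single;
    single.query = BSON("x" << 1);
    single.update = BSON("$inc" << BSON("x" << 1));
    single.upsert = true;
    single.multi = false;
    op.updates.push_back(single);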
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
new file mode 100644
index 00000000000..8e01ee43eaa
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -0,0 +1,275 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/ops/write_ops_parsers.h"
+
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/dbmessage.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace {
+
+// The specified limit to the number of operations that can be included in a single write command.
+// This is an attempt to avoid a large number of errors resulting in a reply that exceeds 16MB. It
+// doesn't fully ensure that goal, but it reduces the probability of it happening. This limit should
+// not be used if the protocol changes to avoid the 16MB limit on reply size.
+const size_t kMaxWriteBatchSize = 1000;
+
+void checkTypeInArray(BSONType expectedType,
+ const BSONElement& elem,
+ const BSONElement& arrayElem) {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "Wrong type for " << arrayElem.fieldNameStringData() << '['
+ << elem.fieldNameStringData() << "]. Expected a "
+ << typeName(expectedType) << ", got a " << typeName(elem.type()) << '.',
+ elem.type() == expectedType);
+}
+
+void checkType(BSONType expectedType, const BSONElement& elem) {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "Wrong type for '" << elem.fieldNameStringData() << "'. Expected a "
+ << typeName(expectedType) << ", got a " << typeName(elem.type()) << '.',
+ elem.type() == expectedType);
+}
+
+void checkOpCountForCommand(size_t numOps) {
+ uassert(ErrorCodes::InvalidLength,
+ str::stream() << "Write batch sizes must be between 1 and " << kMaxWriteBatchSize
+ << ". Got " << numOps << " operations.",
+ numOps != 0 && numOps <= kMaxWriteBatchSize);
+}
+
+/**
+ * Parses the fields common to all write commands and sets uniqueField to the element named
+ * uniqueFieldName. The uniqueField is the only top-level field that is unique to the specific type
+ * of write command.
+ */
+void parseWriteCommand(StringData dbName,
+ const BSONObj& cmd,
+ StringData uniqueFieldName,
+ BSONElement* uniqueField,
+ ParsedWriteOp* op) {
+ // Command dispatch wouldn't get here with an empty object because the first field indicates
+ // which command to run.
+ invariant(!cmd.isEmpty());
+
+ bool haveUniqueField = false;
+ bool firstElement = true;
+ for (BSONElement field : cmd) {
+ if (firstElement) {
+ // The key is the command name and the value is the collection name
+ checkType(String, field);
+ op->ns = NamespaceString(dbName, field.valueStringData());
+ firstElement = false;
+ continue;
+ }
+
+ const StringData fieldName = field.fieldNameStringData();
+ if (fieldName == "writeConcern") {
+ checkType(Object, field);
+ op->writeConcern = field.Obj();
+ } else if (fieldName == "bypassDocumentValidation") {
+ checkType(Bool, field);
+ op->bypassDocumentValidation = field.Bool();
+ } else if (fieldName == "ordered") {
+ checkType(Bool, field);
+ op->continueOnError = !field.Bool();
+ } else if (fieldName == uniqueFieldName) {
+ haveUniqueField = true;
+ *uniqueField = field;
+ } else if (fieldName[0] != '$') {
+ std::initializer_list<StringData> ignoredFields = {"maxTimeMS", "shardVersion"};
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Unknown option to " << cmd.firstElementFieldName()
+ << " command: " << fieldName,
+ std::find(ignoredFields.begin(), ignoredFields.end(), fieldName) !=
+ ignoredFields.end());
+ }
+ }
+
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The " << uniqueFieldName << " option is required for the "
+ << cmd.firstElementFieldName() << " command.",
+ haveUniqueField);
+}
+}
+
+InsertOp parseInsertCommand(StringData dbName, const BSONObj& cmd) {
+ BSONElement documents;
+ InsertOp op;
+ parseWriteCommand(dbName, cmd, "documents", &documents, &op);
+ checkType(Array, documents);
+ for (auto doc : documents.Obj()) {
+ checkTypeInArray(Object, doc, documents);
+ op.documents.push_back(doc.Obj());
+ }
+ checkOpCountForCommand(op.documents.size());
+ return op;
+}
+
+UpdateOp parseUpdateCommand(StringData dbName, const BSONObj& cmd) {
+ BSONElement updates;
+ UpdateOp op;
+ parseWriteCommand(dbName, cmd, "updates", &updates, &op);
+ checkType(Array, updates);
+ for (auto doc : updates.Obj()) {
+ checkTypeInArray(Object, doc, updates);
+ op.updates.emplace_back();
+ auto& update = op.updates.back();
+ bool haveQ = false;
+ bool haveU = false;
+ for (auto field : doc.Obj()) {
+ const StringData fieldName = field.fieldNameStringData();
+ if (fieldName == "q") {
+ haveQ = true;
+ checkType(Object, field);
+ update.query = field.Obj();
+ } else if (fieldName == "u") {
+ haveU = true;
+ checkType(Object, field);
+ update.update = field.Obj();
+ } else if (fieldName == "multi") {
+ checkType(Bool, field);
+ update.multi = field.Bool();
+ } else if (fieldName == "upsert") {
+ checkType(Bool, field);
+ update.upsert = field.Bool();
+ } else {
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "Unrecognized field in update operation: " << fieldName);
+ }
+ }
+
+ uassert(ErrorCodes::FailedToParse, "The 'q' field is required for all updates", haveQ);
+ uassert(ErrorCodes::FailedToParse, "The 'u' field is required for all updates", haveU);
+ }
+ checkOpCountForCommand(op.updates.size());
+ return op;
+}
+
+DeleteOp parseDeleteCommand(StringData dbName, const BSONObj& cmd) {
+ BSONElement deletes;
+ DeleteOp op;
+ parseWriteCommand(dbName, cmd, "deletes", &deletes, &op);
+ checkType(Array, deletes);
+ for (auto doc : deletes.Obj()) {
+ checkTypeInArray(Object, doc, deletes);
+ op.deletes.emplace_back();
+ auto& del = op.deletes.back(); // delete is a reserved word.
+ bool haveQ = false;
+ bool haveLimit = false;
+ for (auto field : doc.Obj()) {
+ const StringData fieldName = field.fieldNameStringData();
+ if (fieldName == "q") {
+ haveQ = true;
+ checkType(Object, field);
+ del.query = field.Obj();
+ } else if (fieldName == "limit") {
+ haveLimit = true;
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream()
+ << "The limit field in delete objects must be a number. Got a "
+ << typeName(field.type()),
+ field.isNumber());
+
+ // Using a double to avoid throwing away illegal fractional portion. Don't want to
+ // accept 0.5 here.
+ const double limit = field.numberDouble();
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "The limit field in delete objects must be 0 or 1. Got "
+ << limit,
+ limit == 0 || limit == 1);
+ del.multi = (limit == 0);
+ } else {
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "Unrecognized field in delete operation: " << fieldName);
+ }
+ }
+ uassert(ErrorCodes::FailedToParse, "The 'q' field is required for all deletes", haveQ);
+ uassert(
+ ErrorCodes::FailedToParse, "The 'limit' field is required for all deletes", haveLimit);
+ }
+ checkOpCountForCommand(op.deletes.size());
+ return op;
+}
+
+InsertOp parseLegacyInsert(const Message& msgRaw) {
+ DbMessage msg(msgRaw);
+
+ InsertOp op;
+ op.ns = NamespaceString(msg.getns());
+ op.continueOnError = msg.reservedField() & InsertOption_ContinueOnError;
+ uassert(ErrorCodes::InvalidLength, "Need at least one object to insert", msg.moreJSObjs());
+ while (msg.moreJSObjs()) {
+ op.documents.push_back(msg.nextJsObj());
+ }
+ // There is no limit on the number of inserts in a legacy batch.
+
+ return op;
+}
+
+UpdateOp parseLegacyUpdate(const Message& msgRaw) {
+ DbMessage msg(msgRaw);
+
+ UpdateOp op;
+ op.ns = NamespaceString(msg.getns());
+
+ // Legacy updates only allowed one update per operation. Layout is flags, query, update.
+ op.updates.emplace_back();
+ auto& singleUpdate = op.updates.back();
+ const int flags = msg.pullInt();
+ singleUpdate.upsert = flags & UpdateOption_Upsert;
+ singleUpdate.multi = flags & UpdateOption_Multi;
+ singleUpdate.query = msg.nextJsObj();
+ singleUpdate.update = msg.nextJsObj();
+
+ return op;
+}
+
+DeleteOp parseLegacyDelete(const Message& msgRaw) {
+ DbMessage msg(msgRaw);
+
+ DeleteOp op;
+ op.ns = NamespaceString(msg.getns());
+
+ // Legacy deletes only allowed one delete per operation. Layout is flags, query.
+ op.deletes.emplace_back();
+ auto& singleDelete = op.deletes.back();
+ const int flags = msg.pullInt();
+ singleDelete.multi = !(flags & RemoveOption_JustOne);
+ singleDelete.query = msg.nextJsObj();
+
+ return op;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
new file mode 100644
index 00000000000..526703fadd3
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/jsobj.h"
+#include "mongo/util/net/message.h"
+#include "mongo/db/ops/write_ops.h"
+
+namespace mongo {
+
+/**
+ * This file contains functions to parse write operations from the user-facing wire format using
+ * either write commands or the legacy OP_INSERT/OP_UPDATE/OP_DELETE wire operations. Parse errors
+ * are reported by uasserting.
+ *
+ * These functions only parse and validate the operation structure. No attempt is made to parse or
+ * validate the documents being inserted or the query and update operators.
+ */
+
+InsertOp parseInsertCommand(StringData dbName, const BSONObj& cmd);
+UpdateOp parseUpdateCommand(StringData dbName, const BSONObj& cmd);
+DeleteOp parseDeleteCommand(StringData dbName, const BSONObj& cmd);
+
+InsertOp parseLegacyInsert(const Message& msg);
+UpdateOp parseLegacyUpdate(const Message& msg);
+DeleteOp parseLegacyDelete(const Message& msg);
+
+} // namespace mongo
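A minimal usage sketch for the command-path entry points declared above, mirroring what the unit tests below exercise. The surrounding command dispatch that would normally supply dbName and the command object is assumed, as is #include "mongo/db/ops/write_ops_parsers.h":

    // A write command as it would arrive over the wire.
    BSONObj cmdObj = BSON("insert"
                          << "foo"
                          << "documents" << BSON_ARRAY(BSON("x" << 1) << BSON("x" << 2))
                          << "ordered" << true);
    try {
        InsertOp op = parseInsertCommand("test", cmdObj);  // uasserts on malformed input
        invariant(op.documents.size() == 2);   // structure is validated; contents are checked later
        invariant(!op.continueOnError);        // {ordered: true} maps to continueOnError == false
    } catch (const UserException& ex) {
        // ex.getCode() is one of FailedToParse, TypeMismatch, InvalidLength, ...
    }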
diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp
new file mode 100644
index 00000000000..04b3982762e
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_parsers_test.cpp
@@ -0,0 +1,313 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+TEST(CommandWriteOpsParsers, CommonFields_WriteConcern) {
+ auto writeConcern = BSON("w" << 2);
+ auto cmd = BSON("insert"
+ << "bar"
+ << "documents" << BSON_ARRAY(BSONObj()) << "writeConcern" << writeConcern);
+ auto op = parseInsertCommand("foo", cmd);
+ ASSERT(bool(op.writeConcern));
+ ASSERT_EQ(*op.writeConcern, writeConcern);
+}
+
+TEST(CommandWriteOpsParsers, CommonFields_BypassDocumentValidation) {
+ for (bool bypassDocumentValidation : {true, false}) {
+ auto cmd = BSON("insert"
+ << "bar"
+ << "documents" << BSON_ARRAY(BSONObj()) << "bypassDocumentValidation"
+ << bypassDocumentValidation);
+ auto op = parseInsertCommand("foo", cmd);
+ ASSERT_EQ(op.bypassDocumentValidation, bypassDocumentValidation);
+ }
+}
+
+TEST(CommandWriteOpsParsers, CommonFields_Ordered) {
+ for (bool ordered : {true, false}) {
+ auto cmd = BSON("insert"
+ << "bar"
+ << "documents" << BSON_ARRAY(BSONObj()) << "ordered" << ordered);
+ auto op = parseInsertCommand("foo", cmd);
+ ASSERT_EQ(op.continueOnError, !ordered);
+ }
+}
+
+TEST(CommandWriteOpsParsers, CommonFields_IgnoredFields) {
+ // These flags are ignored, so there is nothing to check other than that this doesn't throw.
+ auto cmd = BSON("insert"
+ << "bar"
+ << "documents" << BSON_ARRAY(BSONObj()) << "maxTimeMS" << 1000 << "shardVersion"
+ << BSONObj());
+ parseInsertCommand("foo", cmd);
+}
+
+TEST(CommandWriteOpsParsers, GarbageFieldsAtTopLevel) {
+ auto cmd = BSON("insert"
+ << "bar"
+ << "documents" << BSON_ARRAY(BSONObj()) << "GARBAGE" << 1);
+ ASSERT_THROWS_CODE(parseInsertCommand("foo", cmd), UserException, ErrorCodes::FailedToParse);
+}
+
+TEST(CommandWriteOpsParsers, GarbageFieldsInUpdateDoc) {
+ auto cmd =
+ BSON("update"
+ << "bar"
+ << "updates" << BSON_ARRAY("q" << BSONObj() << "u" << BSONObj() << "GARBAGE" << 1));
+ ASSERT_THROWS_CODE(parseInsertCommand("foo", cmd), UserException, ErrorCodes::FailedToParse);
+}
+
+TEST(CommandWriteOpsParsers, GarbageFieldsInDeleteDoc) {
+ auto cmd = BSON("delete"
+ << "bar"
+ << "deletes" << BSON_ARRAY("q" << BSONObj() << "limit" << 0 << "GARBAGE" << 1));
+ ASSERT_THROWS_CODE(parseInsertCommand("foo", cmd), UserException, ErrorCodes::FailedToParse);
+}
+
+TEST(CommandWriteOpsParsers, SingleInsert) {
+ const auto ns = NamespaceString("test", "foo");
+ const BSONObj obj = BSON("x" << 1);
+ auto cmd = BSON("insert" << ns.coll() << "documents" << BSON_ARRAY(obj));
+ const auto op = parseInsertCommand(ns.db(), cmd);
+ ASSERT_EQ(op.ns.ns(), ns.ns());
+ ASSERT(!op.writeConcern);
+ ASSERT(!op.bypassDocumentValidation);
+ ASSERT(!op.continueOnError);
+ ASSERT_EQ(op.documents.size(), 1u);
+ ASSERT_EQ(op.documents[0], obj);
+}
+
+TEST(CommandWriteOpsParsers, EmptyMultiInsertFails) {
+ const auto ns = NamespaceString("test", "foo");
+ auto cmd = BSON("insert" << ns.coll() << "documents" << BSONArray());
+ ASSERT_THROWS_CODE(parseInsertCommand(ns.db(), cmd), UserException, ErrorCodes::InvalidLength);
+}
+
+TEST(CommandWriteOpsParsers, RealMultiInsert) {
+ const auto ns = NamespaceString("test", "foo");
+ const BSONObj obj0 = BSON("x" << 0);
+ const BSONObj obj1 = BSON("x" << 1);
+ auto cmd = BSON("insert" << ns.coll() << "documents" << BSON_ARRAY(obj0 << obj1));
+ const auto op = parseInsertCommand(ns.db(), cmd);
+ ASSERT_EQ(op.ns.ns(), ns.ns());
+ ASSERT(!op.writeConcern);
+ ASSERT(!op.bypassDocumentValidation);
+ ASSERT(!op.continueOnError);
+ ASSERT_EQ(op.documents.size(), 2u);
+ ASSERT_EQ(op.documents[0], obj0);
+ ASSERT_EQ(op.documents[1], obj1);
+}
+
+TEST(CommandWriteOpsParsers, Update) {
+ const auto ns = NamespaceString("test", "foo");
+ const BSONObj query = BSON("x" << 1);
+ const BSONObj update = BSON("$inc" << BSON("x" << 1));
+ for (bool upsert : {false, true}) {
+ for (bool multi : {false, true}) {
+ auto cmd = BSON("update" << ns.coll() << "updates"
+ << BSON_ARRAY(BSON("q" << query << "u" << update << "upsert"
+ << upsert << "multi" << multi)));
+ auto op = parseUpdateCommand(ns.db(), cmd);
+ ASSERT_EQ(op.ns.ns(), ns.ns());
+ ASSERT(!op.writeConcern);
+ ASSERT(!op.bypassDocumentValidation);
+ ASSERT_EQ(op.continueOnError, false);
+ ASSERT_EQ(op.updates.size(), 1u);
+ ASSERT_EQ(op.updates[0].query, query);
+ ASSERT_EQ(op.updates[0].update, update);
+ ASSERT_EQ(op.updates[0].upsert, upsert);
+ ASSERT_EQ(op.updates[0].multi, multi);
+ }
+ }
+}
+
+TEST(CommandWriteOpsParsers, Remove) {
+ const auto ns = NamespaceString("test", "foo");
+ const BSONObj query = BSON("x" << 1);
+ for (bool multi : {false, true}) {
+ auto cmd = BSON("delete" << ns.coll() << "deletes"
+ << BSON_ARRAY(BSON("q" << query << "limit" << (multi ? 0 : 1))));
+ auto op = parseDeleteCommand(ns.db(), cmd);
+ ASSERT_EQ(op.ns.ns(), ns.ns());
+ ASSERT(!op.writeConcern);
+ ASSERT(!op.bypassDocumentValidation);
+ ASSERT_EQ(op.continueOnError, false);
+ ASSERT_EQ(op.deletes.size(), 1u);
+ ASSERT_EQ(op.deletes[0].query, query);
+ ASSERT_EQ(op.deletes[0].multi, multi);
+ }
+}
+
+TEST(CommandWriteOpsParsers, RemoveErrorsWithBadLimit) {
+ // Only 1 and 0 should be accepted.
+ for (BSONElement limit : BSON_ARRAY(-1 << 2 << 0.5)) {
+ auto cmd = BSON("delete"
+ << "bar"
+ << "deletes" << BSON_ARRAY("q" << BSONObj() << "limit" << limit));
+ ASSERT_THROWS_CODE(
+ parseDeleteCommand("foo", cmd), UserException, ErrorCodes::FailedToParse);
+ }
+}
+
+namespace {
+/**
+ * A mock DBClient that just captures the Message that is sent for legacy writes.
+ */
+class MyMockDBClient final : public DBClientBase {
+public:
+ Message message; // The last message sent.
+
+ void say(Message& toSend, bool isRetry = false, std::string* actualServer = nullptr) {
+ message = std::move(toSend);
+ }
+
+ // The rest of these are just filling out the pure-virtual parts of the interface.
+ bool lazySupported() const {
+ return false;
+ }
+ std::string getServerAddress() const {
+ return "";
+ }
+ std::string toString() const {
+ return "";
+ }
+ bool call(Message& toSend, Message& response, bool assertOk, std::string* actualServer) {
+ invariant(!"call() not implemented");
+ }
+ virtual int getMinWireVersion() {
+ return 0;
+ }
+ virtual int getMaxWireVersion() {
+ return 0;
+ }
+ virtual bool isFailed() const {
+ return false;
+ }
+ virtual bool isStillConnected() {
+ return true;
+ }
+ virtual double getSoTimeout() const {
+ return 0;
+ }
+ virtual ConnectionString::ConnectionType type() const {
+ return ConnectionString::MASTER;
+ }
+};
+} // namespace
+
+TEST(LegacyWriteOpsParsers, SingleInsert) {
+ const std::string ns = "test.foo";
+ const BSONObj obj = BSON("x" << 1);
+ for (bool continueOnError : {false, true}) {
+ MyMockDBClient client;
+ client.insert(ns, obj, continueOnError ? InsertOption_ContinueOnError : 0);
+ const auto op = parseLegacyInsert(client.message);
+ ASSERT_EQ(op.ns.ns(), ns);
+ ASSERT(!op.writeConcern);
+ ASSERT(!op.bypassDocumentValidation);
+ ASSERT_EQ(op.continueOnError, continueOnError);
+ ASSERT_EQ(op.documents.size(), 1u);
+ ASSERT_EQ(op.documents[0], obj);
+ }
+}
+
+TEST(LegacyWriteOpsParsers, EmptyMultiInsertFails) {
+ const std::string ns = "test.foo";
+ for (bool continueOnError : {false, true}) {
+ MyMockDBClient client;
+ client.insert(
+ ns, std::vector<BSONObj>{}, continueOnError ? InsertOption_ContinueOnError : 0);
+ ASSERT_THROWS_CODE(
+ parseLegacyInsert(client.message), UserException, ErrorCodes::InvalidLength);
+ }
+}
+
+TEST(LegacyWriteOpsParsers, RealMultiInsert) {
+ const std::string ns = "test.foo";
+ const BSONObj obj0 = BSON("x" << 0);
+ const BSONObj obj1 = BSON("x" << 1);
+ for (bool continueOnError : {false, true}) {
+ MyMockDBClient client;
+ client.insert(ns, {obj0, obj1}, continueOnError ? InsertOption_ContinueOnError : 0);
+ const auto op = parseLegacyInsert(client.message);
+ ASSERT_EQ(op.ns.ns(), ns);
+ ASSERT(!op.writeConcern);
+ ASSERT(!op.bypassDocumentValidation);
+ ASSERT_EQ(op.continueOnError, continueOnError);
+ ASSERT_EQ(op.documents.size(), 2u);
+ ASSERT_EQ(op.documents[0], obj0);
+ ASSERT_EQ(op.documents[1], obj1);
+ }
+}
+
+TEST(LegacyWriteOpsParsers, Update) {
+ const std::string ns = "test.foo";
+ const BSONObj query = BSON("x" << 1);
+ const BSONObj update = BSON("$inc" << BSON("x" << 1));
+ for (bool upsert : {false, true}) {
+ for (bool multi : {false, true}) {
+ MyMockDBClient client;
+ client.update(ns, query, update, upsert, multi);
+ const auto op = parseLegacyUpdate(client.message);
+ ASSERT_EQ(op.ns.ns(), ns);
+ ASSERT(!op.writeConcern);
+ ASSERT(!op.bypassDocumentValidation);
+ ASSERT_EQ(op.continueOnError, false);
+ ASSERT_EQ(op.updates.size(), 1u);
+ ASSERT_EQ(op.updates[0].query, query);
+ ASSERT_EQ(op.updates[0].update, update);
+ ASSERT_EQ(op.updates[0].upsert, upsert);
+ ASSERT_EQ(op.updates[0].multi, multi);
+ }
+ }
+}
+
+TEST(LegacyWriteOpsParsers, Remove) {
+ const std::string ns = "test.foo";
+ const BSONObj query = BSON("x" << 1);
+ for (bool multi : {false, true}) {
+ MyMockDBClient client;
+ client.remove(ns, query, multi ? 0 : RemoveOption_JustOne);
+ const auto op = parseLegacyDelete(client.message);
+ ASSERT_EQ(op.ns.ns(), ns);
+ ASSERT(!op.writeConcern);
+ ASSERT(!op.bypassDocumentValidation);
+ ASSERT_EQ(op.continueOnError, false);
+ ASSERT_EQ(op.deletes.size(), 1u);
+ ASSERT_EQ(op.deletes[0].query, query);
+ ASSERT_EQ(op.deletes[0].multi, multi);
+ }
+}
+}