summaryrefslogtreecommitdiff
path: root/src/mongo/db/instance.cpp
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2016-04-07 18:31:21 -0400
committerMathias Stearn <mathias@10gen.com>2016-04-21 18:38:56 -0400
commita7a593da31a944c90d7c5f0422eeee8264eb438d (patch)
tree58bb8fed4985c08b4b7cdf11793e19bd2c5621a7 /src/mongo/db/instance.cpp
parent98ba7f26e13edbd221afca2d119e844896397752 (diff)
downloadmongo-a7a593da31a944c90d7c5f0422eeee8264eb438d.tar.gz
SERVER-23128 Parsers that parse legacy and command writes into uniform objects
Diffstat (limited to 'src/mongo/db/instance.cpp')
-rw-r--r--src/mongo/db/instance.cpp126
1 files changed, 57 insertions, 69 deletions
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index 021e2e6e6c2..c71fc2c814d 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -74,6 +74,7 @@
#include "mongo/db/ops/update_driver.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/query/find.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
@@ -655,42 +656,41 @@ void receivedKillCursors(OperationContext* txn, Message& m) {
}
void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- DbMessage d(m);
uassertStatusOK(userAllowedWriteNS(nsString));
- int flags = d.pullInt();
- BSONObj query = d.nextJsObj();
auto client = txn->getClient();
auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
&repl::ReplClientInfo::setLastOpToSystemLastOpTime,
txn);
- verify(d.moreJSObjs());
- verify(query.objsize() < m.header().dataLen());
- BSONObj toupdate = d.nextJsObj();
- uassert(10055, "update object too large", toupdate.objsize() <= BSONObjMaxUserSize);
- verify(toupdate.objsize() < m.header().dataLen());
- verify(query.objsize() + toupdate.objsize() < m.header().dataLen());
- bool upsert = flags & UpdateOption_Upsert;
- bool multi = flags & UpdateOption_Multi;
+ auto updateOp = parseLegacyUpdate(m);
+ auto& singleUpdate = updateOp.updates[0];
+
+ uassert(10055, "update object too large", singleUpdate.update.objsize() <= BSONObjMaxUserSize);
- op.debug().query = query;
+ op.debug().query = singleUpdate.query;
{
stdx::lock_guard<Client> lk(*client);
op.setNS_inlock(nsString.ns());
- op.setQuery_inlock(query);
+ op.setQuery_inlock(singleUpdate.query);
}
- Status status =
- AuthorizationSession::get(client)->checkAuthForUpdate(nsString, query, toupdate, upsert);
- audit::logUpdateAuthzCheck(client, nsString, query, toupdate, upsert, multi, status.code());
+ Status status = AuthorizationSession::get(client)->checkAuthForUpdate(
+ nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert);
+ audit::logUpdateAuthzCheck(client,
+ nsString,
+ singleUpdate.query,
+ singleUpdate.update,
+ singleUpdate.upsert,
+ singleUpdate.multi,
+ status.code());
uassertStatusOK(status);
UpdateRequest request(nsString);
- request.setUpsert(upsert);
- request.setMulti(multi);
- request.setQuery(query);
- request.setUpdates(toupdate);
+ request.setUpsert(singleUpdate.upsert);
+ request.setMulti(singleUpdate.multi);
+ request.setQuery(singleUpdate.query);
+ request.setUpdates(singleUpdate.update);
UpdateLifecycleImpl updateLifecycle(nsString);
request.setLifecycle(&updateLifecycle);
@@ -716,7 +716,7 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess
auto collection = ctx.db()->getCollection(nsString);
// The common case: no implicit collection creation
- if (!upsert || collection != NULL) {
+ if (!singleUpdate.upsert || collection != NULL) {
unique_ptr<PlanExecutor> exec =
uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate));
@@ -750,7 +750,7 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess
break;
} catch (const WriteConflictException& dle) {
op.debug().writeConflicts++;
- if (multi) {
+ if (singleUpdate.multi) {
log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting";
throw;
}
@@ -814,13 +814,10 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess
}
void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- DbMessage d(m);
uassertStatusOK(userAllowedWriteNS(nsString));
- int flags = d.pullInt();
- bool justOne = flags & RemoveOption_JustOne;
- verify(d.moreJSObjs());
- BSONObj pattern = d.nextJsObj();
+ auto deleteOp = parseLegacyDelete(m);
+ auto& singleDelete = deleteOp.deletes[0];
auto client = txn->getClient();
auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
@@ -828,20 +825,21 @@ void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Mess
&repl::ReplClientInfo::setLastOpToSystemLastOpTime,
txn);
- op.debug().query = pattern;
+ op.debug().query = singleDelete.query;
{
stdx::lock_guard<Client> lk(*client);
- op.setQuery_inlock(pattern);
+ op.setQuery_inlock(singleDelete.query);
op.setNS_inlock(nsString.ns());
}
- Status status = AuthorizationSession::get(client)->checkAuthForDelete(nsString, pattern);
- audit::logDeleteAuthzCheck(client, nsString, pattern, status.code());
+ Status status =
+ AuthorizationSession::get(client)->checkAuthForDelete(nsString, singleDelete.query);
+ audit::logDeleteAuthzCheck(client, nsString, singleDelete.query, status.code());
uassertStatusOK(status);
DeleteRequest request(nsString);
- request.setQuery(pattern);
- request.setMulti(!justOne);
+ request.setQuery(singleDelete.query);
+ request.setMulti(singleDelete.multi);
request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
@@ -969,7 +967,7 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m,
void insertMultiSingletons(OperationContext* txn,
OldClientContext& ctx,
bool keepGoing,
- const char* ns,
+ StringData ns,
CurOp& op,
vector<BSONObj>::iterator begin,
vector<BSONObj>::iterator end) {
@@ -1002,7 +1000,7 @@ void insertMultiSingletons(OperationContext* txn,
void insertMultiVector(OperationContext* txn,
OldClientContext& ctx,
bool keepGoing,
- const char* ns,
+ StringData ns,
CurOp& op,
vector<BSONObj>::iterator begin,
vector<BSONObj>::iterator end) {
@@ -1033,10 +1031,16 @@ void insertMultiVector(OperationContext* txn,
NOINLINE_DECL void insertMulti(OperationContext* txn,
OldClientContext& ctx,
- bool keepGoing,
- const char* ns,
- vector<BSONObj>& docs,
+ const InsertOp& insertOp,
CurOp& op) {
+ std::vector<BSONObj> docs;
+ docs.reserve(insertOp.documents.size());
+ for (auto&& doc : insertOp.documents) {
+ // TODO don't fail yet on invalid documents. They should be treated like other errors.
+ BSONObj fixed = uassertStatusOK(fixDocumentForInsert(doc));
+ docs.push_back(fixed.isEmpty() ? doc : std::move(fixed));
+ }
+
vector<BSONObj>::iterator chunkBegin = docs.begin();
int64_t chunkCount = 0;
int64_t chunkSize = 0;
@@ -1048,29 +1052,25 @@ NOINLINE_DECL void insertMulti(OperationContext* txn,
txn);
for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) {
- StatusWith<BSONObj> fixed = fixDocumentForInsert(*it);
- uassertStatusOK(fixed.getStatus());
- if (!fixed.getValue().isEmpty())
- *it = fixed.getValue();
- }
-
- for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) {
chunkSize += (*it).objsize();
// Limit chunk size, actual size chosen is a tradeoff: larger sizes are more efficient,
// but smaller chunk sizes allow yielding to other threads and lower chance of WCEs
if ((++chunkCount >= internalQueryExecYieldIterations / 2) ||
(chunkSize >= insertVectorMaxBytes)) {
if (it == chunkBegin) // there is only one doc to process, so avoid retry on failure
- insertMultiSingletons(txn, ctx, keepGoing, ns, op, chunkBegin, it + 1);
+ insertMultiSingletons(
+ txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1);
else
- insertMultiVector(txn, ctx, keepGoing, ns, op, chunkBegin, it + 1);
+ insertMultiVector(
+ txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1);
chunkBegin = it + 1;
chunkCount = 0;
chunkSize = 0;
}
}
if (chunkBegin != docs.end())
- insertMultiVector(txn, ctx, keepGoing, ns, op, chunkBegin, docs.end());
+ insertMultiVector(
+ txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, docs.end());
if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
// If this operation has already generated a new lastOp, don't bother setting it
@@ -1159,9 +1159,7 @@ static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curO
bool _receivedInsert(OperationContext* txn,
const NamespaceString& nsString,
- const char* ns,
- vector<BSONObj>& docs,
- bool keepGoing,
+ const InsertOp& insertOp,
CurOp& op,
bool checkCollection) {
// CONCURRENCY TODO: is being read locked in big log sufficient here?
@@ -1169,16 +1167,14 @@ bool _receivedInsert(OperationContext* txn,
uassert(
10058, "not master", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
- OldClientContext ctx(txn, ns);
+ OldClientContext ctx(txn, insertOp.ns.ns());
if (checkCollection && !ctx.db()->getCollection(nsString))
return false;
- insertMulti(txn, ctx, keepGoing, ns, docs, op);
+ insertMulti(txn, ctx, insertOp, op);
return true;
}
void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- DbMessage d(m);
- const char* ns = d.getns();
{
stdx::lock_guard<Client>(*txn->getClient());
CurOp::get(txn)->setNS_inlock(nsString.ns());
@@ -1186,29 +1182,21 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess
uassertStatusOK(userAllowedWriteNS(nsString.ns()));
if (nsString.isSystemDotIndexes()) {
+ DbMessage d(m);
insertSystemIndexes(txn, d, op);
return;
}
- if (!d.moreJSObjs()) {
- // strange. should we complain?
- return;
- }
-
- vector<BSONObj> multi;
- while (d.moreJSObjs()) {
- BSONObj obj = d.nextJsObj();
- multi.push_back(obj);
+ auto insertOp = parseLegacyInsert(m);
- // Check auth for insert (also handles checking if this is an index build and checks
- // for the proper privileges in that case).
+ for (const auto& obj : insertOp.documents) {
+ // Check auth for insert.
Status status =
AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj);
audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code());
uassertStatusOK(status);
}
- const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
{
ScopedTransaction transaction(txn, MODE_IX);
Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
@@ -1216,7 +1204,7 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess
// OldClientContext may implicitly create a database, so check existence
if (dbHolder().get(txn, nsString.db()) != NULL) {
- if (_receivedInsert(txn, nsString, ns, multi, keepGoing, op, true))
+ if (_receivedInsert(txn, nsString, insertOp, op, true))
return;
}
}
@@ -1225,7 +1213,7 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess
ScopedTransaction transaction(txn, MODE_IX);
Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
- _receivedInsert(txn, nsString, ns, multi, keepGoing, op, false);
+ _receivedInsert(txn, nsString, insertOp, op, false);
}
// ----- BEGIN Diaglog -----