diff options
author | Mathias Stearn <mathias@10gen.com> | 2016-04-07 18:31:21 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2016-04-21 18:38:56 -0400 |
commit | a7a593da31a944c90d7c5f0422eeee8264eb438d (patch) | |
tree | 58bb8fed4985c08b4b7cdf11793e19bd2c5621a7 /src/mongo/db/instance.cpp | |
parent | 98ba7f26e13edbd221afca2d119e844896397752 (diff) | |
download | mongo-a7a593da31a944c90d7c5f0422eeee8264eb438d.tar.gz |
SERVER-23128 Parsers that parse legacy and command writes into uniform objects
Diffstat (limited to 'src/mongo/db/instance.cpp')
-rw-r--r-- | src/mongo/db/instance.cpp | 126 |
1 files changed, 57 insertions, 69 deletions
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index 021e2e6e6c2..c71fc2c814d 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -74,6 +74,7 @@ #include "mongo/db/ops/update_driver.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/update_request.h" +#include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/query/find.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" @@ -655,42 +656,41 @@ void receivedKillCursors(OperationContext* txn, Message& m) { } void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - DbMessage d(m); uassertStatusOK(userAllowedWriteNS(nsString)); - int flags = d.pullInt(); - BSONObj query = d.nextJsObj(); auto client = txn->getClient(); auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), &repl::ReplClientInfo::setLastOpToSystemLastOpTime, txn); - verify(d.moreJSObjs()); - verify(query.objsize() < m.header().dataLen()); - BSONObj toupdate = d.nextJsObj(); - uassert(10055, "update object too large", toupdate.objsize() <= BSONObjMaxUserSize); - verify(toupdate.objsize() < m.header().dataLen()); - verify(query.objsize() + toupdate.objsize() < m.header().dataLen()); - bool upsert = flags & UpdateOption_Upsert; - bool multi = flags & UpdateOption_Multi; + auto updateOp = parseLegacyUpdate(m); + auto& singleUpdate = updateOp.updates[0]; + + uassert(10055, "update object too large", singleUpdate.update.objsize() <= BSONObjMaxUserSize); - op.debug().query = query; + op.debug().query = singleUpdate.query; { stdx::lock_guard<Client> lk(*client); op.setNS_inlock(nsString.ns()); - op.setQuery_inlock(query); + op.setQuery_inlock(singleUpdate.query); } - Status status = - AuthorizationSession::get(client)->checkAuthForUpdate(nsString, query, toupdate, upsert); - audit::logUpdateAuthzCheck(client, nsString, query, toupdate, upsert, multi, status.code()); + Status status = AuthorizationSession::get(client)->checkAuthForUpdate( + nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert); + audit::logUpdateAuthzCheck(client, + nsString, + singleUpdate.query, + singleUpdate.update, + singleUpdate.upsert, + singleUpdate.multi, + status.code()); uassertStatusOK(status); UpdateRequest request(nsString); - request.setUpsert(upsert); - request.setMulti(multi); - request.setQuery(query); - request.setUpdates(toupdate); + request.setUpsert(singleUpdate.upsert); + request.setMulti(singleUpdate.multi); + request.setQuery(singleUpdate.query); + request.setUpdates(singleUpdate.update); UpdateLifecycleImpl updateLifecycle(nsString); request.setLifecycle(&updateLifecycle); @@ -716,7 +716,7 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess auto collection = ctx.db()->getCollection(nsString); // The common case: no implicit collection creation - if (!upsert || collection != NULL) { + if (!singleUpdate.upsert || collection != NULL) { unique_ptr<PlanExecutor> exec = uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate)); @@ -750,7 +750,7 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess break; } catch (const WriteConflictException& dle) { op.debug().writeConflicts++; - if (multi) { + if (singleUpdate.multi) { log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting"; throw; } @@ -814,13 +814,10 @@ void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Mess } void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - DbMessage d(m); uassertStatusOK(userAllowedWriteNS(nsString)); - int flags = d.pullInt(); - bool justOne = flags & RemoveOption_JustOne; - verify(d.moreJSObjs()); - BSONObj pattern = d.nextJsObj(); + auto deleteOp = parseLegacyDelete(m); + auto& singleDelete = deleteOp.deletes[0]; auto client = txn->getClient(); auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); @@ -828,20 +825,21 @@ void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Mess &repl::ReplClientInfo::setLastOpToSystemLastOpTime, txn); - op.debug().query = pattern; + op.debug().query = singleDelete.query; { stdx::lock_guard<Client> lk(*client); - op.setQuery_inlock(pattern); + op.setQuery_inlock(singleDelete.query); op.setNS_inlock(nsString.ns()); } - Status status = AuthorizationSession::get(client)->checkAuthForDelete(nsString, pattern); - audit::logDeleteAuthzCheck(client, nsString, pattern, status.code()); + Status status = + AuthorizationSession::get(client)->checkAuthForDelete(nsString, singleDelete.query); + audit::logDeleteAuthzCheck(client, nsString, singleDelete.query, status.code()); uassertStatusOK(status); DeleteRequest request(nsString); - request.setQuery(pattern); - request.setMulti(!justOne); + request.setQuery(singleDelete.query); + request.setMulti(singleDelete.multi); request.setYieldPolicy(PlanExecutor::YIELD_AUTO); @@ -969,7 +967,7 @@ bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, void insertMultiSingletons(OperationContext* txn, OldClientContext& ctx, bool keepGoing, - const char* ns, + StringData ns, CurOp& op, vector<BSONObj>::iterator begin, vector<BSONObj>::iterator end) { @@ -1002,7 +1000,7 @@ void insertMultiSingletons(OperationContext* txn, void insertMultiVector(OperationContext* txn, OldClientContext& ctx, bool keepGoing, - const char* ns, + StringData ns, CurOp& op, vector<BSONObj>::iterator begin, vector<BSONObj>::iterator end) { @@ -1033,10 +1031,16 @@ void insertMultiVector(OperationContext* txn, NOINLINE_DECL void insertMulti(OperationContext* txn, OldClientContext& ctx, - bool keepGoing, - const char* ns, - vector<BSONObj>& docs, + const InsertOp& insertOp, CurOp& op) { + std::vector<BSONObj> docs; + docs.reserve(insertOp.documents.size()); + for (auto&& doc : insertOp.documents) { + // TODO don't fail yet on invalid documents. They should be treated like other errors. + BSONObj fixed = uassertStatusOK(fixDocumentForInsert(doc)); + docs.push_back(fixed.isEmpty() ? doc : std::move(fixed)); + } + vector<BSONObj>::iterator chunkBegin = docs.begin(); int64_t chunkCount = 0; int64_t chunkSize = 0; @@ -1048,29 +1052,25 @@ NOINLINE_DECL void insertMulti(OperationContext* txn, txn); for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) { - StatusWith<BSONObj> fixed = fixDocumentForInsert(*it); - uassertStatusOK(fixed.getStatus()); - if (!fixed.getValue().isEmpty()) - *it = fixed.getValue(); - } - - for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) { chunkSize += (*it).objsize(); // Limit chunk size, actual size chosen is a tradeoff: larger sizes are more efficient, // but smaller chunk sizes allow yielding to other threads and lower chance of WCEs if ((++chunkCount >= internalQueryExecYieldIterations / 2) || (chunkSize >= insertVectorMaxBytes)) { if (it == chunkBegin) // there is only one doc to process, so avoid retry on failure - insertMultiSingletons(txn, ctx, keepGoing, ns, op, chunkBegin, it + 1); + insertMultiSingletons( + txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1); else - insertMultiVector(txn, ctx, keepGoing, ns, op, chunkBegin, it + 1); + insertMultiVector( + txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1); chunkBegin = it + 1; chunkCount = 0; chunkSize = 0; } } if (chunkBegin != docs.end()) - insertMultiVector(txn, ctx, keepGoing, ns, op, chunkBegin, docs.end()); + insertMultiVector( + txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, docs.end()); if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { // If this operation has already generated a new lastOp, don't bother setting it @@ -1159,9 +1159,7 @@ static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curO bool _receivedInsert(OperationContext* txn, const NamespaceString& nsString, - const char* ns, - vector<BSONObj>& docs, - bool keepGoing, + const InsertOp& insertOp, CurOp& op, bool checkCollection) { // CONCURRENCY TODO: is being read locked in big log sufficient here? @@ -1169,16 +1167,14 @@ bool _receivedInsert(OperationContext* txn, uassert( 10058, "not master", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString)); - OldClientContext ctx(txn, ns); + OldClientContext ctx(txn, insertOp.ns.ns()); if (checkCollection && !ctx.db()->getCollection(nsString)) return false; - insertMulti(txn, ctx, keepGoing, ns, docs, op); + insertMulti(txn, ctx, insertOp, op); return true; } void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - DbMessage d(m); - const char* ns = d.getns(); { stdx::lock_guard<Client>(*txn->getClient()); CurOp::get(txn)->setNS_inlock(nsString.ns()); @@ -1186,29 +1182,21 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess uassertStatusOK(userAllowedWriteNS(nsString.ns())); if (nsString.isSystemDotIndexes()) { + DbMessage d(m); insertSystemIndexes(txn, d, op); return; } - if (!d.moreJSObjs()) { - // strange. should we complain? - return; - } - - vector<BSONObj> multi; - while (d.moreJSObjs()) { - BSONObj obj = d.nextJsObj(); - multi.push_back(obj); + auto insertOp = parseLegacyInsert(m); - // Check auth for insert (also handles checking if this is an index build and checks - // for the proper privileges in that case). + for (const auto& obj : insertOp.documents) { + // Check auth for insert. Status status = AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj); audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code()); uassertStatusOK(status); } - const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError; { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); @@ -1216,7 +1204,7 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess // OldClientContext may implicitly create a database, so check existence if (dbHolder().get(txn, nsString.db()) != NULL) { - if (_receivedInsert(txn, nsString, ns, multi, keepGoing, op, true)) + if (_receivedInsert(txn, nsString, insertOp, op, true)) return; } } @@ -1225,7 +1213,7 @@ void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Mess ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X); - _receivedInsert(txn, nsString, ns, multi, keepGoing, op, false); + _receivedInsert(txn, nsString, insertOp, op, false); } // ----- BEGIN Diaglog ----- |