diff options
Diffstat (limited to 'src/mongo/dbtests/query_stage_update.cpp')
-rw-r--r-- | src/mongo/dbtests/query_stage_update.cpp | 955 |
1 files changed, 478 insertions, 477 deletions
diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp index b40bc84a918..c4e33eb6802 100644 --- a/src/mongo/dbtests/query_stage_update.cpp +++ b/src/mongo/dbtests/query_stage_update.cpp @@ -53,552 +53,553 @@ namespace QueryStageUpdate { - using std::unique_ptr; - using std::vector; - - class QueryStageUpdateBase { - public: - QueryStageUpdateBase() - : _client(&_txn), - _ns("unittests.QueryStageUpdate"), - _nsString(StringData(ns())) { - OldClientWriteContext ctx(&_txn, ns()); - _client.dropCollection(ns()); - _client.createCollection(ns()); - } - - virtual ~QueryStageUpdateBase() { - OldClientWriteContext ctx(&_txn, ns()); - _client.dropCollection(ns()); - } - - void insert(const BSONObj& doc) { - _client.insert(ns(), doc); - } +using std::unique_ptr; +using std::vector; + +class QueryStageUpdateBase { +public: + QueryStageUpdateBase() + : _client(&_txn), _ns("unittests.QueryStageUpdate"), _nsString(StringData(ns())) { + OldClientWriteContext ctx(&_txn, ns()); + _client.dropCollection(ns()); + _client.createCollection(ns()); + } + + virtual ~QueryStageUpdateBase() { + OldClientWriteContext ctx(&_txn, ns()); + _client.dropCollection(ns()); + } + + void insert(const BSONObj& doc) { + _client.insert(ns(), doc); + } + + void remove(const BSONObj& obj) { + _client.remove(ns(), obj); + } + + size_t count(const BSONObj& query) { + return _client.count(ns(), query, 0, 0, 0); + } + + CanonicalQuery* canonicalize(const BSONObj& query) { + CanonicalQuery* cq; + Status status = CanonicalQuery::canonicalize(ns(), query, &cq); + ASSERT_OK(status); + return cq; + } - void remove(const BSONObj& obj) { - _client.remove(ns(), obj); + /** + * Runs the update operation by calling work until EOF. Asserts that + * the update stage always returns NEED_TIME. + */ + void runUpdate(UpdateStage* updateStage) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = PlanStage::NEED_TIME; + while (PlanStage::IS_EOF != state) { + ASSERT_EQUALS(PlanStage::NEED_TIME, state); + state = updateStage->work(&id); } + } - size_t count(const BSONObj& query) { - return _client.count(ns(), query, 0, 0, 0); - } + /** + * Returns a vector of all of the documents currently in 'collection'. + * + * Uses a forward collection scan stage to get the docs, and populates 'out' with + * the results. + */ + void getCollContents(Collection* collection, vector<BSONObj>* out) { + WorkingSet ws; - CanonicalQuery* canonicalize(const BSONObj& query) { - CanonicalQuery* cq; - Status status = CanonicalQuery::canonicalize(ns(), query, &cq); - ASSERT_OK(status); - return cq; - } + CollectionScanParams params; + params.collection = collection; + params.direction = CollectionScanParams::FORWARD; + params.tailable = false; - /** - * Runs the update operation by calling work until EOF. Asserts that - * the update stage always returns NEED_TIME. - */ - void runUpdate(UpdateStage* updateStage) { + unique_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); + while (!scan->isEOF()) { WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = PlanStage::NEED_TIME; - while (PlanStage::IS_EOF != state) { - ASSERT_EQUALS(PlanStage::NEED_TIME, state); - state = updateStage->work(&id); + PlanStage::StageState state = scan->work(&id); + if (PlanStage::ADVANCED == state) { + WorkingSetMember* member = ws.get(id); + verify(member->hasObj()); + out->push_back(member->obj.value()); } } + } - /** - * Returns a vector of all of the documents currently in 'collection'. - * - * Uses a forward collection scan stage to get the docs, and populates 'out' with - * the results. - */ - void getCollContents(Collection* collection, vector<BSONObj>* out) { - WorkingSet ws; - - CollectionScanParams params; - params.collection = collection; - params.direction = CollectionScanParams::FORWARD; - params.tailable = false; - - unique_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); - while (!scan->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = scan->work(&id); - if (PlanStage::ADVANCED == state) { - WorkingSetMember* member = ws.get(id); - verify(member->hasObj()); - out->push_back(member->obj.value()); - } - } - } - - void getLocs(Collection* collection, - CollectionScanParams::Direction direction, - vector<RecordId>* out) { - WorkingSet ws; + void getLocs(Collection* collection, + CollectionScanParams::Direction direction, + vector<RecordId>* out) { + WorkingSet ws; - CollectionScanParams params; - params.collection = collection; - params.direction = direction; - params.tailable = false; - - unique_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); - while (!scan->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = scan->work(&id); - if (PlanStage::ADVANCED == state) { - WorkingSetMember* member = ws.get(id); - verify(member->hasLoc()); - out->push_back(member->loc); - } - } - } + CollectionScanParams params; + params.collection = collection; + params.direction = direction; + params.tailable = false; - /** - * Asserts that 'objs' contains 'expectedDoc'. - */ - void assertHasDoc(const vector<BSONObj>& objs, const BSONObj& expectedDoc) { - bool foundDoc = false; - for (size_t i = 0; i < objs.size(); i++) { - if (0 == objs[i].woCompare(expectedDoc)) { - foundDoc = true; - break; - } + unique_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); + while (!scan->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = scan->work(&id); + if (PlanStage::ADVANCED == state) { + WorkingSetMember* member = ws.get(id); + verify(member->hasLoc()); + out->push_back(member->loc); } - ASSERT(foundDoc); } - - const char* ns() { return _ns.c_str(); } - - const NamespaceString& nsString() { return _nsString; } - - protected: - OperationContextImpl _txn; - - private: - DBDirectClient _client; - - std::string _ns; - NamespaceString _nsString; - }; + } /** - * Test an upsert into an empty collection. + * Asserts that 'objs' contains 'expectedDoc'. */ - class QueryStageUpdateUpsertEmptyColl : public QueryStageUpdateBase { - public: - void run() { - // Run the update. - { - OldClientWriteContext ctx(&_txn, ns()); - CurOp& curOp = *CurOp::get(_txn); - OpDebug* opDebug = &curOp.debug(); - UpdateDriver driver( (UpdateDriver::Options()) ); - Collection* collection = ctx.getCollection(); - - // Collection should be empty. - ASSERT_EQUALS(0U, count(BSONObj())); - - UpdateRequest request(nsString()); - UpdateLifecycleImpl updateLifecycle(false, nsString()); - request.setLifecycle(&updateLifecycle); - - // Update is the upsert {_id: 0, x: 1}, {$set: {y: 2}}. - BSONObj query = fromjson("{_id: 0, x: 1}"); - BSONObj updates = fromjson("{$set: {y: 2}}"); - - request.setUpsert(); - request.setQuery(query); - request.setUpdates(updates); - - ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); - - // Setup update params. - UpdateStageParams params(&request, &driver, opDebug); - unique_ptr<CanonicalQuery> cq(canonicalize(query)); - params.canonicalQuery = cq.get(); - - unique_ptr<WorkingSet> ws(new WorkingSet()); - unique_ptr<EOFStage> eofStage(new EOFStage()); - - unique_ptr<UpdateStage> updateStage( - new UpdateStage(&_txn, params, ws.get(), collection, eofStage.release())); - - runUpdate(updateStage.get()); - } - - // Verify the contents of the resulting collection. - { - AutoGetCollectionForRead ctx(&_txn, ns()); - Collection* collection = ctx.getCollection(); - - vector<BSONObj> objs; - getCollContents(collection, &objs); - - // Expect a single document, {_id: 0, x: 1, y: 2}. - ASSERT_EQUALS(1U, objs.size()); - ASSERT_EQUALS(objs[0], fromjson("{_id: 0, x: 1, y: 2}")); + void assertHasDoc(const vector<BSONObj>& objs, const BSONObj& expectedDoc) { + bool foundDoc = false; + for (size_t i = 0; i < objs.size(); i++) { + if (0 == objs[i].woCompare(expectedDoc)) { + foundDoc = true; + break; } } - }; - - /** - * Test receipt of an invalidation: case in which the document about to updated - * is deleted. - */ - class QueryStageUpdateSkipInvalidatedDoc : public QueryStageUpdateBase { - public: - void run() { - // Run the update. - { - OldClientWriteContext ctx(&_txn, ns()); - - // Populate the collection. - for (int i = 0; i < 10; ++i) { - insert(BSON("_id" << i << "foo" << i)); - } - ASSERT_EQUALS(10U, count(BSONObj())); - - CurOp& curOp = *CurOp::get(_txn); - OpDebug* opDebug = &curOp.debug(); - UpdateDriver driver( (UpdateDriver::Options()) ); - Database* db = ctx.db(); - Collection* coll = db->getCollection(ns()); - - // Get the RecordIds that would be returned by an in-order scan. - vector<RecordId> locs; - getLocs(coll, CollectionScanParams::FORWARD, &locs); - - UpdateRequest request(nsString()); - UpdateLifecycleImpl updateLifecycle(false, nsString()); - request.setLifecycle(&updateLifecycle); - - // Update is a multi-update that sets 'bar' to 3 in every document - // where foo is less than 5. - BSONObj query = fromjson("{foo: {$lt: 5}}"); - BSONObj updates = fromjson("{$set: {bar: 3}}"); - - request.setMulti(); - request.setQuery(query); - request.setUpdates(updates); - - ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); - - // Configure the scan. - CollectionScanParams collScanParams; - collScanParams.collection = coll; - collScanParams.direction = CollectionScanParams::FORWARD; - collScanParams.tailable = false; - - // Configure the update. - UpdateStageParams updateParams(&request, &driver, opDebug); - unique_ptr<CanonicalQuery> cq(canonicalize(query)); - updateParams.canonicalQuery = cq.get(); - - unique_ptr<WorkingSet> ws(new WorkingSet()); - unique_ptr<CollectionScan> cs( - new CollectionScan(&_txn, collScanParams, ws.get(), cq->root())); - - unique_ptr<UpdateStage> updateStage( - new UpdateStage(&_txn, updateParams, ws.get(), coll, cs.release())); - - const UpdateStats* stats = - static_cast<const UpdateStats*>(updateStage->getSpecificStats()); - - const size_t targetDocIndex = 3; - - while (stats->nModified < targetDocIndex) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = updateStage->work(&id); - ASSERT_EQUALS(PlanStage::NEED_TIME, state); - } - - // Remove locs[targetDocIndex]; - updateStage->saveState(); - updateStage->invalidate(&_txn, locs[targetDocIndex], INVALIDATION_DELETION); - BSONObj targetDoc = coll->docFor(&_txn, locs[targetDocIndex]).value(); - ASSERT(!targetDoc.isEmpty()); - remove(targetDoc); - updateStage->restoreState(&_txn); - - // Do the remaining updates. - while (!updateStage->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = updateStage->work(&id); - ASSERT(PlanStage::NEED_TIME == state || PlanStage::IS_EOF == state); - } - - // 4 of the 5 matching documents should have been modified (one was deleted). - ASSERT_EQUALS(4U, stats->nModified); - ASSERT_EQUALS(4U, stats->nMatched); - } + ASSERT(foundDoc); + } - // Check the contents of the collection. - { - AutoGetCollectionForRead ctx(&_txn, ns()); - Collection* collection = ctx.getCollection(); + const char* ns() { + return _ns.c_str(); + } - vector<BSONObj> objs; - getCollContents(collection, &objs); + const NamespaceString& nsString() { + return _nsString; + } - // Verify that the collection now has 9 docs (one was deleted). - ASSERT_EQUALS(9U, objs.size()); +protected: + OperationContextImpl _txn; - // Make sure that the collection has certain documents. - assertHasDoc(objs, fromjson("{_id: 0, foo: 0, bar: 3}")); - assertHasDoc(objs, fromjson("{_id: 1, foo: 1, bar: 3}")); - assertHasDoc(objs, fromjson("{_id: 2, foo: 2, bar: 3}")); - assertHasDoc(objs, fromjson("{_id: 4, foo: 4, bar: 3}")); - assertHasDoc(objs, fromjson("{_id: 5, foo: 5}")); - assertHasDoc(objs, fromjson("{_id: 6, foo: 6}")); - } - } - }; +private: + DBDirectClient _client; - /** - * Test that the update stage returns an owned copy of the original document if - * ReturnDocOption::RETURN_OLD is specified. - */ - class QueryStageUpdateReturnOldDoc : public QueryStageUpdateBase { - public: - void run() { - // Populate the collection. - for (int i = 0; i < 10; ++i) { - insert(BSON("_id" << i << "foo" << i)); - } - ASSERT_EQUALS(10U, count(BSONObj())); + std::string _ns; + NamespaceString _nsString; +}; - // Various variables we'll need. +/** + * Test an upsert into an empty collection. + */ +class QueryStageUpdateUpsertEmptyColl : public QueryStageUpdateBase { +public: + void run() { + // Run the update. + { OldClientWriteContext ctx(&_txn, ns()); - OpDebug* opDebug = &CurOp::get(_txn)->debug(); - Collection* coll = ctx.getCollection(); - UpdateLifecycleImpl updateLifecycle(false, nsString()); - UpdateRequest request(nsString()); - UpdateDriver driver( (UpdateDriver::Options()) ); - const int targetDocIndex = 0; // We'll be working with the first doc in the collection. - const BSONObj query = BSON("foo" << BSON("$gte" << targetDocIndex)); - const std::unique_ptr<WorkingSet> ws(stdx::make_unique<WorkingSet>()); - const std::unique_ptr<CanonicalQuery> cq(canonicalize(query)); + CurOp& curOp = *CurOp::get(_txn); + OpDebug* opDebug = &curOp.debug(); + UpdateDriver driver((UpdateDriver::Options())); + Collection* collection = ctx.getCollection(); - // Get the RecordIds that would be returned by an in-order scan. - vector<RecordId> locs; - getLocs(coll, CollectionScanParams::FORWARD, &locs); + // Collection should be empty. + ASSERT_EQUALS(0U, count(BSONObj())); - // Populate the request. - request.setQuery(query); - request.setUpdates(fromjson("{$set: {x: 0}}")); - request.setSort(BSONObj()); - request.setMulti(false); - request.setReturnDocs(UpdateRequest::RETURN_OLD); + UpdateRequest request(nsString()); + UpdateLifecycleImpl updateLifecycle(false, nsString()); request.setLifecycle(&updateLifecycle); - ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); + // Update is the upsert {_id: 0, x: 1}, {$set: {y: 2}}. + BSONObj query = fromjson("{_id: 0, x: 1}"); + BSONObj updates = fromjson("{$set: {y: 2}}"); - // Configure a QueuedDataStage to pass the first object in the collection back in a - // LOC_AND_UNOWNED_OBJ state. - std::unique_ptr<QueuedDataStage> qds(stdx::make_unique<QueuedDataStage>(ws.get())); - WorkingSetMember member; - member.loc = locs[targetDocIndex]; - member.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - const BSONObj oldDoc = BSON("_id" << targetDocIndex << "foo" << targetDocIndex); - member.obj = Snapshotted<BSONObj>(SnapshotId(), oldDoc); - qds->pushBack(member); + request.setUpsert(); + request.setQuery(query); + request.setUpdates(updates); - // Configure the update. - UpdateStageParams updateParams(&request, &driver, opDebug); - updateParams.canonicalQuery = cq.get(); + ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); - const std::unique_ptr<UpdateStage> updateStage( - stdx::make_unique<UpdateStage>(&_txn, updateParams, ws.get(), coll, qds.release())); + // Setup update params. + UpdateStageParams params(&request, &driver, opDebug); + unique_ptr<CanonicalQuery> cq(canonicalize(query)); + params.canonicalQuery = cq.get(); - // Should return advanced. - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = updateStage->work(&id); - ASSERT_EQUALS(PlanStage::ADVANCED, state); + unique_ptr<WorkingSet> ws(new WorkingSet()); + unique_ptr<EOFStage> eofStage(new EOFStage()); - // Make sure the returned value is what we expect it to be. + unique_ptr<UpdateStage> updateStage( + new UpdateStage(&_txn, params, ws.get(), collection, eofStage.release())); - // Should give us back a valid id. - ASSERT_TRUE(WorkingSet::INVALID_ID != id); - WorkingSetMember* resultMember = ws->get(id); - // With an owned copy of the object, with no RecordId. - ASSERT_TRUE(resultMember->hasOwnedObj()); - ASSERT_FALSE(resultMember->hasLoc()); - ASSERT_EQUALS(resultMember->state, WorkingSetMember::OWNED_OBJ); - ASSERT_TRUE(resultMember->obj.value().isOwned()); + runUpdate(updateStage.get()); + } - // Should be the old value. - ASSERT_EQUALS(resultMember->obj.value(), oldDoc); + // Verify the contents of the resulting collection. + { + AutoGetCollectionForRead ctx(&_txn, ns()); + Collection* collection = ctx.getCollection(); - // Should have done the update. - BSONObj newDoc = BSON("_id" << targetDocIndex << "foo" << targetDocIndex << "x" << 0); vector<BSONObj> objs; - getCollContents(coll, &objs); - ASSERT_EQUALS(objs[targetDocIndex], newDoc); + getCollContents(collection, &objs); - // That should be it. - id = WorkingSet::INVALID_ID; - ASSERT_EQUALS(PlanStage::IS_EOF, updateStage->work(&id)); + // Expect a single document, {_id: 0, x: 1, y: 2}. + ASSERT_EQUALS(1U, objs.size()); + ASSERT_EQUALS(objs[0], fromjson("{_id: 0, x: 1, y: 2}")); } - }; + } +}; + +/** + * Test receipt of an invalidation: case in which the document about to updated + * is deleted. + */ +class QueryStageUpdateSkipInvalidatedDoc : public QueryStageUpdateBase { +public: + void run() { + // Run the update. + { + OldClientWriteContext ctx(&_txn, ns()); - /** - * Test that the update stage returns an owned copy of the updated document if - * ReturnDocOption::RETURN_NEW is specified. - */ - class QueryStageUpdateReturnNewDoc : public QueryStageUpdateBase { - public: - void run() { // Populate the collection. - for (int i = 0; i < 50; ++i) { + for (int i = 0; i < 10; ++i) { insert(BSON("_id" << i << "foo" << i)); } - ASSERT_EQUALS(50U, count(BSONObj())); + ASSERT_EQUALS(10U, count(BSONObj())); - // Various variables we'll need. - OldClientWriteContext ctx(&_txn, ns()); - OpDebug* opDebug = &CurOp::get(_txn)->debug(); - Collection* coll = ctx.getCollection(); - UpdateLifecycleImpl updateLifecycle(false, nsString()); - UpdateRequest request(nsString()); - UpdateDriver driver( (UpdateDriver::Options()) ); - const int targetDocIndex = 10; - const BSONObj query = BSON("foo" << BSON("$gte" << targetDocIndex)); - const std::unique_ptr<WorkingSet> ws(stdx::make_unique<WorkingSet>()); - const std::unique_ptr<CanonicalQuery> cq(canonicalize(query)); + CurOp& curOp = *CurOp::get(_txn); + OpDebug* opDebug = &curOp.debug(); + UpdateDriver driver((UpdateDriver::Options())); + Database* db = ctx.db(); + Collection* coll = db->getCollection(ns()); // Get the RecordIds that would be returned by an in-order scan. vector<RecordId> locs; getLocs(coll, CollectionScanParams::FORWARD, &locs); - // Populate the request. - request.setQuery(query); - request.setUpdates(fromjson("{$set: {x: 0}}")); - request.setSort(BSONObj()); - request.setMulti(false); - request.setReturnDocs(UpdateRequest::RETURN_NEW); + UpdateRequest request(nsString()); + UpdateLifecycleImpl updateLifecycle(false, nsString()); request.setLifecycle(&updateLifecycle); + // Update is a multi-update that sets 'bar' to 3 in every document + // where foo is less than 5. + BSONObj query = fromjson("{foo: {$lt: 5}}"); + BSONObj updates = fromjson("{$set: {bar: 3}}"); + + request.setMulti(); + request.setQuery(query); + request.setUpdates(updates); + ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); - // Configure a QueuedDataStage to pass the first object in the collection back in a - // LOC_AND_UNOWNED_OBJ state. - std::unique_ptr<QueuedDataStage> qds(stdx::make_unique<QueuedDataStage>(ws.get())); - WorkingSetMember member; - member.loc = locs[targetDocIndex]; - member.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - const BSONObj oldDoc = BSON("_id" << targetDocIndex << "foo" << targetDocIndex); - member.obj = Snapshotted<BSONObj>(SnapshotId(), oldDoc); - qds->pushBack(member); + // Configure the scan. + CollectionScanParams collScanParams; + collScanParams.collection = coll; + collScanParams.direction = CollectionScanParams::FORWARD; + collScanParams.tailable = false; // Configure the update. UpdateStageParams updateParams(&request, &driver, opDebug); + unique_ptr<CanonicalQuery> cq(canonicalize(query)); updateParams.canonicalQuery = cq.get(); - std::unique_ptr<UpdateStage> updateStage( - stdx::make_unique<UpdateStage>(&_txn, updateParams, ws.get(), coll, qds.release())); - - // Should return advanced. - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = updateStage->work(&id); - ASSERT_EQUALS(PlanStage::ADVANCED, state); - - // Make sure the returned value is what we expect it to be. - - // Should give us back a valid id. - ASSERT_TRUE(WorkingSet::INVALID_ID != id); - WorkingSetMember* resultMember = ws->get(id); - // With an owned copy of the object, with no RecordId. - ASSERT_TRUE(resultMember->hasOwnedObj()); - ASSERT_FALSE(resultMember->hasLoc()); - ASSERT_EQUALS(resultMember->state, WorkingSetMember::OWNED_OBJ); - ASSERT_TRUE(resultMember->obj.value().isOwned()); + unique_ptr<WorkingSet> ws(new WorkingSet()); + unique_ptr<CollectionScan> cs( + new CollectionScan(&_txn, collScanParams, ws.get(), cq->root())); - // Should be the new value. - BSONObj newDoc = BSON("_id" << targetDocIndex << "foo" << targetDocIndex << "x" << 0); - ASSERT_EQUALS(resultMember->obj.value(), newDoc); - - // Should have done the update. - vector<BSONObj> objs; - getCollContents(coll, &objs); - ASSERT_EQUALS(objs[targetDocIndex], newDoc); - - // That should be it. - id = WorkingSet::INVALID_ID; - ASSERT_EQUALS(PlanStage::IS_EOF, updateStage->work(&id)); - } - }; - - /** - * Test that the update stage does not update or return WorkingSetMembers that it gets back from - * a child in the OWNED_OBJ state. - */ - class QueryStageUpdateSkipOwnedObjects : public QueryStageUpdateBase { - public: - void run() { - // Various variables we'll need. - OldClientWriteContext ctx(&_txn, ns()); - OpDebug* opDebug = &CurOp::get(_txn)->debug(); - Collection* coll = ctx.getCollection(); - UpdateLifecycleImpl updateLifecycle(false, nsString()); - UpdateRequest request(nsString()); - UpdateDriver driver( (UpdateDriver::Options()) ); - const BSONObj query = BSONObj(); - const std::unique_ptr<WorkingSet> ws(stdx::make_unique<WorkingSet>()); - const std::unique_ptr<CanonicalQuery> cq(canonicalize(query)); + unique_ptr<UpdateStage> updateStage( + new UpdateStage(&_txn, updateParams, ws.get(), coll, cs.release())); - // Populate the request. - request.setQuery(query); - request.setUpdates(fromjson("{$set: {x: 0}}")); - request.setSort(BSONObj()); - request.setMulti(false); - request.setReturnDocs(UpdateRequest::ReturnDocOption::RETURN_OLD); - request.setLifecycle(&updateLifecycle); + const UpdateStats* stats = + static_cast<const UpdateStats*>(updateStage->getSpecificStats()); - ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); + const size_t targetDocIndex = 3; - // Configure a QueuedDataStage to pass an OWNED_OBJ to the update stage. - std::unique_ptr<QueuedDataStage> qds(stdx::make_unique<QueuedDataStage>(ws.get())); - WorkingSetMember member; - member.state = WorkingSetMember::OWNED_OBJ; - member.obj = Snapshotted<BSONObj>(SnapshotId(), fromjson("{x: 1}")); - qds->pushBack(member); + while (stats->nModified < targetDocIndex) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = updateStage->work(&id); + ASSERT_EQUALS(PlanStage::NEED_TIME, state); + } - // Configure the update. - UpdateStageParams updateParams(&request, &driver, opDebug); - updateParams.canonicalQuery = cq.get(); + // Remove locs[targetDocIndex]; + updateStage->saveState(); + updateStage->invalidate(&_txn, locs[targetDocIndex], INVALIDATION_DELETION); + BSONObj targetDoc = coll->docFor(&_txn, locs[targetDocIndex]).value(); + ASSERT(!targetDoc.isEmpty()); + remove(targetDoc); + updateStage->restoreState(&_txn); - const std::unique_ptr<UpdateStage> updateStage( - stdx::make_unique<UpdateStage>(&_txn, updateParams, ws.get(), coll, qds.release())); - const UpdateStats* stats = - static_cast<const UpdateStats*>(updateStage->getSpecificStats()); + // Do the remaining updates. + while (!updateStage->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = updateStage->work(&id); + ASSERT(PlanStage::NEED_TIME == state || PlanStage::IS_EOF == state); + } - // Call work, passing the set up member to the update stage. - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = updateStage->work(&id); + // 4 of the 5 matching documents should have been modified (one was deleted). + ASSERT_EQUALS(4U, stats->nModified); + ASSERT_EQUALS(4U, stats->nMatched); + } - // Should return NEED_TIME, not modifying anything. - ASSERT_EQUALS(PlanStage::NEED_TIME, state); - ASSERT_EQUALS(stats->nModified, 0U); + // Check the contents of the collection. + { + AutoGetCollectionForRead ctx(&_txn, ns()); + Collection* collection = ctx.getCollection(); - id = WorkingSet::INVALID_ID; - state = updateStage->work(&id); - ASSERT_EQUALS(PlanStage::IS_EOF, state); + vector<BSONObj> objs; + getCollContents(collection, &objs); + + // Verify that the collection now has 9 docs (one was deleted). + ASSERT_EQUALS(9U, objs.size()); + + // Make sure that the collection has certain documents. + assertHasDoc(objs, fromjson("{_id: 0, foo: 0, bar: 3}")); + assertHasDoc(objs, fromjson("{_id: 1, foo: 1, bar: 3}")); + assertHasDoc(objs, fromjson("{_id: 2, foo: 2, bar: 3}")); + assertHasDoc(objs, fromjson("{_id: 4, foo: 4, bar: 3}")); + assertHasDoc(objs, fromjson("{_id: 5, foo: 5}")); + assertHasDoc(objs, fromjson("{_id: 6, foo: 6}")); } - }; - - class All : public Suite { - public: - All() : Suite("query_stage_update") {} - - void setupTests() { - // Stage-specific tests below. - add<QueryStageUpdateUpsertEmptyColl>(); - add<QueryStageUpdateSkipInvalidatedDoc>(); - add<QueryStageUpdateReturnOldDoc>(); - add<QueryStageUpdateReturnNewDoc>(); - add<QueryStageUpdateSkipOwnedObjects>(); + } +}; + +/** + * Test that the update stage returns an owned copy of the original document if + * ReturnDocOption::RETURN_OLD is specified. + */ +class QueryStageUpdateReturnOldDoc : public QueryStageUpdateBase { +public: + void run() { + // Populate the collection. + for (int i = 0; i < 10; ++i) { + insert(BSON("_id" << i << "foo" << i)); } - }; + ASSERT_EQUALS(10U, count(BSONObj())); + + // Various variables we'll need. + OldClientWriteContext ctx(&_txn, ns()); + OpDebug* opDebug = &CurOp::get(_txn)->debug(); + Collection* coll = ctx.getCollection(); + UpdateLifecycleImpl updateLifecycle(false, nsString()); + UpdateRequest request(nsString()); + UpdateDriver driver((UpdateDriver::Options())); + const int targetDocIndex = 0; // We'll be working with the first doc in the collection. + const BSONObj query = BSON("foo" << BSON("$gte" << targetDocIndex)); + const std::unique_ptr<WorkingSet> ws(stdx::make_unique<WorkingSet>()); + const std::unique_ptr<CanonicalQuery> cq(canonicalize(query)); + + // Get the RecordIds that would be returned by an in-order scan. + vector<RecordId> locs; + getLocs(coll, CollectionScanParams::FORWARD, &locs); + + // Populate the request. + request.setQuery(query); + request.setUpdates(fromjson("{$set: {x: 0}}")); + request.setSort(BSONObj()); + request.setMulti(false); + request.setReturnDocs(UpdateRequest::RETURN_OLD); + request.setLifecycle(&updateLifecycle); + + ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); + + // Configure a QueuedDataStage to pass the first object in the collection back in a + // LOC_AND_UNOWNED_OBJ state. + std::unique_ptr<QueuedDataStage> qds(stdx::make_unique<QueuedDataStage>(ws.get())); + WorkingSetMember member; + member.loc = locs[targetDocIndex]; + member.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + const BSONObj oldDoc = BSON("_id" << targetDocIndex << "foo" << targetDocIndex); + member.obj = Snapshotted<BSONObj>(SnapshotId(), oldDoc); + qds->pushBack(member); + + // Configure the update. + UpdateStageParams updateParams(&request, &driver, opDebug); + updateParams.canonicalQuery = cq.get(); + + const std::unique_ptr<UpdateStage> updateStage( + stdx::make_unique<UpdateStage>(&_txn, updateParams, ws.get(), coll, qds.release())); + + // Should return advanced. + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = updateStage->work(&id); + ASSERT_EQUALS(PlanStage::ADVANCED, state); + + // Make sure the returned value is what we expect it to be. + + // Should give us back a valid id. + ASSERT_TRUE(WorkingSet::INVALID_ID != id); + WorkingSetMember* resultMember = ws->get(id); + // With an owned copy of the object, with no RecordId. + ASSERT_TRUE(resultMember->hasOwnedObj()); + ASSERT_FALSE(resultMember->hasLoc()); + ASSERT_EQUALS(resultMember->state, WorkingSetMember::OWNED_OBJ); + ASSERT_TRUE(resultMember->obj.value().isOwned()); + + // Should be the old value. + ASSERT_EQUALS(resultMember->obj.value(), oldDoc); + + // Should have done the update. + BSONObj newDoc = BSON("_id" << targetDocIndex << "foo" << targetDocIndex << "x" << 0); + vector<BSONObj> objs; + getCollContents(coll, &objs); + ASSERT_EQUALS(objs[targetDocIndex], newDoc); + + // That should be it. + id = WorkingSet::INVALID_ID; + ASSERT_EQUALS(PlanStage::IS_EOF, updateStage->work(&id)); + } +}; - SuiteInstance<All> all; +/** + * Test that the update stage returns an owned copy of the updated document if + * ReturnDocOption::RETURN_NEW is specified. + */ +class QueryStageUpdateReturnNewDoc : public QueryStageUpdateBase { +public: + void run() { + // Populate the collection. + for (int i = 0; i < 50; ++i) { + insert(BSON("_id" << i << "foo" << i)); + } + ASSERT_EQUALS(50U, count(BSONObj())); + + // Various variables we'll need. + OldClientWriteContext ctx(&_txn, ns()); + OpDebug* opDebug = &CurOp::get(_txn)->debug(); + Collection* coll = ctx.getCollection(); + UpdateLifecycleImpl updateLifecycle(false, nsString()); + UpdateRequest request(nsString()); + UpdateDriver driver((UpdateDriver::Options())); + const int targetDocIndex = 10; + const BSONObj query = BSON("foo" << BSON("$gte" << targetDocIndex)); + const std::unique_ptr<WorkingSet> ws(stdx::make_unique<WorkingSet>()); + const std::unique_ptr<CanonicalQuery> cq(canonicalize(query)); + + // Get the RecordIds that would be returned by an in-order scan. + vector<RecordId> locs; + getLocs(coll, CollectionScanParams::FORWARD, &locs); + + // Populate the request. + request.setQuery(query); + request.setUpdates(fromjson("{$set: {x: 0}}")); + request.setSort(BSONObj()); + request.setMulti(false); + request.setReturnDocs(UpdateRequest::RETURN_NEW); + request.setLifecycle(&updateLifecycle); + + ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); + + // Configure a QueuedDataStage to pass the first object in the collection back in a + // LOC_AND_UNOWNED_OBJ state. + std::unique_ptr<QueuedDataStage> qds(stdx::make_unique<QueuedDataStage>(ws.get())); + WorkingSetMember member; + member.loc = locs[targetDocIndex]; + member.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + const BSONObj oldDoc = BSON("_id" << targetDocIndex << "foo" << targetDocIndex); + member.obj = Snapshotted<BSONObj>(SnapshotId(), oldDoc); + qds->pushBack(member); + + // Configure the update. + UpdateStageParams updateParams(&request, &driver, opDebug); + updateParams.canonicalQuery = cq.get(); + + std::unique_ptr<UpdateStage> updateStage( + stdx::make_unique<UpdateStage>(&_txn, updateParams, ws.get(), coll, qds.release())); + + // Should return advanced. + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = updateStage->work(&id); + ASSERT_EQUALS(PlanStage::ADVANCED, state); + + // Make sure the returned value is what we expect it to be. + + // Should give us back a valid id. + ASSERT_TRUE(WorkingSet::INVALID_ID != id); + WorkingSetMember* resultMember = ws->get(id); + // With an owned copy of the object, with no RecordId. + ASSERT_TRUE(resultMember->hasOwnedObj()); + ASSERT_FALSE(resultMember->hasLoc()); + ASSERT_EQUALS(resultMember->state, WorkingSetMember::OWNED_OBJ); + ASSERT_TRUE(resultMember->obj.value().isOwned()); + + // Should be the new value. + BSONObj newDoc = BSON("_id" << targetDocIndex << "foo" << targetDocIndex << "x" << 0); + ASSERT_EQUALS(resultMember->obj.value(), newDoc); + + // Should have done the update. + vector<BSONObj> objs; + getCollContents(coll, &objs); + ASSERT_EQUALS(objs[targetDocIndex], newDoc); + + // That should be it. + id = WorkingSet::INVALID_ID; + ASSERT_EQUALS(PlanStage::IS_EOF, updateStage->work(&id)); + } +}; -} // namespace QueryStageUpdate +/** + * Test that the update stage does not update or return WorkingSetMembers that it gets back from + * a child in the OWNED_OBJ state. + */ +class QueryStageUpdateSkipOwnedObjects : public QueryStageUpdateBase { +public: + void run() { + // Various variables we'll need. + OldClientWriteContext ctx(&_txn, ns()); + OpDebug* opDebug = &CurOp::get(_txn)->debug(); + Collection* coll = ctx.getCollection(); + UpdateLifecycleImpl updateLifecycle(false, nsString()); + UpdateRequest request(nsString()); + UpdateDriver driver((UpdateDriver::Options())); + const BSONObj query = BSONObj(); + const std::unique_ptr<WorkingSet> ws(stdx::make_unique<WorkingSet>()); + const std::unique_ptr<CanonicalQuery> cq(canonicalize(query)); + + // Populate the request. + request.setQuery(query); + request.setUpdates(fromjson("{$set: {x: 0}}")); + request.setSort(BSONObj()); + request.setMulti(false); + request.setReturnDocs(UpdateRequest::ReturnDocOption::RETURN_OLD); + request.setLifecycle(&updateLifecycle); + + ASSERT_OK(driver.parse(request.getUpdates(), request.isMulti())); + + // Configure a QueuedDataStage to pass an OWNED_OBJ to the update stage. + std::unique_ptr<QueuedDataStage> qds(stdx::make_unique<QueuedDataStage>(ws.get())); + WorkingSetMember member; + member.state = WorkingSetMember::OWNED_OBJ; + member.obj = Snapshotted<BSONObj>(SnapshotId(), fromjson("{x: 1}")); + qds->pushBack(member); + + // Configure the update. + UpdateStageParams updateParams(&request, &driver, opDebug); + updateParams.canonicalQuery = cq.get(); + + const std::unique_ptr<UpdateStage> updateStage( + stdx::make_unique<UpdateStage>(&_txn, updateParams, ws.get(), coll, qds.release())); + const UpdateStats* stats = static_cast<const UpdateStats*>(updateStage->getSpecificStats()); + + // Call work, passing the set up member to the update stage. + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = updateStage->work(&id); + + // Should return NEED_TIME, not modifying anything. + ASSERT_EQUALS(PlanStage::NEED_TIME, state); + ASSERT_EQUALS(stats->nModified, 0U); + + id = WorkingSet::INVALID_ID; + state = updateStage->work(&id); + ASSERT_EQUALS(PlanStage::IS_EOF, state); + } +}; + +class All : public Suite { +public: + All() : Suite("query_stage_update") {} + + void setupTests() { + // Stage-specific tests below. + add<QueryStageUpdateUpsertEmptyColl>(); + add<QueryStageUpdateSkipInvalidatedDoc>(); + add<QueryStageUpdateReturnOldDoc>(); + add<QueryStageUpdateReturnNewDoc>(); + add<QueryStageUpdateSkipOwnedObjects>(); + } +}; + +SuiteInstance<All> all; + +} // namespace QueryStageUpdate |