diff options
author | Benety Goh <benety@mongodb.com> | 2017-04-17 12:06:10 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2017-04-19 17:56:39 -0400 |
commit | 85472b2350952750b658178fc64bf80d8d357348 (patch) | |
tree | 84098124bb726398f61082618a532e665d5d4fe8 | |
parent | 4354125dd663c7b6b0ba853a70c98dc786f9385a (diff) | |
download | mongo-85472b2350952750b658178fc64bf80d8d357348.tar.gz |
SERVER-28211 add StorageInterface::deleteByFilter
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl_test.cpp | 242 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 6 |
6 files changed, 310 insertions, 3 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index ac126c0eb83..a135295f78f 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -156,6 +156,7 @@ env.Library( '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/exec/exec', '$BUILD_DIR/mongo/db/query/internal_plans', + '$BUILD_DIR/mongo/db/query/query', '$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl ], ) diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 65a33a72a79..2c8a2c2635e 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -274,6 +274,8 @@ public: /** * Deletes a single document in the collection referenced by the specified _id. * Returns deleted document on success. + * + * Not supported on collections with a default collation. */ virtual StatusWith<BSONObj> deleteById(OperationContext* opCtx, const NamespaceString& nss, @@ -292,6 +294,15 @@ public: const BSONElement& idKey, const BSONObj& update) = 0; + /** + * Removes all documents that match the "filter" from a collection. + * "filter" specifies the deletion criteria using query operators. Pass in an empty document to + * delete all documents in a collection. + */ + virtual Status deleteByFilter(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& filter) = 0; + using CollectionSize = uint64_t; using CollectionCount = uint64_t; diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index d6f557f2ac4..3c2325ae78a 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -60,8 +60,10 @@ #include "mongo/db/jsobj.h" #include "mongo/db/keypattern.h" #include "mongo/db/operation_context.h" +#include "mongo/db/ops/delete_request.h" #include "mongo/db/ops/parsed_update.h" #include "mongo/db/ops/update_request.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/collection_bulk_loader_impl.h" #include "mongo/db/repl/oplog.h" @@ -748,6 +750,53 @@ Status StorageInterfaceImpl::upsertById(OperationContext* opCtx, MONGO_UNREACHABLE; } +Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& filter) { + DeleteRequest request(nss); + request.setQuery(filter); + request.setMulti(true); + request.setYieldPolicy(PlanExecutor::NO_YIELD); + + // This disables the legalClientSystemNS() check in getExecutorDelete() which is used to + // disallow client deletes from unrecognized system collections. + request.setGod(); + + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + // ParsedDelete needs to be inside the write conflict retry loop because it may create a + // CanonicalQuery whose ownership will be transferred to the plan executor in + // getExecutorDelete(). + ParsedDelete parsedDelete(opCtx, &request); + auto parsedDeleteStatus = parsedDelete.parseRequest(); + if (!parsedDeleteStatus.isOK()) { + return parsedDeleteStatus; + } + + AutoGetCollection autoColl(opCtx, nss, MODE_IX); + auto collectionResult = getCollection( + autoColl, + nss, + str::stream() << "Unable to delete documents in " << nss.ns() << " using filter " + << filter); + if (!collectionResult.isOK()) { + return collectionResult.getStatus(); + } + auto collection = collectionResult.getValue(); + + auto planExecutorResult = + mongo::getExecutorDelete(opCtx, nullptr, collection, &parsedDelete); + if (!planExecutorResult.isOK()) { + return planExecutorResult.getStatus(); + } + auto planExecutor = std::move(planExecutorResult.getValue()); + + return planExecutor->executePlan(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "StorageInterfaceImpl::deleteByFilter", nss.ns()); + + MONGO_UNREACHABLE; +} + StatusWith<StorageInterface::CollectionSize> StorageInterfaceImpl::getCollectionSize( OperationContext* opCtx, const NamespaceString& nss) { AutoGetCollectionForRead autoColl(opCtx, nss); diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 76bbc064e0f..adc372f23fd 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -130,6 +130,10 @@ public: const BSONElement& idKey, const BSONObj& update) override; + Status deleteByFilter(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& filter) override; + StatusWith<StorageInterface::CollectionSize> getCollectionSize( OperationContext* opCtx, const NamespaceString& nss) override; diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 7207eb5002f..534641674de 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -172,13 +172,22 @@ protected: return _opCtx.get(); } + ReplicationCoordinatorMock* getReplicationCoordinatorMock() { + return _replicationCoordinatorMock; + } + + void resetUnreplicatedWritesBlock() { + _uwb.reset(nullptr); + } + private: void setUp() override { ServiceContextMongoDTest::setUp(); _createOpCtx(); - ReplicationCoordinator::set(getServiceContext(), - stdx::make_unique<ReplicationCoordinatorMock>( - getServiceContext(), createReplSettings())); + auto replCoord = stdx::make_unique<ReplicationCoordinatorMock>(getServiceContext(), + createReplSettings()); + _replicationCoordinatorMock = replCoord.get(); + ReplicationCoordinator::set(getServiceContext(), std::move(replCoord)); } void tearDown() override { @@ -199,6 +208,7 @@ private: ServiceContext::UniqueOperationContext _opCtx; std::unique_ptr<UnreplicatedWritesBlock> _uwb; std::unique_ptr<DisableDocumentValidation> _ddv; + ReplicationCoordinatorMock* _replicationCoordinatorMock = nullptr; }; /** @@ -1706,6 +1716,232 @@ TEST_F(StorageInterfaceImplTest, ASSERT_STRING_CONTAINS(status.reason(), "Unknown modifier: $unknownUpdateOp"); } +TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsNamespaceNotFoundWhenDatabaseDoesNotExist) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("nosuchdb.coll"); + auto filter = BSON("x" << 1); + auto status = storage.deleteByFilter(opCtx, nss, filter); + ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); + ASSERT_EQUALS(str::stream() << "Database [nosuchdb] not found. Unable to delete documents in " + << nss.ns() + << " using filter " + << filter, + status.reason()); +} + +TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsBadValueWhenFilterContainsUnknownOperator) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + + auto filter = BSON("x" << BSON("$unknownFilterOp" << 1)); + auto status = storage.deleteByFilter(opCtx, nss, filter); + ASSERT_EQUALS(ErrorCodes::BadValue, status); + ASSERT_STRING_CONTAINS(status.reason(), "unknown operator: $unknownFilterOp"); +} + +TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsIllegalOperationOnCappedCollection) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + CollectionOptions options; + options.capped = true; + options.cappedSize = 1024 * 1024; + ASSERT_OK(storage.createCollection(opCtx, nss, options)); + + auto filter = BSON("x" << 1); + auto status = storage.deleteByFilter(opCtx, nss, filter); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, status); + ASSERT_STRING_CONTAINS(status.reason(), + str::stream() << "cannot remove from a capped collection: " << nss.ns()); +} + +TEST_F( + StorageInterfaceImplTest, + DeleteByFilterReturnsPrimarySteppedDownWhenCurrentMemberStateIsRollbackAndReplicatedWritesAreEnabled) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("mydb.mycoll"); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + + auto doc = BSON("_id" << 0 << "x" << 0); + ASSERT_OK(storage.insertDocuments(opCtx, nss, {doc})); + _assertDocumentsInCollectionEquals(opCtx, nss, {doc}); + + // This test fixture disables replicated writes by default. We want to re-enable this setting + // for this test. + resetUnreplicatedWritesBlock(); + ASSERT_TRUE(opCtx->writesAreReplicated()); + + // deleteByFilter() checks the current member state indirectly through + // ReplicationCoordinator::canAcceptWrites() if replicated writes are enabled. + ASSERT_TRUE(getReplicationCoordinatorMock()->setFollowerMode(MemberState::RS_ROLLBACK)); + + auto filter = BSON("x" << 0); + ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, storage.deleteByFilter(opCtx, nss, filter)); +} + +TEST_F( + StorageInterfaceImplTest, + DeleteByFilterReturnsPrimarySteppedDownWhenReplicationCoordinatorCannotAcceptWritesAndReplicatedWritesAreEnabled) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("mydb.mycoll"); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + + auto doc = BSON("_id" << 0 << "x" << 0); + ASSERT_OK(storage.insertDocuments(opCtx, nss, {doc})); + _assertDocumentsInCollectionEquals(opCtx, nss, {doc}); + + // This test fixture disables replicated writes by default. We want to re-enable this setting + // for this test. + resetUnreplicatedWritesBlock(); + ASSERT_TRUE(opCtx->writesAreReplicated()); + + // deleteByFilter() checks ReplicationCoordinator::canAcceptWritesFor() if replicated writes are + // enabled on the OperationContext. + getReplicationCoordinatorMock()->alwaysAllowWrites(false); + + auto filter = BSON("x" << 0); + ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, storage.deleteByFilter(opCtx, nss, filter)); +} + +TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsNamespaceNotFoundWhenCollectionDoesNotExist) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("mydb.coll"); + NamespaceString wrongColl(nss.db(), "wrongColl"_sd); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + auto filter = BSON("x" << 1); + auto status = storage.deleteByFilter(opCtx, wrongColl, filter); + ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); + ASSERT_EQUALS( + str::stream() << "Collection [mydb.wrongColl] not found. Unable to delete documents in " + << wrongColl.ns() + << " using filter " + << filter, + status.reason()); +} + +TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsSuccessIfCollectionIsEmpty) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + + ASSERT_OK(storage.deleteByFilter(opCtx, nss, {})); + + _assertDocumentsInCollectionEquals(opCtx, nss, {}); +} + +TEST_F(StorageInterfaceImplTest, DeleteByFilterLeavesCollectionUnchangedIfNoDocumentsMatchFilter) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + + auto docs = {BSON("_id" << 0 << "x" << 0), BSON("_id" << 2 << "x" << 2)}; + ASSERT_OK(storage.insertDocuments(opCtx, nss, docs)); + + auto filter = BSON("x" << 1); + ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter)); + + _assertDocumentsInCollectionEquals(opCtx, nss, docs); +} + +TEST_F(StorageInterfaceImplTest, DeleteByFilterRemoveDocumentsThatMatchFilter) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + + auto docs = {BSON("_id" << 0 << "x" << 0), + BSON("_id" << 1 << "x" << 1), + BSON("_id" << 2 << "x" << 2), + BSON("_id" << 3 << "x" << 3)}; + ASSERT_OK(storage.insertDocuments(opCtx, nss, docs)); + + auto filter = BSON("x" << BSON("$in" << BSON_ARRAY(1 << 2))); + ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter)); + + auto docsRemaining = {BSON("_id" << 0 << "x" << 0), BSON("_id" << 3 << "x" << 3)}; + _assertDocumentsInCollectionEquals(opCtx, nss, docsRemaining); +} + +TEST_F(StorageInterfaceImplTest, DeleteByFilterUsesIdHackIfFilterContainsIdFieldOnly) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + + auto docs = {BSON("_id" << 0 << "x" << 0), BSON("_id" << 1 << "x" << 1)}; + ASSERT_OK(storage.insertDocuments(opCtx, nss, docs)); + + auto filter = BSON("_id" << 1); + ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter)); + + auto docsRemaining = {BSON("_id" << 0 << "x" << 0)}; + _assertDocumentsInCollectionEquals(opCtx, nss, docsRemaining); +} + +TEST_F(StorageInterfaceImplTest, DeleteByFilterRemovesDocumentsInIllegalClientSystemNamespace) { + // Checks that we can remove documents from collections with namespaces not considered "legal + // client system" namespaces. + NamespaceString nss("local.system.rollback.docs"); + ASSERT_FALSE(legalClientSystemNS(nss.ns())); + + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions())); + + auto docs = {BSON("_id" << 0 << "x" << 0), + BSON("_id" << 1 << "x" << 1), + BSON("_id" << 2 << "x" << 2), + BSON("_id" << 3 << "x" << 3)}; + ASSERT_OK(storage.insertDocuments(opCtx, nss, docs)); + + auto filter = BSON("$or" << BSON_ARRAY(BSON("x" << 0) << BSON("_id" << 2))); + ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter)); + + auto docsRemaining = {BSON("_id" << 1 << "x" << 1), BSON("_id" << 3 << "x" << 3)}; + _assertDocumentsInCollectionEquals(opCtx, nss, docsRemaining); +} + +TEST_F(StorageInterfaceImplTest, + DeleteByFilterRespectsCollectionsDefaultCollationWhenRemovingDocuments) { + auto opCtx = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + + // Create a collection using a case-insensitive collation. + CollectionOptions options; + options.collation = BSON("locale" + << "en_US" + << "strength" + << 2); + ASSERT_OK(storage.createCollection(opCtx, nss, options)); + + auto doc1 = BSON("_id" << 1 << "x" + << "ABC"); + auto doc2 = BSON("_id" << 2 << "x" + << "abc"); + auto doc3 = BSON("_id" << 3 << "x" + << "DEF"); + auto doc4 = BSON("_id" << 4 << "x" + << "def"); + ASSERT_OK(storage.insertDocuments(opCtx, nss, {doc1, doc2, doc3, doc4})); + + // This filter should remove doc1 and doc2 because the values of the field "x" + // are equivalent to "aBc" under the case-insensive collation. + auto filter = BSON("x" + << "aBc"); + ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter)); + + _assertDocumentsInCollectionEquals(opCtx, nss, {doc3, doc4}); +} + TEST_F(StorageInterfaceImplTest, GetCollectionCountReturnsNamespaceNotFoundWhenDatabaseDoesNotExist) { auto opCtx = getOperationContext(); diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 95c31e6c5e5..33b25731dcb 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -218,6 +218,12 @@ public: return Status{ErrorCodes::IllegalOperation, "upsertbyId not implemented."}; } + Status deleteByFilter(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& filter) override { + return Status{ErrorCodes::IllegalOperation, "deleteByFilter not implemented."}; + } + StatusWith<StorageInterface::CollectionSize> getCollectionSize( OperationContext* opCtx, const NamespaceString& nss) override { return 0; |