summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2017-04-17 12:06:10 -0400
committerBenety Goh <benety@mongodb.com>2017-04-19 17:56:39 -0400
commit85472b2350952750b658178fc64bf80d8d357348 (patch)
tree84098124bb726398f61082618a532e665d5d4fe8
parent4354125dd663c7b6b0ba853a70c98dc786f9385a (diff)
downloadmongo-85472b2350952750b658178fc64bf80d8d357348.tar.gz
SERVER-28211 add StorageInterface::deleteByFilter
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/storage_interface.h11
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp49
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h4
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp242
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h6
6 files changed, 310 insertions, 3 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index ac126c0eb83..a135295f78f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -156,6 +156,7 @@ env.Library(
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/exec/exec',
'$BUILD_DIR/mongo/db/query/internal_plans',
+ '$BUILD_DIR/mongo/db/query/query',
'$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl
],
)
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 65a33a72a79..2c8a2c2635e 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -274,6 +274,8 @@ public:
/**
* Deletes a single document in the collection referenced by the specified _id.
* Returns deleted document on success.
+ *
+ * Not supported on collections with a default collation.
*/
virtual StatusWith<BSONObj> deleteById(OperationContext* opCtx,
const NamespaceString& nss,
@@ -292,6 +294,15 @@ public:
const BSONElement& idKey,
const BSONObj& update) = 0;
+ /**
+ * Removes all documents that match the "filter" from a collection.
+ * "filter" specifies the deletion criteria using query operators. Pass in an empty document to
+ * delete all documents in a collection.
+ */
+ virtual Status deleteByFilter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& filter) = 0;
+
using CollectionSize = uint64_t;
using CollectionCount = uint64_t;
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index d6f557f2ac4..3c2325ae78a 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -60,8 +60,10 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/keypattern.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/delete_request.h"
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_request.h"
+#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/collection_bulk_loader_impl.h"
#include "mongo/db/repl/oplog.h"
@@ -748,6 +750,53 @@ Status StorageInterfaceImpl::upsertById(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
+Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& filter) {
+ DeleteRequest request(nss);
+ request.setQuery(filter);
+ request.setMulti(true);
+ request.setYieldPolicy(PlanExecutor::NO_YIELD);
+
+ // This disables the legalClientSystemNS() check in getExecutorDelete() which is used to
+ // disallow client deletes from unrecognized system collections.
+ request.setGod();
+
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ // ParsedDelete needs to be inside the write conflict retry loop because it may create a
+ // CanonicalQuery whose ownership will be transferred to the plan executor in
+ // getExecutorDelete().
+ ParsedDelete parsedDelete(opCtx, &request);
+ auto parsedDeleteStatus = parsedDelete.parseRequest();
+ if (!parsedDeleteStatus.isOK()) {
+ return parsedDeleteStatus;
+ }
+
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ auto collectionResult = getCollection(
+ autoColl,
+ nss,
+ str::stream() << "Unable to delete documents in " << nss.ns() << " using filter "
+ << filter);
+ if (!collectionResult.isOK()) {
+ return collectionResult.getStatus();
+ }
+ auto collection = collectionResult.getValue();
+
+ auto planExecutorResult =
+ mongo::getExecutorDelete(opCtx, nullptr, collection, &parsedDelete);
+ if (!planExecutorResult.isOK()) {
+ return planExecutorResult.getStatus();
+ }
+ auto planExecutor = std::move(planExecutorResult.getValue());
+
+ return planExecutor->executePlan();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "StorageInterfaceImpl::deleteByFilter", nss.ns());
+
+ MONGO_UNREACHABLE;
+}
+
StatusWith<StorageInterface::CollectionSize> StorageInterfaceImpl::getCollectionSize(
OperationContext* opCtx, const NamespaceString& nss) {
AutoGetCollectionForRead autoColl(opCtx, nss);
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 76bbc064e0f..adc372f23fd 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -130,6 +130,10 @@ public:
const BSONElement& idKey,
const BSONObj& update) override;
+ Status deleteByFilter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& filter) override;
+
StatusWith<StorageInterface::CollectionSize> getCollectionSize(
OperationContext* opCtx, const NamespaceString& nss) override;
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 7207eb5002f..534641674de 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -172,13 +172,22 @@ protected:
return _opCtx.get();
}
+ ReplicationCoordinatorMock* getReplicationCoordinatorMock() {
+ return _replicationCoordinatorMock;
+ }
+
+ void resetUnreplicatedWritesBlock() {
+ _uwb.reset(nullptr);
+ }
+
private:
void setUp() override {
ServiceContextMongoDTest::setUp();
_createOpCtx();
- ReplicationCoordinator::set(getServiceContext(),
- stdx::make_unique<ReplicationCoordinatorMock>(
- getServiceContext(), createReplSettings()));
+ auto replCoord = stdx::make_unique<ReplicationCoordinatorMock>(getServiceContext(),
+ createReplSettings());
+ _replicationCoordinatorMock = replCoord.get();
+ ReplicationCoordinator::set(getServiceContext(), std::move(replCoord));
}
void tearDown() override {
@@ -199,6 +208,7 @@ private:
ServiceContext::UniqueOperationContext _opCtx;
std::unique_ptr<UnreplicatedWritesBlock> _uwb;
std::unique_ptr<DisableDocumentValidation> _ddv;
+ ReplicationCoordinatorMock* _replicationCoordinatorMock = nullptr;
};
/**
@@ -1706,6 +1716,232 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_STRING_CONTAINS(status.reason(), "Unknown modifier: $unknownUpdateOp");
}
+TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsNamespaceNotFoundWhenDatabaseDoesNotExist) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("nosuchdb.coll");
+ auto filter = BSON("x" << 1);
+ auto status = storage.deleteByFilter(opCtx, nss, filter);
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
+ ASSERT_EQUALS(str::stream() << "Database [nosuchdb] not found. Unable to delete documents in "
+ << nss.ns()
+ << " using filter "
+ << filter,
+ status.reason());
+}
+
+TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsBadValueWhenFilterContainsUnknownOperator) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+
+ auto filter = BSON("x" << BSON("$unknownFilterOp" << 1));
+ auto status = storage.deleteByFilter(opCtx, nss, filter);
+ ASSERT_EQUALS(ErrorCodes::BadValue, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "unknown operator: $unknownFilterOp");
+}
+
+TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsIllegalOperationOnCappedCollection) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ CollectionOptions options;
+ options.capped = true;
+ options.cappedSize = 1024 * 1024;
+ ASSERT_OK(storage.createCollection(opCtx, nss, options));
+
+ auto filter = BSON("x" << 1);
+ auto status = storage.deleteByFilter(opCtx, nss, filter);
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, status);
+ ASSERT_STRING_CONTAINS(status.reason(),
+ str::stream() << "cannot remove from a capped collection: " << nss.ns());
+}
+
+TEST_F(
+ StorageInterfaceImplTest,
+ DeleteByFilterReturnsPrimarySteppedDownWhenCurrentMemberStateIsRollbackAndReplicatedWritesAreEnabled) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("mydb.mycoll");
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+
+ auto doc = BSON("_id" << 0 << "x" << 0);
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, {doc}));
+ _assertDocumentsInCollectionEquals(opCtx, nss, {doc});
+
+ // This test fixture disables replicated writes by default. We want to re-enable this setting
+ // for this test.
+ resetUnreplicatedWritesBlock();
+ ASSERT_TRUE(opCtx->writesAreReplicated());
+
+ // deleteByFilter() checks the current member state indirectly through
+ // ReplicationCoordinator::canAcceptWrites() if replicated writes are enabled.
+ ASSERT_TRUE(getReplicationCoordinatorMock()->setFollowerMode(MemberState::RS_ROLLBACK));
+
+ auto filter = BSON("x" << 0);
+ ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, storage.deleteByFilter(opCtx, nss, filter));
+}
+
+TEST_F(
+ StorageInterfaceImplTest,
+ DeleteByFilterReturnsPrimarySteppedDownWhenReplicationCoordinatorCannotAcceptWritesAndReplicatedWritesAreEnabled) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("mydb.mycoll");
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+
+ auto doc = BSON("_id" << 0 << "x" << 0);
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, {doc}));
+ _assertDocumentsInCollectionEquals(opCtx, nss, {doc});
+
+ // This test fixture disables replicated writes by default. We want to re-enable this setting
+ // for this test.
+ resetUnreplicatedWritesBlock();
+ ASSERT_TRUE(opCtx->writesAreReplicated());
+
+ // deleteByFilter() checks ReplicationCoordinator::canAcceptWritesFor() if replicated writes are
+ // enabled on the OperationContext.
+ getReplicationCoordinatorMock()->alwaysAllowWrites(false);
+
+ auto filter = BSON("x" << 0);
+ ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, storage.deleteByFilter(opCtx, nss, filter));
+}
+
+TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsNamespaceNotFoundWhenCollectionDoesNotExist) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("mydb.coll");
+ NamespaceString wrongColl(nss.db(), "wrongColl"_sd);
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+ auto filter = BSON("x" << 1);
+ auto status = storage.deleteByFilter(opCtx, wrongColl, filter);
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
+ ASSERT_EQUALS(
+ str::stream() << "Collection [mydb.wrongColl] not found. Unable to delete documents in "
+ << wrongColl.ns()
+ << " using filter "
+ << filter,
+ status.reason());
+}
+
+TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsSuccessIfCollectionIsEmpty) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+
+ ASSERT_OK(storage.deleteByFilter(opCtx, nss, {}));
+
+ _assertDocumentsInCollectionEquals(opCtx, nss, {});
+}
+
+TEST_F(StorageInterfaceImplTest, DeleteByFilterLeavesCollectionUnchangedIfNoDocumentsMatchFilter) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+
+ auto docs = {BSON("_id" << 0 << "x" << 0), BSON("_id" << 2 << "x" << 2)};
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, docs));
+
+ auto filter = BSON("x" << 1);
+ ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter));
+
+ _assertDocumentsInCollectionEquals(opCtx, nss, docs);
+}
+
+TEST_F(StorageInterfaceImplTest, DeleteByFilterRemoveDocumentsThatMatchFilter) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+
+ auto docs = {BSON("_id" << 0 << "x" << 0),
+ BSON("_id" << 1 << "x" << 1),
+ BSON("_id" << 2 << "x" << 2),
+ BSON("_id" << 3 << "x" << 3)};
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, docs));
+
+ auto filter = BSON("x" << BSON("$in" << BSON_ARRAY(1 << 2)));
+ ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter));
+
+ auto docsRemaining = {BSON("_id" << 0 << "x" << 0), BSON("_id" << 3 << "x" << 3)};
+ _assertDocumentsInCollectionEquals(opCtx, nss, docsRemaining);
+}
+
+TEST_F(StorageInterfaceImplTest, DeleteByFilterUsesIdHackIfFilterContainsIdFieldOnly) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+
+ auto docs = {BSON("_id" << 0 << "x" << 0), BSON("_id" << 1 << "x" << 1)};
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, docs));
+
+ auto filter = BSON("_id" << 1);
+ ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter));
+
+ auto docsRemaining = {BSON("_id" << 0 << "x" << 0)};
+ _assertDocumentsInCollectionEquals(opCtx, nss, docsRemaining);
+}
+
+TEST_F(StorageInterfaceImplTest, DeleteByFilterRemovesDocumentsInIllegalClientSystemNamespace) {
+ // Checks that we can remove documents from collections with namespaces not considered "legal
+ // client system" namespaces.
+ NamespaceString nss("local.system.rollback.docs");
+ ASSERT_FALSE(legalClientSystemNS(nss.ns()));
+
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
+
+ auto docs = {BSON("_id" << 0 << "x" << 0),
+ BSON("_id" << 1 << "x" << 1),
+ BSON("_id" << 2 << "x" << 2),
+ BSON("_id" << 3 << "x" << 3)};
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, docs));
+
+ auto filter = BSON("$or" << BSON_ARRAY(BSON("x" << 0) << BSON("_id" << 2)));
+ ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter));
+
+ auto docsRemaining = {BSON("_id" << 1 << "x" << 1), BSON("_id" << 3 << "x" << 3)};
+ _assertDocumentsInCollectionEquals(opCtx, nss, docsRemaining);
+}
+
+TEST_F(StorageInterfaceImplTest,
+ DeleteByFilterRespectsCollectionsDefaultCollationWhenRemovingDocuments) {
+ auto opCtx = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+
+ // Create a collection using a case-insensitive collation.
+ CollectionOptions options;
+ options.collation = BSON("locale"
+ << "en_US"
+ << "strength"
+ << 2);
+ ASSERT_OK(storage.createCollection(opCtx, nss, options));
+
+ auto doc1 = BSON("_id" << 1 << "x"
+ << "ABC");
+ auto doc2 = BSON("_id" << 2 << "x"
+ << "abc");
+ auto doc3 = BSON("_id" << 3 << "x"
+ << "DEF");
+ auto doc4 = BSON("_id" << 4 << "x"
+ << "def");
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, {doc1, doc2, doc3, doc4}));
+
+ // This filter should remove doc1 and doc2 because the values of the field "x"
+ // are equivalent to "aBc" under the case-insensive collation.
+ auto filter = BSON("x"
+ << "aBc");
+ ASSERT_OK(storage.deleteByFilter(opCtx, nss, filter));
+
+ _assertDocumentsInCollectionEquals(opCtx, nss, {doc3, doc4});
+}
+
TEST_F(StorageInterfaceImplTest,
GetCollectionCountReturnsNamespaceNotFoundWhenDatabaseDoesNotExist) {
auto opCtx = getOperationContext();
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 95c31e6c5e5..33b25731dcb 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -218,6 +218,12 @@ public:
return Status{ErrorCodes::IllegalOperation, "upsertbyId not implemented."};
}
+ Status deleteByFilter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& filter) override {
+ return Status{ErrorCodes::IllegalOperation, "deleteByFilter not implemented."};
+ }
+
StatusWith<StorageInterface::CollectionSize> getCollectionSize(
OperationContext* opCtx, const NamespaceString& nss) override {
return 0;