summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2019-04-25 22:52:43 -0400
committerBenety Goh <benety@mongodb.com>2019-04-25 22:52:43 -0400
commitfe5ef2303f53773cabd01e0ce7e7cc83965fe211 (patch)
tree1ee9f47542ca6e665008e8911e171e3c83f526f6
parent9318790212dead660f8f057f18ed48fba9232b83 (diff)
downloadmongo-fe5ef2303f53773cabd01e0ce7e7cc83965fe211.tar.gz
Revert "SERVER-40005 Add translation for FindAndModify for FLE"
This reverts commit 9318790212dead660f8f057f18ed48fba9232b83.
-rw-r--r--jstests/noPassthrough/commands_handle_kill.js2
-rw-r--r--jstests/noPassthrough/readConcern_snapshot_mongos.js2
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp12
-rw-r--r--src/mongo/db/query/find_and_modify_request.cpp109
-rw-r--r--src/mongo/db/query/find_and_modify_request.h9
-rw-r--r--src/mongo/db/query/find_and_modify_request_test.cpp34
-rw-r--r--src/mongo/s/catalog/dist_lock_catalog_impl.cpp537
7 files changed, 604 insertions, 101 deletions
diff --git a/jstests/noPassthrough/commands_handle_kill.js b/jstests/noPassthrough/commands_handle_kill.js
index 3838c90425c..799cfc8cca1 100644
--- a/jstests/noPassthrough/commands_handle_kill.js
+++ b/jstests/noPassthrough/commands_handle_kill.js
@@ -188,7 +188,7 @@ if (${ canYield }) {
{distinct: collName, key: "_id", query: {a: {$gte: 0}}}, {usesIndex: true});
assertCommandPropogatesPlanExecutorKillReason(
- {findAndModify: collName, query: {fakeField: {$gt: 0}}, update: {$inc: {a: 1}}});
+ {findAndModify: collName, filter: {fakeField: {$gt: 0}}, update: {$inc: {a: 1}}});
assertCommandPropogatesPlanExecutorKillReason(
{
diff --git a/jstests/noPassthrough/readConcern_snapshot_mongos.js b/jstests/noPassthrough/readConcern_snapshot_mongos.js
index 7427a5669f5..a30f3ac9aae 100644
--- a/jstests/noPassthrough/readConcern_snapshot_mongos.js
+++ b/jstests/noPassthrough/readConcern_snapshot_mongos.js
@@ -88,7 +88,7 @@
// readConcern 'snapshot' is supported by findAndModify on mongos in a transaction.
expectSuccessInTxnThenAbort(session, sessionDb, {
findAndModify: collName,
- query: {},
+ filter: {},
update: {$set: {a: 1}},
readConcern: {level: "snapshot"},
});
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 32a160d433c..6e8efaa83b9 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -54,7 +54,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
if (opType == repl::OpTypeEnum::kDelete) {
uassert(
40606,
- str::stream() << "findAndModify retry request: " << redact(request.toBSON({}))
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
<< " is not compatible with previous write in the transaction of type: "
<< OpType_serializer(oplogEntry.getOpType())
<< ", oplogTs: "
@@ -64,12 +64,12 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
request.isRemove());
uassert(40607,
str::stream() << "No pre-image available for findAndModify retry request:"
- << redact(request.toBSON({})),
+ << redact(request.toBSON()),
oplogWithCorrectLinks.getPreImageOpTime());
} else if (opType == repl::OpTypeEnum::kInsert) {
uassert(
40608,
- str::stream() << "findAndModify retry request: " << redact(request.toBSON({}))
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
<< " is not compatible with previous write in the transaction of type: "
<< OpType_serializer(oplogEntry.getOpType())
<< ", oplogTs: "
@@ -80,7 +80,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
} else {
uassert(
40609,
- str::stream() << "findAndModify retry request: " << redact(request.toBSON({}))
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
<< " is not compatible with previous write in the transaction of type: "
<< OpType_serializer(oplogEntry.getOpType())
<< ", oplogTs: "
@@ -91,7 +91,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
if (request.shouldReturnNew()) {
uassert(40611,
- str::stream() << "findAndModify retry request: " << redact(request.toBSON({}))
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
<< " wants the document after update returned, but only before "
"update document is stored, oplogTs: "
<< ts.toString()
@@ -100,7 +100,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
oplogWithCorrectLinks.getPostImageOpTime());
} else {
uassert(40612,
- str::stream() << "findAndModify retry request: " << redact(request.toBSON({}))
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
<< " wants the document before update returned, but only after "
"update document is stored, oplogTs: "
<< ts.toString()
diff --git a/src/mongo/db/query/find_and_modify_request.cpp b/src/mongo/db/query/find_and_modify_request.cpp
index 6c46b639177..1ef84b03762 100644
--- a/src/mongo/db/query/find_and_modify_request.cpp
+++ b/src/mongo/db/query/find_and_modify_request.cpp
@@ -34,9 +34,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bson_extract.h"
-#include "mongo/db/command_generic_argument.h"
#include "mongo/db/write_concern.h"
-#include "mongo/idl/idl_parser.h"
namespace mongo {
@@ -54,12 +52,6 @@ const char kUpsertField[] = "upsert";
const char kWriteConcernField[] = "writeConcern";
const std::vector<BSONObj> emptyArrayFilters{};
-
-const std::vector<StringData> _knownFields{
- FindAndModifyRequest::kBypassDocumentValidationFieldName,
- FindAndModifyRequest::kLegacyCommandName,
- FindAndModifyRequest::kCommandName,
-};
} // unnamed namespace
FindAndModifyRequest::FindAndModifyRequest(NamespaceString fullNs, BSONObj query, BSONObj updateObj)
@@ -75,12 +67,12 @@ FindAndModifyRequest FindAndModifyRequest::makeUpdate(NamespaceString fullNs,
}
FindAndModifyRequest FindAndModifyRequest::makeRemove(NamespaceString fullNs, BSONObj query) {
- FindAndModifyRequest request(fullNs, query, {});
+ FindAndModifyRequest request(fullNs, query, BSONObj());
request._isRemove = true;
return request;
}
-BSONObj FindAndModifyRequest::toBSON(const BSONObj& commandPassthroughFields) const {
+BSONObj FindAndModifyRequest::toBSON() const {
BSONObjBuilder builder;
builder.append(kCmdName, _ns.coll());
@@ -124,77 +116,56 @@ BSONObj FindAndModifyRequest::toBSON(const BSONObj& commandPassthroughFields) co
builder.append(kWriteConcernField, _writeConcern->toBSON());
}
- IDLParserErrorContext::appendGenericCommandArguments(
- commandPassthroughFields, _knownFields, &builder);
-
return builder.obj();
}
StatusWith<FindAndModifyRequest> FindAndModifyRequest::parseFromBSON(NamespaceString fullNs,
const BSONObj& cmdObj) {
- BSONObj query;
- BSONObj fields;
- BSONObj updateObj;
- BSONObj sort;
+ BSONObj query = cmdObj.getObjectField(kQueryField);
+ BSONObj fields = cmdObj.getObjectField(kFieldProjectionField);
+ BSONObj updateObj = cmdObj.getObjectField(kUpdateField);
+ BSONObj sort = cmdObj.getObjectField(kSortField);
+
BSONObj collation;
- bool shouldReturnNew = false;
- bool isUpsert = false;
- bool isRemove = false;
- bool isUpdate = false;
- bool arrayFiltersSet = false;
- std::vector<BSONObj> arrayFilters;
+ {
+ BSONElement collationElt;
+ Status collationEltStatus =
+ bsonExtractTypedField(cmdObj, kCollationField, BSONType::Object, &collationElt);
+ if (!collationEltStatus.isOK() && (collationEltStatus != ErrorCodes::NoSuchKey)) {
+ return collationEltStatus;
+ }
+ if (collationEltStatus.isOK()) {
+ collation = collationElt.Obj();
+ }
+ }
- for (auto&& field : cmdObj.getFieldNames<std::set<std::string>>()) {
- if (field == kQueryField) {
- query = cmdObj.getObjectField(kQueryField);
- } else if (field == kSortField) {
- sort = cmdObj.getObjectField(kSortField);
- } else if (field == kRemoveField) {
- isRemove = cmdObj[kRemoveField].trueValue();
- } else if (field == kUpdateField) {
- updateObj = cmdObj.getObjectField(kUpdateField);
- isUpdate = true;
- } else if (field == kNewField) {
- shouldReturnNew = cmdObj[kNewField].trueValue();
- } else if (field == kFieldProjectionField) {
- fields = cmdObj.getObjectField(kFieldProjectionField);
- } else if (field == kUpsertField) {
- isUpsert = cmdObj[kUpsertField].trueValue();
- } else if (field == kCollationField) {
- BSONElement collationElt;
- Status collationEltStatus =
- bsonExtractTypedField(cmdObj, kCollationField, BSONType::Object, &collationElt);
- if (!collationEltStatus.isOK() && (collationEltStatus != ErrorCodes::NoSuchKey)) {
- return collationEltStatus;
- }
- if (collationEltStatus.isOK()) {
- collation = collationElt.Obj();
- }
- } else if (field == kArrayFiltersField) {
- BSONElement arrayFiltersElt;
- Status arrayFiltersEltStatus = bsonExtractTypedField(
- cmdObj, kArrayFiltersField, BSONType::Array, &arrayFiltersElt);
- if (!arrayFiltersEltStatus.isOK() && (arrayFiltersEltStatus != ErrorCodes::NoSuchKey)) {
- return arrayFiltersEltStatus;
- }
- if (arrayFiltersEltStatus.isOK()) {
- arrayFiltersSet = true;
- for (auto arrayFilter : arrayFiltersElt.Obj()) {
- if (arrayFilter.type() != BSONType::Object) {
- return {ErrorCodes::TypeMismatch,
- str::stream() << "Each array filter must be an object, found "
- << arrayFilter.type()};
- }
- arrayFilters.push_back(arrayFilter.Obj());
+ std::vector<BSONObj> arrayFilters;
+ bool arrayFiltersSet = false;
+ {
+ BSONElement arrayFiltersElt;
+ Status arrayFiltersEltStatus =
+ bsonExtractTypedField(cmdObj, kArrayFiltersField, BSONType::Array, &arrayFiltersElt);
+ if (!arrayFiltersEltStatus.isOK() && (arrayFiltersEltStatus != ErrorCodes::NoSuchKey)) {
+ return arrayFiltersEltStatus;
+ }
+ if (arrayFiltersEltStatus.isOK()) {
+ arrayFiltersSet = true;
+ for (auto arrayFilter : arrayFiltersElt.Obj()) {
+ if (arrayFilter.type() != BSONType::Object) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << "Each array filter must be an object, found "
+ << arrayFilter.type()};
}
+ arrayFilters.push_back(arrayFilter.Obj());
}
- } else if (!isGenericArgument(field) &&
- !std::count(_knownFields.begin(), _knownFields.end(), field)) {
- return {ErrorCodes::Error(51177),
- str::stream() << "BSON field '" << field << "' is an unknown field."};
}
}
+ bool shouldReturnNew = cmdObj[kNewField].trueValue();
+ bool isUpsert = cmdObj[kUpsertField].trueValue();
+ bool isRemove = cmdObj[kRemoveField].trueValue();
+ bool isUpdate = cmdObj.hasField(kUpdateField);
+
if (!isRemove && !isUpdate) {
return {ErrorCodes::FailedToParse, "Either an update or remove=true must be specified"};
}
diff --git a/src/mongo/db/query/find_and_modify_request.h b/src/mongo/db/query/find_and_modify_request.h
index f0882a63941..71e1ac983d1 100644
--- a/src/mongo/db/query/find_and_modify_request.h
+++ b/src/mongo/db/query/find_and_modify_request.h
@@ -50,10 +50,6 @@ class StatusWith;
*/
class FindAndModifyRequest {
public:
- static constexpr auto kBypassDocumentValidationFieldName = "bypassDocumentValidation"_sd;
- static constexpr auto kLegacyCommandName = "findandmodify"_sd;
- static constexpr auto kCommandName = "findAndModify"_sd;
-
/**
* Creates a new instance of an 'update' type findAndModify request.
*/
@@ -91,10 +87,9 @@ public:
/**
* Serializes this object into a BSON representation. Fields that are not
- * set will not be part of the the serialized object. Passthrough fields
- * are appended.
+ * set will not be part of the the serialized object.
*/
- BSONObj toBSON(const BSONObj& commandPassthroughFields) const;
+ BSONObj toBSON() const;
const NamespaceString& getNamespaceString() const;
BSONObj getQuery() const;
diff --git a/src/mongo/db/query/find_and_modify_request_test.cpp b/src/mongo/db/query/find_and_modify_request_test.cpp
index a30bdd2e538..7266cb04a18 100644
--- a/src/mongo/db/query/find_and_modify_request_test.cpp
+++ b/src/mongo/db/query/find_and_modify_request_test.cpp
@@ -47,7 +47,7 @@ TEST(FindAndModifyRequest, BasicUpdate) {
update: { y: 1 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithUpsert) {
@@ -63,7 +63,7 @@ TEST(FindAndModifyRequest, UpdateWithUpsert) {
upsert: true
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithUpsertFalse) {
@@ -79,7 +79,7 @@ TEST(FindAndModifyRequest, UpdateWithUpsertFalse) {
upsert: false
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithProjection) {
@@ -97,7 +97,7 @@ TEST(FindAndModifyRequest, UpdateWithProjection) {
fields: { z: 1 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithNewTrue) {
@@ -114,7 +114,7 @@ TEST(FindAndModifyRequest, UpdateWithNewTrue) {
new: true
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithNewFalse) {
@@ -131,7 +131,7 @@ TEST(FindAndModifyRequest, UpdateWithNewFalse) {
new: false
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithSort) {
@@ -149,7 +149,7 @@ TEST(FindAndModifyRequest, UpdateWithSort) {
sort: { z: -1 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithCollation) {
@@ -168,7 +168,7 @@ TEST(FindAndModifyRequest, UpdateWithCollation) {
collation: { locale: 'en_US' }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithArrayFilters) {
@@ -186,7 +186,7 @@ TEST(FindAndModifyRequest, UpdateWithArrayFilters) {
arrayFilters: [ { i: 0 } ]
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithWriteConcern) {
@@ -204,7 +204,7 @@ TEST(FindAndModifyRequest, UpdateWithWriteConcern) {
writeConcern: { w: 2, fsync: true, wtimeout: 150 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, UpdateWithFullSpec) {
@@ -239,7 +239,7 @@ TEST(FindAndModifyRequest, UpdateWithFullSpec) {
writeConcern: { w: 2, fsync: true, wtimeout: 150 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, BasicRemove) {
@@ -252,7 +252,7 @@ TEST(FindAndModifyRequest, BasicRemove) {
remove: true
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, RemoveWithProjection) {
@@ -269,7 +269,7 @@ TEST(FindAndModifyRequest, RemoveWithProjection) {
fields: { z: 1 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, RemoveWithSort) {
@@ -286,7 +286,7 @@ TEST(FindAndModifyRequest, RemoveWithSort) {
sort: { z: -1 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, RemoveWithCollation) {
@@ -304,7 +304,7 @@ TEST(FindAndModifyRequest, RemoveWithCollation) {
collation: { locale: 'en_US' }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, RemoveWithWriteConcern) {
@@ -321,7 +321,7 @@ TEST(FindAndModifyRequest, RemoveWithWriteConcern) {
writeConcern: { w: 2, fsync: true, wtimeout: 150 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, RemoveWithFullSpec) {
@@ -348,7 +348,7 @@ TEST(FindAndModifyRequest, RemoveWithFullSpec) {
writeConcern: { w: 2, fsync: true, wtimeout: 150 }
})json"));
- ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({}));
+ ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON());
}
TEST(FindAndModifyRequest, ParseWithUpdateOnlyRequiredFields) {
diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl.cpp b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp
new file mode 100644
index 00000000000..aa860bd5bb4
--- /dev/null
+++ b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp
@@ -0,0 +1,537 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/catalog/dist_lock_catalog_impl.h"
+
+#include <string>
+
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/db/lasterror.h"
+#include "mongo/db/query/find_and_modify_request.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/rpc/metadata/sharding_metadata.h"
+#include "mongo/s/catalog/type_lockpings.h"
+#include "mongo/s/catalog/type_locks.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+using std::string;
+using std::vector;
+
+namespace {
+
+const char kFindAndModifyResponseResultDocField[] = "value";
+const char kLocalTimeField[] = "localTime";
+
+const ReadPreferenceSetting kReadPref(ReadPreference::PrimaryOnly, TagSet());
+
+/**
+ * Returns the resulting new object from the findAndModify response object.
+ * Returns LockStateChangeFailed if value field was null, which indicates that
+ * the findAndModify command did not modify any document.
+ * This also checks for errors in the response object.
+ */
+StatusWith<BSONObj> extractFindAndModifyNewObj(StatusWith<Shard::CommandResponse> response) {
+ if (!response.isOK()) {
+ return response.getStatus();
+ }
+ if (!response.getValue().commandStatus.isOK()) {
+ return response.getValue().commandStatus;
+ }
+ if (!response.getValue().writeConcernStatus.isOK()) {
+ return response.getValue().writeConcernStatus;
+ }
+
+ auto responseObj = std::move(response.getValue().response);
+
+ if (const auto& newDocElem = responseObj[kFindAndModifyResponseResultDocField]) {
+ if (newDocElem.isNull()) {
+ return {ErrorCodes::LockStateChangeFailed,
+ "findAndModify query predicate didn't match any lock document"};
+ }
+
+ if (!newDocElem.isABSONObj()) {
+ return {ErrorCodes::UnsupportedFormat,
+ str::stream() << "expected an object from the findAndModify response '"
+ << kFindAndModifyResponseResultDocField
+ << "'field, got: "
+ << newDocElem};
+ }
+
+ return newDocElem.Obj().getOwned();
+ }
+
+ return {ErrorCodes::UnsupportedFormat,
+ str::stream() << "no '" << kFindAndModifyResponseResultDocField
+ << "' in findAndModify response"};
+}
+
+/**
+ * Extract the electionId from a serverStatus command response.
+ */
+StatusWith<OID> extractElectionId(const BSONObj& responseObj) {
+ BSONElement replElem;
+ auto replElemStatus = bsonExtractTypedField(responseObj, "repl", Object, &replElem);
+
+ if (!replElemStatus.isOK()) {
+ return {ErrorCodes::UnsupportedFormat, replElemStatus.reason()};
+ }
+
+ const auto replSubObj = replElem.Obj();
+ OID electionId;
+ auto electionIdStatus = bsonExtractOIDField(replSubObj, "electionId", &electionId);
+
+ if (!electionIdStatus.isOK()) {
+ // Secondaries don't have electionId.
+ if (electionIdStatus.code() == ErrorCodes::NoSuchKey) {
+ // Verify that the from replSubObj that this is indeed not a primary.
+ bool isPrimary = false;
+ auto isPrimaryStatus = bsonExtractBooleanField(replSubObj, "ismaster", &isPrimary);
+
+ if (!isPrimaryStatus.isOK()) {
+ return {ErrorCodes::UnsupportedFormat, isPrimaryStatus.reason()};
+ }
+
+ if (isPrimary) {
+ string hostContacted;
+ auto hostContactedStatus = bsonExtractStringField(replSubObj, "me", &hostContacted);
+
+ if (!hostContactedStatus.isOK()) {
+ return {
+ ErrorCodes::UnsupportedFormat,
+ str::stream()
+ << "failed to extract 'me' field from repl subsection of serverStatus: "
+ << hostContactedStatus.reason()};
+ }
+
+ return {ErrorCodes::UnsupportedFormat,
+ str::stream() << "expected primary to have electionId but not present on "
+ << hostContacted};
+ }
+
+ return {ErrorCodes::NotMaster, "only primary can have electionId"};
+ }
+
+ return {ErrorCodes::UnsupportedFormat, electionIdStatus.reason()};
+ }
+
+ return electionId;
+}
+
+} // unnamed namespace
+
+DistLockCatalogImpl::DistLockCatalogImpl()
+ : _lockPingNS(LockpingsType::ConfigNS), _locksNS(LocksType::ConfigNS) {}
+
+DistLockCatalogImpl::~DistLockCatalogImpl() = default;
+
+StatusWith<LockpingsType> DistLockCatalogImpl::getPing(OperationContext* opCtx,
+ StringData processID) {
+ auto findResult = _findOnConfig(
+ opCtx, kReadPref, _lockPingNS, BSON(LockpingsType::process() << processID), BSONObj(), 1);
+
+ if (!findResult.isOK()) {
+ return findResult.getStatus();
+ }
+
+ const auto& findResultSet = findResult.getValue();
+
+ if (findResultSet.empty()) {
+ return {ErrorCodes::NoMatchingDocument,
+ str::stream() << "ping entry for " << processID << " not found"};
+ }
+
+ BSONObj doc = findResultSet.front();
+ auto pingDocResult = LockpingsType::fromBSON(doc);
+ if (!pingDocResult.isOK()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "failed to parse document: " << doc << " : "
+ << pingDocResult.getStatus().toString()};
+ }
+
+ return pingDocResult.getValue();
+}
+
+Status DistLockCatalogImpl::ping(OperationContext* opCtx, StringData processID, Date_t ping) {
+ auto request =
+ FindAndModifyRequest::makeUpdate(_lockPingNS,
+ BSON(LockpingsType::process() << processID),
+ BSON("$set" << BSON(LockpingsType::ping(ping))));
+ request.setUpsert(true);
+ request.setWriteConcern(kMajorityWriteConcern);
+
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ _locksNS.db().toString(),
+ request.toBSON(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kNotIdempotent);
+
+ auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus));
+ return findAndModifyStatus.getStatus();
+}
+
+StatusWith<LocksType> DistLockCatalogImpl::grabLock(OperationContext* opCtx,
+ StringData lockID,
+ const OID& lockSessionID,
+ StringData who,
+ StringData processId,
+ Date_t time,
+ StringData why,
+ const WriteConcernOptions& writeConcern) {
+ BSONObj newLockDetails(BSON(
+ LocksType::lockID(lockSessionID) << LocksType::state(LocksType::LOCKED) << LocksType::who()
+ << who
+ << LocksType::process()
+ << processId
+ << LocksType::when(time)
+ << LocksType::why()
+ << why));
+
+ auto request = FindAndModifyRequest::makeUpdate(
+ _locksNS,
+ BSON(LocksType::name() << lockID << LocksType::state(LocksType::UNLOCKED)),
+ BSON("$set" << newLockDetails));
+ request.setUpsert(true);
+ request.setShouldReturnNew(true);
+ request.setWriteConcern(writeConcern);
+
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ _locksNS.db().toString(),
+ request.toBSON(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kNoRetry); // Dist lock manager is handling own retries
+
+ auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus));
+ if (!findAndModifyStatus.isOK()) {
+ if (findAndModifyStatus == ErrorCodes::DuplicateKey) {
+ // Another thread won the upsert race. Also see SERVER-14322.
+ return {ErrorCodes::LockStateChangeFailed,
+ str::stream() << "duplicateKey error during upsert of lock: " << lockID};
+ }
+
+ return findAndModifyStatus.getStatus();
+ }
+
+ BSONObj doc = findAndModifyStatus.getValue();
+ auto locksTypeResult = LocksType::fromBSON(doc);
+ if (!locksTypeResult.isOK()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "failed to parse: " << doc << " : "
+ << locksTypeResult.getStatus().toString()};
+ }
+
+ return locksTypeResult.getValue();
+}
+
+StatusWith<LocksType> DistLockCatalogImpl::overtakeLock(OperationContext* opCtx,
+ StringData lockID,
+ const OID& lockSessionID,
+ const OID& currentHolderTS,
+ StringData who,
+ StringData processId,
+ Date_t time,
+ StringData why) {
+ BSONArrayBuilder orQueryBuilder;
+ orQueryBuilder.append(
+ BSON(LocksType::name() << lockID << LocksType::state(LocksType::UNLOCKED)));
+ orQueryBuilder.append(BSON(LocksType::name() << lockID << LocksType::lockID(currentHolderTS)));
+
+ BSONObj newLockDetails(BSON(
+ LocksType::lockID(lockSessionID) << LocksType::state(LocksType::LOCKED) << LocksType::who()
+ << who
+ << LocksType::process()
+ << processId
+ << LocksType::when(time)
+ << LocksType::why()
+ << why));
+
+ auto request = FindAndModifyRequest::makeUpdate(
+ _locksNS, BSON("$or" << orQueryBuilder.arr()), BSON("$set" << newLockDetails));
+ request.setShouldReturnNew(true);
+ request.setWriteConcern(kMajorityWriteConcern);
+
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ _locksNS.db().toString(),
+ request.toBSON(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kNotIdempotent);
+
+ auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus));
+ if (!findAndModifyStatus.isOK()) {
+ return findAndModifyStatus.getStatus();
+ }
+
+ BSONObj doc = findAndModifyStatus.getValue();
+ auto locksTypeResult = LocksType::fromBSON(doc);
+ if (!locksTypeResult.isOK()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "failed to parse: " << doc << " : "
+ << locksTypeResult.getStatus().toString()};
+ }
+
+ return locksTypeResult.getValue();
+}
+
+Status DistLockCatalogImpl::unlock(OperationContext* opCtx, const OID& lockSessionID) {
+ FindAndModifyRequest request = FindAndModifyRequest::makeUpdate(
+ _locksNS,
+ BSON(LocksType::lockID(lockSessionID)),
+ BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))));
+ request.setWriteConcern(kMajorityWriteConcern);
+ return _unlock(opCtx, request);
+}
+
+Status DistLockCatalogImpl::unlock(OperationContext* opCtx,
+ const OID& lockSessionID,
+ StringData name) {
+ FindAndModifyRequest request = FindAndModifyRequest::makeUpdate(
+ _locksNS,
+ BSON(LocksType::lockID(lockSessionID) << LocksType::name(name.toString())),
+ BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))));
+ request.setWriteConcern(kMajorityWriteConcern);
+ return _unlock(opCtx, request);
+}
+
+Status DistLockCatalogImpl::_unlock(OperationContext* opCtx, const FindAndModifyRequest& request) {
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ _locksNS.db().toString(),
+ request.toBSON(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotent);
+
+ auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus));
+ if (findAndModifyStatus == ErrorCodes::LockStateChangeFailed) {
+ // Did not modify any document, which implies that the lock already has a
+ // a different owner. This is ok since it means that the objective of
+ // releasing ownership of the lock has already been accomplished.
+ return Status::OK();
+ }
+
+ return findAndModifyStatus.getStatus();
+}
+
+Status DistLockCatalogImpl::unlockAll(OperationContext* opCtx, const std::string& processID) {
+ BatchedCommandRequest request([&] {
+ write_ops::Update updateOp(_locksNS);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(BSON(LocksType::process(processID)));
+ entry.setU(BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))));
+ entry.setUpsert(false);
+ entry.setMulti(true);
+ return entry;
+ }()});
+ return updateOp;
+ }());
+ request.setWriteConcern(kLocalWriteConcern.toBSON());
+
+ BSONObj cmdObj = request.toBSON();
+
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto response = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ _locksNS.db().toString(),
+ cmdObj,
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotent);
+
+ if (!response.isOK()) {
+ return response.getStatus();
+ }
+ if (!response.getValue().commandStatus.isOK()) {
+ return response.getValue().commandStatus;
+ }
+ if (!response.getValue().writeConcernStatus.isOK()) {
+ return response.getValue().writeConcernStatus;
+ }
+
+ BatchedCommandResponse batchResponse;
+ std::string errmsg;
+ if (!batchResponse.parseBSON(response.getValue().response, &errmsg)) {
+ return Status(ErrorCodes::FailedToParse,
+ str::stream()
+ << "Failed to parse config server response to batch request for "
+ "unlocking existing distributed locks"
+ << causedBy(errmsg));
+ }
+ return batchResponse.toStatus();
+}
+
+StatusWith<DistLockCatalog::ServerInfo> DistLockCatalogImpl::getServerInfo(
+ OperationContext* opCtx) {
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ kReadPref,
+ "admin",
+ BSON("serverStatus" << 1),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotent);
+
+ if (!resultStatus.isOK()) {
+ return resultStatus.getStatus();
+ }
+ if (!resultStatus.getValue().commandStatus.isOK()) {
+ return resultStatus.getValue().commandStatus;
+ }
+
+ BSONObj responseObj(std::move(resultStatus.getValue().response));
+
+ BSONElement localTimeElem;
+ auto localTimeStatus =
+ bsonExtractTypedField(responseObj, kLocalTimeField, Date, &localTimeElem);
+
+ if (!localTimeStatus.isOK()) {
+ return {ErrorCodes::UnsupportedFormat, localTimeStatus.reason()};
+ }
+
+ auto electionIdStatus = extractElectionId(responseObj);
+
+ if (!electionIdStatus.isOK()) {
+ return electionIdStatus.getStatus();
+ }
+
+ return DistLockCatalog::ServerInfo(localTimeElem.date(), electionIdStatus.getValue());
+}
+
+StatusWith<LocksType> DistLockCatalogImpl::getLockByTS(OperationContext* opCtx,
+ const OID& lockSessionID) {
+ auto findResult = _findOnConfig(
+ opCtx, kReadPref, _locksNS, BSON(LocksType::lockID(lockSessionID)), BSONObj(), 1);
+
+ if (!findResult.isOK()) {
+ return findResult.getStatus();
+ }
+
+ const auto& findResultSet = findResult.getValue();
+
+ if (findResultSet.empty()) {
+ return {ErrorCodes::LockNotFound,
+ str::stream() << "lock with ts " << lockSessionID << " not found"};
+ }
+
+ BSONObj doc = findResultSet.front();
+ auto locksTypeResult = LocksType::fromBSON(doc);
+ if (!locksTypeResult.isOK()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "failed to parse: " << doc << " : "
+ << locksTypeResult.getStatus().toString()};
+ }
+
+ return locksTypeResult.getValue();
+}
+
+StatusWith<LocksType> DistLockCatalogImpl::getLockByName(OperationContext* opCtx, StringData name) {
+ auto findResult =
+ _findOnConfig(opCtx, kReadPref, _locksNS, BSON(LocksType::name() << name), BSONObj(), 1);
+
+ if (!findResult.isOK()) {
+ return findResult.getStatus();
+ }
+
+ const auto& findResultSet = findResult.getValue();
+
+ if (findResultSet.empty()) {
+ return {ErrorCodes::LockNotFound,
+ str::stream() << "lock with name " << name << " not found"};
+ }
+
+ BSONObj doc = findResultSet.front();
+ auto locksTypeResult = LocksType::fromBSON(doc);
+ if (!locksTypeResult.isOK()) {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "failed to parse: " << doc << " : "
+ << locksTypeResult.getStatus().toString()};
+ }
+
+ return locksTypeResult.getValue();
+}
+
+Status DistLockCatalogImpl::stopPing(OperationContext* opCtx, StringData processId) {
+ auto request =
+ FindAndModifyRequest::makeRemove(_lockPingNS, BSON(LockpingsType::process() << processId));
+ request.setWriteConcern(kMajorityWriteConcern);
+
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ _locksNS.db().toString(),
+ request.toBSON(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kNotIdempotent);
+
+ auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus));
+ return findAndModifyStatus.getStatus();
+}
+
+StatusWith<vector<BSONObj>> DistLockCatalogImpl::_findOnConfig(
+ OperationContext* opCtx,
+ const ReadPreferenceSetting& readPref,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit) {
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto result = shardRegistry->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx, readPref, repl::ReadConcernLevel::kMajorityReadConcern, nss, query, sort, limit);
+ if (!result.isOK()) {
+ return result.getStatus();
+ }
+
+ return result.getValue().docs;
+}
+
+} // namespace mongo