diff options
author | Benety Goh <benety@mongodb.com> | 2019-04-25 22:52:43 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2019-04-25 22:52:43 -0400 |
commit | fe5ef2303f53773cabd01e0ce7e7cc83965fe211 (patch) | |
tree | 1ee9f47542ca6e665008e8911e171e3c83f526f6 | |
parent | 9318790212dead660f8f057f18ed48fba9232b83 (diff) | |
download | mongo-fe5ef2303f53773cabd01e0ce7e7cc83965fe211.tar.gz |
Revert "SERVER-40005 Add translation for FindAndModify for FLE"
This reverts commit 9318790212dead660f8f057f18ed48fba9232b83.
-rw-r--r-- | jstests/noPassthrough/commands_handle_kill.js | 2 | ||||
-rw-r--r-- | jstests/noPassthrough/readConcern_snapshot_mongos.js | 2 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/query/find_and_modify_request.cpp | 109 | ||||
-rw-r--r-- | src/mongo/db/query/find_and_modify_request.h | 9 | ||||
-rw-r--r-- | src/mongo/db/query/find_and_modify_request_test.cpp | 34 | ||||
-rw-r--r-- | src/mongo/s/catalog/dist_lock_catalog_impl.cpp | 537 |
7 files changed, 604 insertions, 101 deletions
diff --git a/jstests/noPassthrough/commands_handle_kill.js b/jstests/noPassthrough/commands_handle_kill.js index 3838c90425c..799cfc8cca1 100644 --- a/jstests/noPassthrough/commands_handle_kill.js +++ b/jstests/noPassthrough/commands_handle_kill.js @@ -188,7 +188,7 @@ if (${ canYield }) { {distinct: collName, key: "_id", query: {a: {$gte: 0}}}, {usesIndex: true}); assertCommandPropogatesPlanExecutorKillReason( - {findAndModify: collName, query: {fakeField: {$gt: 0}}, update: {$inc: {a: 1}}}); + {findAndModify: collName, filter: {fakeField: {$gt: 0}}, update: {$inc: {a: 1}}}); assertCommandPropogatesPlanExecutorKillReason( { diff --git a/jstests/noPassthrough/readConcern_snapshot_mongos.js b/jstests/noPassthrough/readConcern_snapshot_mongos.js index 7427a5669f5..a30f3ac9aae 100644 --- a/jstests/noPassthrough/readConcern_snapshot_mongos.js +++ b/jstests/noPassthrough/readConcern_snapshot_mongos.js @@ -88,7 +88,7 @@ // readConcern 'snapshot' is supported by findAndModify on mongos in a transaction. expectSuccessInTxnThenAbort(session, sessionDb, { findAndModify: collName, - query: {}, + filter: {}, update: {$set: {a: 1}}, readConcern: {level: "snapshot"}, }); diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index 32a160d433c..6e8efaa83b9 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -54,7 +54,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, if (opType == repl::OpTypeEnum::kDelete) { uassert( 40606, - str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) << " is not compatible with previous write in the transaction of type: " << OpType_serializer(oplogEntry.getOpType()) << ", oplogTs: " @@ -64,12 +64,12 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, request.isRemove()); uassert(40607, str::stream() << "No pre-image available for findAndModify retry request:" - << redact(request.toBSON({})), + << redact(request.toBSON()), oplogWithCorrectLinks.getPreImageOpTime()); } else if (opType == repl::OpTypeEnum::kInsert) { uassert( 40608, - str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) << " is not compatible with previous write in the transaction of type: " << OpType_serializer(oplogEntry.getOpType()) << ", oplogTs: " @@ -80,7 +80,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, } else { uassert( 40609, - str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) << " is not compatible with previous write in the transaction of type: " << OpType_serializer(oplogEntry.getOpType()) << ", oplogTs: " @@ -91,7 +91,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, if (request.shouldReturnNew()) { uassert(40611, - str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) << " wants the document after update returned, but only before " "update document is stored, oplogTs: " << ts.toString() @@ -100,7 +100,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, oplogWithCorrectLinks.getPostImageOpTime()); } else { uassert(40612, - str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) + str::stream() << "findAndModify retry request: " << redact(request.toBSON()) << " wants the document before update returned, but only after " "update document is stored, oplogTs: " << ts.toString() diff --git a/src/mongo/db/query/find_and_modify_request.cpp b/src/mongo/db/query/find_and_modify_request.cpp index 6c46b639177..1ef84b03762 100644 --- a/src/mongo/db/query/find_and_modify_request.cpp +++ b/src/mongo/db/query/find_and_modify_request.cpp @@ -34,9 +34,7 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" -#include "mongo/db/command_generic_argument.h" #include "mongo/db/write_concern.h" -#include "mongo/idl/idl_parser.h" namespace mongo { @@ -54,12 +52,6 @@ const char kUpsertField[] = "upsert"; const char kWriteConcernField[] = "writeConcern"; const std::vector<BSONObj> emptyArrayFilters{}; - -const std::vector<StringData> _knownFields{ - FindAndModifyRequest::kBypassDocumentValidationFieldName, - FindAndModifyRequest::kLegacyCommandName, - FindAndModifyRequest::kCommandName, -}; } // unnamed namespace FindAndModifyRequest::FindAndModifyRequest(NamespaceString fullNs, BSONObj query, BSONObj updateObj) @@ -75,12 +67,12 @@ FindAndModifyRequest FindAndModifyRequest::makeUpdate(NamespaceString fullNs, } FindAndModifyRequest FindAndModifyRequest::makeRemove(NamespaceString fullNs, BSONObj query) { - FindAndModifyRequest request(fullNs, query, {}); + FindAndModifyRequest request(fullNs, query, BSONObj()); request._isRemove = true; return request; } -BSONObj FindAndModifyRequest::toBSON(const BSONObj& commandPassthroughFields) const { +BSONObj FindAndModifyRequest::toBSON() const { BSONObjBuilder builder; builder.append(kCmdName, _ns.coll()); @@ -124,77 +116,56 @@ BSONObj FindAndModifyRequest::toBSON(const BSONObj& commandPassthroughFields) co builder.append(kWriteConcernField, _writeConcern->toBSON()); } - IDLParserErrorContext::appendGenericCommandArguments( - commandPassthroughFields, _knownFields, &builder); - return builder.obj(); } StatusWith<FindAndModifyRequest> FindAndModifyRequest::parseFromBSON(NamespaceString fullNs, const BSONObj& cmdObj) { - BSONObj query; - BSONObj fields; - BSONObj updateObj; - BSONObj sort; + BSONObj query = cmdObj.getObjectField(kQueryField); + BSONObj fields = cmdObj.getObjectField(kFieldProjectionField); + BSONObj updateObj = cmdObj.getObjectField(kUpdateField); + BSONObj sort = cmdObj.getObjectField(kSortField); + BSONObj collation; - bool shouldReturnNew = false; - bool isUpsert = false; - bool isRemove = false; - bool isUpdate = false; - bool arrayFiltersSet = false; - std::vector<BSONObj> arrayFilters; + { + BSONElement collationElt; + Status collationEltStatus = + bsonExtractTypedField(cmdObj, kCollationField, BSONType::Object, &collationElt); + if (!collationEltStatus.isOK() && (collationEltStatus != ErrorCodes::NoSuchKey)) { + return collationEltStatus; + } + if (collationEltStatus.isOK()) { + collation = collationElt.Obj(); + } + } - for (auto&& field : cmdObj.getFieldNames<std::set<std::string>>()) { - if (field == kQueryField) { - query = cmdObj.getObjectField(kQueryField); - } else if (field == kSortField) { - sort = cmdObj.getObjectField(kSortField); - } else if (field == kRemoveField) { - isRemove = cmdObj[kRemoveField].trueValue(); - } else if (field == kUpdateField) { - updateObj = cmdObj.getObjectField(kUpdateField); - isUpdate = true; - } else if (field == kNewField) { - shouldReturnNew = cmdObj[kNewField].trueValue(); - } else if (field == kFieldProjectionField) { - fields = cmdObj.getObjectField(kFieldProjectionField); - } else if (field == kUpsertField) { - isUpsert = cmdObj[kUpsertField].trueValue(); - } else if (field == kCollationField) { - BSONElement collationElt; - Status collationEltStatus = - bsonExtractTypedField(cmdObj, kCollationField, BSONType::Object, &collationElt); - if (!collationEltStatus.isOK() && (collationEltStatus != ErrorCodes::NoSuchKey)) { - return collationEltStatus; - } - if (collationEltStatus.isOK()) { - collation = collationElt.Obj(); - } - } else if (field == kArrayFiltersField) { - BSONElement arrayFiltersElt; - Status arrayFiltersEltStatus = bsonExtractTypedField( - cmdObj, kArrayFiltersField, BSONType::Array, &arrayFiltersElt); - if (!arrayFiltersEltStatus.isOK() && (arrayFiltersEltStatus != ErrorCodes::NoSuchKey)) { - return arrayFiltersEltStatus; - } - if (arrayFiltersEltStatus.isOK()) { - arrayFiltersSet = true; - for (auto arrayFilter : arrayFiltersElt.Obj()) { - if (arrayFilter.type() != BSONType::Object) { - return {ErrorCodes::TypeMismatch, - str::stream() << "Each array filter must be an object, found " - << arrayFilter.type()}; - } - arrayFilters.push_back(arrayFilter.Obj()); + std::vector<BSONObj> arrayFilters; + bool arrayFiltersSet = false; + { + BSONElement arrayFiltersElt; + Status arrayFiltersEltStatus = + bsonExtractTypedField(cmdObj, kArrayFiltersField, BSONType::Array, &arrayFiltersElt); + if (!arrayFiltersEltStatus.isOK() && (arrayFiltersEltStatus != ErrorCodes::NoSuchKey)) { + return arrayFiltersEltStatus; + } + if (arrayFiltersEltStatus.isOK()) { + arrayFiltersSet = true; + for (auto arrayFilter : arrayFiltersElt.Obj()) { + if (arrayFilter.type() != BSONType::Object) { + return {ErrorCodes::TypeMismatch, + str::stream() << "Each array filter must be an object, found " + << arrayFilter.type()}; } + arrayFilters.push_back(arrayFilter.Obj()); } - } else if (!isGenericArgument(field) && - !std::count(_knownFields.begin(), _knownFields.end(), field)) { - return {ErrorCodes::Error(51177), - str::stream() << "BSON field '" << field << "' is an unknown field."}; } } + bool shouldReturnNew = cmdObj[kNewField].trueValue(); + bool isUpsert = cmdObj[kUpsertField].trueValue(); + bool isRemove = cmdObj[kRemoveField].trueValue(); + bool isUpdate = cmdObj.hasField(kUpdateField); + if (!isRemove && !isUpdate) { return {ErrorCodes::FailedToParse, "Either an update or remove=true must be specified"}; } diff --git a/src/mongo/db/query/find_and_modify_request.h b/src/mongo/db/query/find_and_modify_request.h index f0882a63941..71e1ac983d1 100644 --- a/src/mongo/db/query/find_and_modify_request.h +++ b/src/mongo/db/query/find_and_modify_request.h @@ -50,10 +50,6 @@ class StatusWith; */ class FindAndModifyRequest { public: - static constexpr auto kBypassDocumentValidationFieldName = "bypassDocumentValidation"_sd; - static constexpr auto kLegacyCommandName = "findandmodify"_sd; - static constexpr auto kCommandName = "findAndModify"_sd; - /** * Creates a new instance of an 'update' type findAndModify request. */ @@ -91,10 +87,9 @@ public: /** * Serializes this object into a BSON representation. Fields that are not - * set will not be part of the the serialized object. Passthrough fields - * are appended. + * set will not be part of the the serialized object. */ - BSONObj toBSON(const BSONObj& commandPassthroughFields) const; + BSONObj toBSON() const; const NamespaceString& getNamespaceString() const; BSONObj getQuery() const; diff --git a/src/mongo/db/query/find_and_modify_request_test.cpp b/src/mongo/db/query/find_and_modify_request_test.cpp index a30bdd2e538..7266cb04a18 100644 --- a/src/mongo/db/query/find_and_modify_request_test.cpp +++ b/src/mongo/db/query/find_and_modify_request_test.cpp @@ -47,7 +47,7 @@ TEST(FindAndModifyRequest, BasicUpdate) { update: { y: 1 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithUpsert) { @@ -63,7 +63,7 @@ TEST(FindAndModifyRequest, UpdateWithUpsert) { upsert: true })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithUpsertFalse) { @@ -79,7 +79,7 @@ TEST(FindAndModifyRequest, UpdateWithUpsertFalse) { upsert: false })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithProjection) { @@ -97,7 +97,7 @@ TEST(FindAndModifyRequest, UpdateWithProjection) { fields: { z: 1 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithNewTrue) { @@ -114,7 +114,7 @@ TEST(FindAndModifyRequest, UpdateWithNewTrue) { new: true })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithNewFalse) { @@ -131,7 +131,7 @@ TEST(FindAndModifyRequest, UpdateWithNewFalse) { new: false })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithSort) { @@ -149,7 +149,7 @@ TEST(FindAndModifyRequest, UpdateWithSort) { sort: { z: -1 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithCollation) { @@ -168,7 +168,7 @@ TEST(FindAndModifyRequest, UpdateWithCollation) { collation: { locale: 'en_US' } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithArrayFilters) { @@ -186,7 +186,7 @@ TEST(FindAndModifyRequest, UpdateWithArrayFilters) { arrayFilters: [ { i: 0 } ] })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithWriteConcern) { @@ -204,7 +204,7 @@ TEST(FindAndModifyRequest, UpdateWithWriteConcern) { writeConcern: { w: 2, fsync: true, wtimeout: 150 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, UpdateWithFullSpec) { @@ -239,7 +239,7 @@ TEST(FindAndModifyRequest, UpdateWithFullSpec) { writeConcern: { w: 2, fsync: true, wtimeout: 150 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, BasicRemove) { @@ -252,7 +252,7 @@ TEST(FindAndModifyRequest, BasicRemove) { remove: true })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, RemoveWithProjection) { @@ -269,7 +269,7 @@ TEST(FindAndModifyRequest, RemoveWithProjection) { fields: { z: 1 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, RemoveWithSort) { @@ -286,7 +286,7 @@ TEST(FindAndModifyRequest, RemoveWithSort) { sort: { z: -1 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, RemoveWithCollation) { @@ -304,7 +304,7 @@ TEST(FindAndModifyRequest, RemoveWithCollation) { collation: { locale: 'en_US' } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, RemoveWithWriteConcern) { @@ -321,7 +321,7 @@ TEST(FindAndModifyRequest, RemoveWithWriteConcern) { writeConcern: { w: 2, fsync: true, wtimeout: 150 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, RemoveWithFullSpec) { @@ -348,7 +348,7 @@ TEST(FindAndModifyRequest, RemoveWithFullSpec) { writeConcern: { w: 2, fsync: true, wtimeout: 150 } })json")); - ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON({})); + ASSERT_BSONOBJ_EQ(expectedObj, request.toBSON()); } TEST(FindAndModifyRequest, ParseWithUpdateOnlyRequiredFields) { diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl.cpp b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp new file mode 100644 index 00000000000..aa860bd5bb4 --- /dev/null +++ b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp @@ -0,0 +1,537 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/dist_lock_catalog_impl.h" + +#include <string> + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/read_preference.h" +#include "mongo/db/lasterror.h" +#include "mongo/db/query/find_and_modify_request.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/rpc/metadata/sharding_metadata.h" +#include "mongo/s/catalog/type_lockpings.h" +#include "mongo/s/catalog/type_locks.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +using std::string; +using std::vector; + +namespace { + +const char kFindAndModifyResponseResultDocField[] = "value"; +const char kLocalTimeField[] = "localTime"; + +const ReadPreferenceSetting kReadPref(ReadPreference::PrimaryOnly, TagSet()); + +/** + * Returns the resulting new object from the findAndModify response object. + * Returns LockStateChangeFailed if value field was null, which indicates that + * the findAndModify command did not modify any document. + * This also checks for errors in the response object. + */ +StatusWith<BSONObj> extractFindAndModifyNewObj(StatusWith<Shard::CommandResponse> response) { + if (!response.isOK()) { + return response.getStatus(); + } + if (!response.getValue().commandStatus.isOK()) { + return response.getValue().commandStatus; + } + if (!response.getValue().writeConcernStatus.isOK()) { + return response.getValue().writeConcernStatus; + } + + auto responseObj = std::move(response.getValue().response); + + if (const auto& newDocElem = responseObj[kFindAndModifyResponseResultDocField]) { + if (newDocElem.isNull()) { + return {ErrorCodes::LockStateChangeFailed, + "findAndModify query predicate didn't match any lock document"}; + } + + if (!newDocElem.isABSONObj()) { + return {ErrorCodes::UnsupportedFormat, + str::stream() << "expected an object from the findAndModify response '" + << kFindAndModifyResponseResultDocField + << "'field, got: " + << newDocElem}; + } + + return newDocElem.Obj().getOwned(); + } + + return {ErrorCodes::UnsupportedFormat, + str::stream() << "no '" << kFindAndModifyResponseResultDocField + << "' in findAndModify response"}; +} + +/** + * Extract the electionId from a serverStatus command response. + */ +StatusWith<OID> extractElectionId(const BSONObj& responseObj) { + BSONElement replElem; + auto replElemStatus = bsonExtractTypedField(responseObj, "repl", Object, &replElem); + + if (!replElemStatus.isOK()) { + return {ErrorCodes::UnsupportedFormat, replElemStatus.reason()}; + } + + const auto replSubObj = replElem.Obj(); + OID electionId; + auto electionIdStatus = bsonExtractOIDField(replSubObj, "electionId", &electionId); + + if (!electionIdStatus.isOK()) { + // Secondaries don't have electionId. + if (electionIdStatus.code() == ErrorCodes::NoSuchKey) { + // Verify that the from replSubObj that this is indeed not a primary. + bool isPrimary = false; + auto isPrimaryStatus = bsonExtractBooleanField(replSubObj, "ismaster", &isPrimary); + + if (!isPrimaryStatus.isOK()) { + return {ErrorCodes::UnsupportedFormat, isPrimaryStatus.reason()}; + } + + if (isPrimary) { + string hostContacted; + auto hostContactedStatus = bsonExtractStringField(replSubObj, "me", &hostContacted); + + if (!hostContactedStatus.isOK()) { + return { + ErrorCodes::UnsupportedFormat, + str::stream() + << "failed to extract 'me' field from repl subsection of serverStatus: " + << hostContactedStatus.reason()}; + } + + return {ErrorCodes::UnsupportedFormat, + str::stream() << "expected primary to have electionId but not present on " + << hostContacted}; + } + + return {ErrorCodes::NotMaster, "only primary can have electionId"}; + } + + return {ErrorCodes::UnsupportedFormat, electionIdStatus.reason()}; + } + + return electionId; +} + +} // unnamed namespace + +DistLockCatalogImpl::DistLockCatalogImpl() + : _lockPingNS(LockpingsType::ConfigNS), _locksNS(LocksType::ConfigNS) {} + +DistLockCatalogImpl::~DistLockCatalogImpl() = default; + +StatusWith<LockpingsType> DistLockCatalogImpl::getPing(OperationContext* opCtx, + StringData processID) { + auto findResult = _findOnConfig( + opCtx, kReadPref, _lockPingNS, BSON(LockpingsType::process() << processID), BSONObj(), 1); + + if (!findResult.isOK()) { + return findResult.getStatus(); + } + + const auto& findResultSet = findResult.getValue(); + + if (findResultSet.empty()) { + return {ErrorCodes::NoMatchingDocument, + str::stream() << "ping entry for " << processID << " not found"}; + } + + BSONObj doc = findResultSet.front(); + auto pingDocResult = LockpingsType::fromBSON(doc); + if (!pingDocResult.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "failed to parse document: " << doc << " : " + << pingDocResult.getStatus().toString()}; + } + + return pingDocResult.getValue(); +} + +Status DistLockCatalogImpl::ping(OperationContext* opCtx, StringData processID, Date_t ping) { + auto request = + FindAndModifyRequest::makeUpdate(_lockPingNS, + BSON(LockpingsType::process() << processID), + BSON("$set" << BSON(LockpingsType::ping(ping)))); + request.setUpsert(true); + request.setWriteConcern(kMajorityWriteConcern); + + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + _locksNS.db().toString(), + request.toBSON(), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kNotIdempotent); + + auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus)); + return findAndModifyStatus.getStatus(); +} + +StatusWith<LocksType> DistLockCatalogImpl::grabLock(OperationContext* opCtx, + StringData lockID, + const OID& lockSessionID, + StringData who, + StringData processId, + Date_t time, + StringData why, + const WriteConcernOptions& writeConcern) { + BSONObj newLockDetails(BSON( + LocksType::lockID(lockSessionID) << LocksType::state(LocksType::LOCKED) << LocksType::who() + << who + << LocksType::process() + << processId + << LocksType::when(time) + << LocksType::why() + << why)); + + auto request = FindAndModifyRequest::makeUpdate( + _locksNS, + BSON(LocksType::name() << lockID << LocksType::state(LocksType::UNLOCKED)), + BSON("$set" << newLockDetails)); + request.setUpsert(true); + request.setShouldReturnNew(true); + request.setWriteConcern(writeConcern); + + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + _locksNS.db().toString(), + request.toBSON(), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kNoRetry); // Dist lock manager is handling own retries + + auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus)); + if (!findAndModifyStatus.isOK()) { + if (findAndModifyStatus == ErrorCodes::DuplicateKey) { + // Another thread won the upsert race. Also see SERVER-14322. + return {ErrorCodes::LockStateChangeFailed, + str::stream() << "duplicateKey error during upsert of lock: " << lockID}; + } + + return findAndModifyStatus.getStatus(); + } + + BSONObj doc = findAndModifyStatus.getValue(); + auto locksTypeResult = LocksType::fromBSON(doc); + if (!locksTypeResult.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "failed to parse: " << doc << " : " + << locksTypeResult.getStatus().toString()}; + } + + return locksTypeResult.getValue(); +} + +StatusWith<LocksType> DistLockCatalogImpl::overtakeLock(OperationContext* opCtx, + StringData lockID, + const OID& lockSessionID, + const OID& currentHolderTS, + StringData who, + StringData processId, + Date_t time, + StringData why) { + BSONArrayBuilder orQueryBuilder; + orQueryBuilder.append( + BSON(LocksType::name() << lockID << LocksType::state(LocksType::UNLOCKED))); + orQueryBuilder.append(BSON(LocksType::name() << lockID << LocksType::lockID(currentHolderTS))); + + BSONObj newLockDetails(BSON( + LocksType::lockID(lockSessionID) << LocksType::state(LocksType::LOCKED) << LocksType::who() + << who + << LocksType::process() + << processId + << LocksType::when(time) + << LocksType::why() + << why)); + + auto request = FindAndModifyRequest::makeUpdate( + _locksNS, BSON("$or" << orQueryBuilder.arr()), BSON("$set" << newLockDetails)); + request.setShouldReturnNew(true); + request.setWriteConcern(kMajorityWriteConcern); + + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + _locksNS.db().toString(), + request.toBSON(), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kNotIdempotent); + + auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus)); + if (!findAndModifyStatus.isOK()) { + return findAndModifyStatus.getStatus(); + } + + BSONObj doc = findAndModifyStatus.getValue(); + auto locksTypeResult = LocksType::fromBSON(doc); + if (!locksTypeResult.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "failed to parse: " << doc << " : " + << locksTypeResult.getStatus().toString()}; + } + + return locksTypeResult.getValue(); +} + +Status DistLockCatalogImpl::unlock(OperationContext* opCtx, const OID& lockSessionID) { + FindAndModifyRequest request = FindAndModifyRequest::makeUpdate( + _locksNS, + BSON(LocksType::lockID(lockSessionID)), + BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED)))); + request.setWriteConcern(kMajorityWriteConcern); + return _unlock(opCtx, request); +} + +Status DistLockCatalogImpl::unlock(OperationContext* opCtx, + const OID& lockSessionID, + StringData name) { + FindAndModifyRequest request = FindAndModifyRequest::makeUpdate( + _locksNS, + BSON(LocksType::lockID(lockSessionID) << LocksType::name(name.toString())), + BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED)))); + request.setWriteConcern(kMajorityWriteConcern); + return _unlock(opCtx, request); +} + +Status DistLockCatalogImpl::_unlock(OperationContext* opCtx, const FindAndModifyRequest& request) { + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + _locksNS.db().toString(), + request.toBSON(), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + + auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus)); + if (findAndModifyStatus == ErrorCodes::LockStateChangeFailed) { + // Did not modify any document, which implies that the lock already has a + // a different owner. This is ok since it means that the objective of + // releasing ownership of the lock has already been accomplished. + return Status::OK(); + } + + return findAndModifyStatus.getStatus(); +} + +Status DistLockCatalogImpl::unlockAll(OperationContext* opCtx, const std::string& processID) { + BatchedCommandRequest request([&] { + write_ops::Update updateOp(_locksNS); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(LocksType::process(processID))); + entry.setU(BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED)))); + entry.setUpsert(false); + entry.setMulti(true); + return entry; + }()}); + return updateOp; + }()); + request.setWriteConcern(kLocalWriteConcern.toBSON()); + + BSONObj cmdObj = request.toBSON(); + + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto response = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + _locksNS.db().toString(), + cmdObj, + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + + if (!response.isOK()) { + return response.getStatus(); + } + if (!response.getValue().commandStatus.isOK()) { + return response.getValue().commandStatus; + } + if (!response.getValue().writeConcernStatus.isOK()) { + return response.getValue().writeConcernStatus; + } + + BatchedCommandResponse batchResponse; + std::string errmsg; + if (!batchResponse.parseBSON(response.getValue().response, &errmsg)) { + return Status(ErrorCodes::FailedToParse, + str::stream() + << "Failed to parse config server response to batch request for " + "unlocking existing distributed locks" + << causedBy(errmsg)); + } + return batchResponse.toStatus(); +} + +StatusWith<DistLockCatalog::ServerInfo> DistLockCatalogImpl::getServerInfo( + OperationContext* opCtx) { + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + kReadPref, + "admin", + BSON("serverStatus" << 1), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + + if (!resultStatus.isOK()) { + return resultStatus.getStatus(); + } + if (!resultStatus.getValue().commandStatus.isOK()) { + return resultStatus.getValue().commandStatus; + } + + BSONObj responseObj(std::move(resultStatus.getValue().response)); + + BSONElement localTimeElem; + auto localTimeStatus = + bsonExtractTypedField(responseObj, kLocalTimeField, Date, &localTimeElem); + + if (!localTimeStatus.isOK()) { + return {ErrorCodes::UnsupportedFormat, localTimeStatus.reason()}; + } + + auto electionIdStatus = extractElectionId(responseObj); + + if (!electionIdStatus.isOK()) { + return electionIdStatus.getStatus(); + } + + return DistLockCatalog::ServerInfo(localTimeElem.date(), electionIdStatus.getValue()); +} + +StatusWith<LocksType> DistLockCatalogImpl::getLockByTS(OperationContext* opCtx, + const OID& lockSessionID) { + auto findResult = _findOnConfig( + opCtx, kReadPref, _locksNS, BSON(LocksType::lockID(lockSessionID)), BSONObj(), 1); + + if (!findResult.isOK()) { + return findResult.getStatus(); + } + + const auto& findResultSet = findResult.getValue(); + + if (findResultSet.empty()) { + return {ErrorCodes::LockNotFound, + str::stream() << "lock with ts " << lockSessionID << " not found"}; + } + + BSONObj doc = findResultSet.front(); + auto locksTypeResult = LocksType::fromBSON(doc); + if (!locksTypeResult.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "failed to parse: " << doc << " : " + << locksTypeResult.getStatus().toString()}; + } + + return locksTypeResult.getValue(); +} + +StatusWith<LocksType> DistLockCatalogImpl::getLockByName(OperationContext* opCtx, StringData name) { + auto findResult = + _findOnConfig(opCtx, kReadPref, _locksNS, BSON(LocksType::name() << name), BSONObj(), 1); + + if (!findResult.isOK()) { + return findResult.getStatus(); + } + + const auto& findResultSet = findResult.getValue(); + + if (findResultSet.empty()) { + return {ErrorCodes::LockNotFound, + str::stream() << "lock with name " << name << " not found"}; + } + + BSONObj doc = findResultSet.front(); + auto locksTypeResult = LocksType::fromBSON(doc); + if (!locksTypeResult.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "failed to parse: " << doc << " : " + << locksTypeResult.getStatus().toString()}; + } + + return locksTypeResult.getValue(); +} + +Status DistLockCatalogImpl::stopPing(OperationContext* opCtx, StringData processId) { + auto request = + FindAndModifyRequest::makeRemove(_lockPingNS, BSON(LockpingsType::process() << processId)); + request.setWriteConcern(kMajorityWriteConcern); + + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto resultStatus = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + _locksNS.db().toString(), + request.toBSON(), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kNotIdempotent); + + auto findAndModifyStatus = extractFindAndModifyNewObj(std::move(resultStatus)); + return findAndModifyStatus.getStatus(); +} + +StatusWith<vector<BSONObj>> DistLockCatalogImpl::_findOnConfig( + OperationContext* opCtx, + const ReadPreferenceSetting& readPref, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional<long long> limit) { + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto result = shardRegistry->getConfigShard()->exhaustiveFindOnConfig( + opCtx, readPref, repl::ReadConcernLevel::kMajorityReadConcern, nss, query, sort, limit); + if (!result.isOK()) { + return result.getStatus(); + } + + return result.getValue().docs; +} + +} // namespace mongo |