diff options
25 files changed, 520 insertions, 301 deletions
diff --git a/src/mongo/db/repl/optime_pair.h b/src/mongo/db/repl/optime_pair.h new file mode 100644 index 00000000000..0f301784f7e --- /dev/null +++ b/src/mongo/db/repl/optime_pair.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/optime.h" + +namespace mongo { +namespace repl { + +template <typename T> +struct OpTimePair { +public: + OpTimePair() = default; + explicit OpTimePair(T val) : value(std::move(val)) {} + OpTimePair(T val, OpTime ts) : value(std::move(val)), opTime(std::move(ts)) {} + + T value; + OpTime opTime; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index f95bc3cef7a..03b96373844 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -149,15 +149,11 @@ public: } // step 2 - if (!ChunkVersion::canParseBSON(cmdObj, "version")) { - errmsg = "need to specify version"; - return false; - } - - const ChunkVersion version = ChunkVersion::fromBSON(cmdObj, "version"); + ChunkVersionAndOpTime verAndOpTime = + uassertStatusOK(ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(cmdObj)); + const auto& version = verAndOpTime.getVersion(); // step 3 - const ChunkVersion oldVersion = info->getVersion(ns); const ChunkVersion globalVersion = ShardingState::get(txn)->getVersion(ns); diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index df9b3cdef5d..72ddfe1549f 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -33,11 +33,13 @@ env.Library( target='common', source=[ 'chunk_diff.cpp', + 'chunk_version.cpp', 'set_shard_version_request.cpp', ], LIBDEPS=[ 'catalog/catalog_types', '$BUILD_DIR/mongo/client/connection_string', + '$BUILD_DIR/mongo/db/repl/optime', ] ) diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 73d38b1e610..957f799ba44 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -607,7 +607,7 @@ Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const Namespa shardEntry.getName(), fassertStatusOK(28753, ConnectionString::parse(shardEntry.getHost())), ns, - ChunkVersion::DROPPED(), + ChunkVersionAndOpTime(ChunkVersion::DROPPED()), true); auto ssvResult = shardRegistry->runCommandWithNotMasterRetries( diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index dcc3e2d68e3..bc8ee282ef3 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -212,7 +212,7 @@ Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn, dbPrimaryShardId, primaryShard->getConnString(), NamespaceString(ns), - manager->getVersion(), + ChunkVersionAndOpTime(manager->getVersion(), manager->getConfigOpTime()), true); auto ssvStatus = grid.shardRegistry()->runCommandWithNotMasterRetries( @@ -518,13 +518,18 @@ Status CatalogManagerReplicaSet::dropCollection(OperationContext* txn, const Nam LOG(1) << "dropCollection " << ns << " collection marked as dropped"; + // We just called updateCollection above and this would have advanced the config op time, so use + // the latest value. On the MongoD side, we need to load the latest config metadata, which + // indicates that the collection was dropped. + const ChunkVersionAndOpTime droppedVersion(ChunkVersion::DROPPED(), _getConfigOpTime()); + for (const auto& shardEntry : allShards) { SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( grid.shardRegistry()->getConfigServerConnectionString(), shardEntry.getName(), fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())), ns, - ChunkVersion::DROPPED(), + droppedVersion, true); auto ssvResult = shardRegistry->runCommandWithNotMasterRetries( diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 638935ddbf5..74fabae55a9 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -172,6 +172,10 @@ ChunkManager::ChunkManager(const CollectionType& coll) _version = ChunkVersion::fromBSON(coll.toBSON()); } +repl::OpTime ChunkManager::getConfigOpTime() const { + return repl::OpTime(Timestamp(0, 0), 0); +} + void ChunkManager::loadExistingRanges(OperationContext* txn, const ChunkManager* oldManager) { int tries = 3; diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index 0ca94237920..23440fced18 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -44,6 +44,10 @@ class CollectionType; struct QuerySolutionNode; class OperationContext; +namespace repl { +class OpTime; +} + typedef std::shared_ptr<ChunkManager> ChunkManagerPtr; // The key for the map is max for each Chunk or ChunkRange @@ -154,6 +158,11 @@ public: return _sequenceNumber; } + /** + * Returns the latest op time from when this chunk manager's data was loaded. + */ + repl::OpTime getConfigOpTime() const; + // // After constructor is invoked, we need to call loadExistingRanges. If this is a new // sharded collection, we can call createFirstChunks first. diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index f4448dafe5d..a3de1e2235e 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -320,7 +320,8 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* txn, << "; no metadata found"); } - *endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED()); + *endpoint = + new ShardEndpoint(_primary->getId(), ChunkVersionAndOpTime(ChunkVersion::UNSHARDED())); return Status::OK(); } } @@ -496,7 +497,10 @@ Status ChunkManagerTargeter::targetQuery(const BSONObj& query, for (const ShardId& shardId : shardIds) { endpoints->push_back(new ShardEndpoint( - shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); + shardId, + _manager + ? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime()) + : ChunkVersionAndOpTime(ChunkVersion::UNSHARDED()))); } return Status::OK(); @@ -516,7 +520,9 @@ Status ChunkManagerTargeter::targetShardKey(OperationContext* txn, _stats.chunkSizeDelta[chunk->getMin()] += estDataSize; } - *endpoint = new ShardEndpoint(chunk->getShardId(), _manager->getVersion(chunk->getShardId())); + *endpoint = new ShardEndpoint(chunk->getShardId(), + ChunkVersionAndOpTime(_manager->getVersion(chunk->getShardId()), + _manager->getConfigOpTime())); return Status::OK(); } @@ -537,7 +543,10 @@ Status ChunkManagerTargeter::targetCollection(vector<ShardEndpoint*>* endpoints) for (const ShardId& shardId : shardIds) { endpoints->push_back(new ShardEndpoint( - shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); + shardId, + _manager + ? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime()) + : ChunkVersionAndOpTime(ChunkVersion::UNSHARDED()))); } return Status::OK(); @@ -555,7 +564,10 @@ Status ChunkManagerTargeter::targetAllShards(vector<ShardEndpoint*>* endpoints) for (const ShardId& shardId : shardIds) { endpoints->push_back(new ShardEndpoint( - shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED())); + shardId, + _manager + ? ChunkVersionAndOpTime(_manager->getVersion(shardId), _manager->getConfigOpTime()) + : ChunkVersionAndOpTime(ChunkVersion::UNSHARDED()))); } return Status::OK(); diff --git a/src/mongo/s/chunk_version.cpp b/src/mongo/s/chunk_version.cpp new file mode 100644 index 00000000000..305875d4a6e --- /dev/null +++ b/src/mongo/s/chunk_version.cpp @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/chunk_version.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace { + +const char kVersion[] = "version"; +const char kShardVersion[] = "shardVersion"; + +} // namespace + +StatusWith<ChunkVersion> ChunkVersion::parseFromBSONForCommands(const BSONObj& obj) { + BSONElement versionElem; + Status status = bsonExtractField(obj, kShardVersion, &versionElem); + if (!status.isOK()) + return status; + + if (versionElem.type() != Array) { + return {ErrorCodes::TypeMismatch, + str::stream() << "Invalid type " << versionElem.type() + << " for shardVersion element. Expected an array"}; + } + + BSONObjIterator it(versionElem.Obj()); + if (!it.more()) + return {ErrorCodes::BadValue, "Unexpected empty version"}; + + ChunkVersion version; + + // Expect the timestamp + { + BSONElement tsPart = it.next(); + if (tsPart.type() != bsonTimestamp) + return {ErrorCodes::TypeMismatch, + str::stream() << "Invalid type " << tsPart.type() + << " for version timestamp part."}; + + version._combined = tsPart.timestamp().asULL(); + } + + // Expect the epoch OID + { + BSONElement epochPart = it.next(); + if (epochPart.type() != jstOID) + return {ErrorCodes::TypeMismatch, + str::stream() << "Invalid type " << epochPart.type() + << " for version epoch part."}; + + version._epoch = epochPart.OID(); + } + + return version; +} + +StatusWith<ChunkVersion> ChunkVersion::parseFromBSONForSetShardVersion(const BSONObj& obj) { + bool canParse; + const ChunkVersion chunkVersion = ChunkVersion::fromBSON(obj, kVersion, &canParse); + if (!canParse) + return {ErrorCodes::BadValue, "Unable to parse shard version"}; + + return chunkVersion; +} + + +ChunkVersionAndOpTime::ChunkVersionAndOpTime(ChunkVersion chunkVersion) + : _verAndOpT(chunkVersion) {} + +ChunkVersionAndOpTime::ChunkVersionAndOpTime(ChunkVersion chunkVersion, repl::OpTime ts) + : _verAndOpT(chunkVersion, ts) {} + +StatusWith<ChunkVersionAndOpTime> ChunkVersionAndOpTime::parseFromBSONForCommands( + const BSONObj& obj) { + const auto chunkVersionStatus = ChunkVersion::parseFromBSONForCommands(obj); + if (!chunkVersionStatus.isOK()) + return chunkVersionStatus.getStatus(); + + const ChunkVersion& chunkVersion = chunkVersionStatus.getValue(); + + const auto opTimeStatus = repl::OpTime::parseFromBSON(obj); + if (opTimeStatus.isOK()) { + return ChunkVersionAndOpTime(chunkVersion, opTimeStatus.getValue()); + } else if (opTimeStatus == ErrorCodes::NoSuchKey) { + return ChunkVersionAndOpTime(chunkVersion); + } + + return opTimeStatus.getStatus(); +} + +StatusWith<ChunkVersionAndOpTime> ChunkVersionAndOpTime::parseFromBSONForSetShardVersion( + const BSONObj& obj) { + const auto chunkVersionStatus = ChunkVersion::parseFromBSONForSetShardVersion(obj); + if (!chunkVersionStatus.isOK()) + return chunkVersionStatus.getStatus(); + + const ChunkVersion& chunkVersion = chunkVersionStatus.getValue(); + + const auto opTimeStatus = repl::OpTime::parseFromBSON(obj); + if (opTimeStatus.isOK()) { + return ChunkVersionAndOpTime(chunkVersion, opTimeStatus.getValue()); + } else if (opTimeStatus == ErrorCodes::NoSuchKey) { + return ChunkVersionAndOpTime(chunkVersion); + } + + return opTimeStatus.getStatus(); +} + +void ChunkVersionAndOpTime::appendForSetShardVersion(BSONObjBuilder* builder) const { + _verAndOpT.value.addToBSON(*builder, kVersion); + _verAndOpT.opTime.append(builder); +} + +void ChunkVersionAndOpTime::appendForCommands(BSONObjBuilder* builder) const { + builder->appendArray(kShardVersion, _verAndOpT.value.toBSON()); + _verAndOpT.opTime.append(builder); +} + +} // namespace mongo diff --git a/src/mongo/s/chunk_version.h b/src/mongo/s/chunk_version.h index 02a05f3dd57..7c4b2655412 100644 --- a/src/mongo/s/chunk_version.h +++ b/src/mongo/s/chunk_version.h @@ -1,37 +1,42 @@ /** -* Copyright (C) 2012 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects -* for all of the code used other than as permitted herein. If you modify -* file(s) with this exception, you may extend this exception to your -* version of the file(s), but you are not obligated to do so. If you do not -* wish to do so, delete this exception statement from your version. If you -* delete this exception statement from all source files in the program, -* then also delete it in the license file. -*/ + * Copyright (C) 2012-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #pragma once #include "mongo/db/jsobj.h" +#include "mongo/db/repl/optime_pair.h" namespace mongo { +class BSONObj; +template <typename T> +class StatusWith; + /** * ChunkVersions consist of a major/minor version scoped to a version epoch * @@ -46,32 +51,38 @@ namespace mongo { * expected from types. */ struct ChunkVersion { - union { - struct { - int _minor; - int _major; - }; - unsigned long long _combined; - }; - OID _epoch; - +public: ChunkVersion() : _minor(0), _major(0), _epoch(OID()) {} - // - // Constructors shouldn't have default parameters here, since it's vital we track from - // here on the epochs of versions, even if not used. - // - ChunkVersion(int major, int minor, const OID& epoch) : _minor(minor), _major(major), _epoch(epoch) {} + /** + * Interprets the specified BSON content as the format for commands, which is in the form: + * { ..., shardVersion: [ <combined major/minor>, <OID epoch> ], ... } + */ + static StatusWith<ChunkVersion> parseFromBSONForCommands(const BSONObj& obj); + + /** + * Interprets the specified BSON content as the format for the setShardVersion command, which + * is in the form: + * { ..., version: [ <combined major/minor> ], versionEpoch: [ <OID epoch> ], ... } + */ + static StatusWith<ChunkVersion> parseFromBSONForSetShardVersion(const BSONObj& obj); + + /** + * Indicates a dropped collection. All components are zeroes (OID is zero time, zero + * machineId/inc). + */ static ChunkVersion DROPPED() { - return ChunkVersion(0, 0, OID()); // dropped OID is zero time, zero machineId/inc + return ChunkVersion(0, 0, OID()); } + /** + * Indicates that the collection is not sharded. Same as DROPPED. + */ static ChunkVersion UNSHARDED() { - // TODO: Distinguish between these cases - return DROPPED(); + return ChunkVersion(0, 0, OID()); } static ChunkVersion IGNORED() { @@ -369,35 +380,23 @@ struct ChunkVersion { return b.arr(); } - bool parseBSON(const BSONObj& source, std::string* errMsg) { - // ChunkVersion wants to be an array. - BSONArray arrSource = static_cast<BSONArray>(source); - - bool canParse; - ChunkVersion version = fromBSON(arrSource, &canParse); - if (!canParse) { - *errMsg = "Could not parse version structure"; - return false; - } - - _minor = version._minor; - _major = version._major; - _epoch = version._epoch; - return true; - } - void clear() { _minor = 0; _major = 0; _epoch = OID(); } - void cloneTo(ChunkVersion* other) const { - other->clear(); - other->_minor = _minor; - other->_major = _major; - other->_epoch = _epoch; - } +private: + union { + struct { + int _minor; + int _major; + }; + + uint64_t _combined; + }; + + OID _epoch; }; inline std::ostream& operator<<(std::ostream& s, const ChunkVersion& v) { @@ -405,4 +404,51 @@ inline std::ostream& operator<<(std::ostream& s, const ChunkVersion& v) { return s; } + +/** + * Represents a chunk version along with the optime from when it was retrieved. Provides logic to + * serialize and deserialize the combo to BSON. + */ +class ChunkVersionAndOpTime { +public: + ChunkVersionAndOpTime(ChunkVersion chunkVersion); + ChunkVersionAndOpTime(ChunkVersion chunkVersion, repl::OpTime ts); + + const ChunkVersion& getVersion() const { + return _verAndOpT.value; + } + + const repl::OpTime& getOpTime() const { + return _verAndOpT.opTime; + } + + /** + * Interprets the contents of the BSON documents as having been constructed in the format for + * write commands. The optime component is optional for backwards compatibility and if not + * present, the optime will be default initialized. + */ + static StatusWith<ChunkVersionAndOpTime> parseFromBSONForCommands(const BSONObj& obj); + + /** + * Interprets the contents of the BSON document as having been constructed in the format for the + * setShardVersion command. The optime component is optional for backwards compatibility and if + * not present, the optime will be default initialized. + */ + static StatusWith<ChunkVersionAndOpTime> parseFromBSONForSetShardVersion(const BSONObj& obj); + + /** + * Appends the contents to the specified builder in the format expected by the setShardVersion + * command. + */ + void appendForSetShardVersion(BSONObjBuilder* builder) const; + + /** + * Appends the contents to the specified builder in the format expected by the write commands. + */ + void appendForCommands(BSONObjBuilder* builder) const; + +private: + repl::OpTimePair<ChunkVersion> _verAndOpT; +}; + } // namespace mongo diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h index 44bc5be18c2..79c6d517445 100644 --- a/src/mongo/s/mock_ns_targeter.h +++ b/src/mongo/s/mock_ns_targeter.h @@ -211,8 +211,11 @@ private: inline void assertEndpointsEqual(const ShardEndpoint& endpointA, const ShardEndpoint& endpointB) { ASSERT_EQUALS(endpointA.shardName, endpointB.shardName); - ASSERT_EQUALS(endpointA.shardVersion.toLong(), endpointB.shardVersion.toLong()); - ASSERT_EQUALS(endpointA.shardVersion.epoch(), endpointB.shardVersion.epoch()); + ASSERT_EQUALS(endpointA.shardVersion.getVersion().toLong(), + endpointB.shardVersion.getVersion().toLong()); + ASSERT_EQUALS(endpointA.shardVersion.getVersion().epoch(), + endpointB.shardVersion.getVersion().epoch()); + ASSERT_EQUALS(endpointA.shardVersion.getOpTime(), endpointB.shardVersion.getOpTime()); } } // namespace mongo diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index 087725697d2..1e6182b67b7 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -157,26 +157,11 @@ struct ShardEndpoint { ShardEndpoint(const ShardEndpoint& other) : shardName(other.shardName), shardVersion(other.shardVersion) {} - ShardEndpoint(const std::string& shardName, const ChunkVersion& shardVersion) + ShardEndpoint(const std::string& shardName, const ChunkVersionAndOpTime& shardVersion) : shardName(shardName), shardVersion(shardVersion) {} const std::string shardName; - const ChunkVersion shardVersion; - - // - // For testing *only* - do not use as part of API - // - - BSONObj toBSON() const { - BSONObjBuilder b; - appendBSON(&b); - return b.obj(); - } - - void appendBSON(BSONObjBuilder* builder) const { - builder->append("shardName", shardName); - shardVersion.addToBSON(*builder, "shardVersion"); - } + const ChunkVersionAndOpTime shardVersion; }; } // namespace mongo diff --git a/src/mongo/s/set_shard_version_request.cpp b/src/mongo/s/set_shard_version_request.cpp index eb843a963dd..15389a66d43 100644 --- a/src/mongo/s/set_shard_version_request.cpp +++ b/src/mongo/s/set_shard_version_request.cpp @@ -46,7 +46,6 @@ const char kShardName[] = "shard"; const char kShardConnectionString[] = "shardHost"; const char kInit[] = "init"; const char kAuthoritative[] = "authoritative"; -const char kVersion[] = "version"; const char kNoConnectionVersioning[] = "noConnectionVersioning"; } // namespace @@ -64,7 +63,7 @@ SetShardVersionRequest::SetShardVersionRequest(ConnectionString configServer, std::string shardName, ConnectionString shardConnectionString, NamespaceString nss, - ChunkVersion version, + ChunkVersionAndOpTime version, bool isAuthoritative) : _init(false), _isAuthoritative(isAuthoritative), @@ -88,7 +87,7 @@ SetShardVersionRequest SetShardVersionRequest::makeForVersioning( const std::string& shardName, const ConnectionString& shardConnectionString, const NamespaceString& nss, - const ChunkVersion& nssVersion, + const ChunkVersionAndOpTime& nssVersion, bool isAuthoritative) { return SetShardVersionRequest( configServer, shardName, shardConnectionString, nss, nssVersion, isAuthoritative); @@ -99,7 +98,7 @@ SetShardVersionRequest SetShardVersionRequest::makeForVersioningNoPersist( const std::string& shardName, const ConnectionString& shard, const NamespaceString& nss, - const ChunkVersion& nssVersion, + const ChunkVersionAndOpTime& nssVersion, bool isAuthoritative) { auto ssv = makeForVersioning(configServer, shardName, shard, nss, nssVersion, isAuthoritative); ssv._noConnectionVersioning = true; @@ -185,14 +184,11 @@ StatusWith<SetShardVersionRequest> SetShardVersionRequest::parseFromBSON(const B } { - bool canParse; + auto versionStatus = ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(cmdObj); + if (!versionStatus.isOK()) + return versionStatus.getStatus(); - ChunkVersion chunkVersion = ChunkVersion::fromBSON(cmdObj, kVersion, &canParse); - if (!canParse) { - return {ErrorCodes::BadValue, "Unable to parse shard version"}; - } - - request._version = std::move(chunkVersion); + request._version = versionStatus.getValue(); } return request; @@ -209,7 +205,7 @@ BSONObj SetShardVersionRequest::toBSON() const { cmdBuilder.append(kShardConnectionString, _shardCS.toString()); if (!_init) { - _version.get().addToBSON(cmdBuilder, kVersion); + _version.get().appendForSetShardVersion(&cmdBuilder); } if (_noConnectionVersioning) { @@ -226,7 +222,7 @@ const NamespaceString& SetShardVersionRequest::getNS() const { const ChunkVersion SetShardVersionRequest::getNSVersion() const { invariant(!_init); - return _version.get(); + return _version.get().getVersion(); } } // namespace mongo diff --git a/src/mongo/s/set_shard_version_request.h b/src/mongo/s/set_shard_version_request.h index 02fcb3099dc..3f06b4546c8 100644 --- a/src/mongo/s/set_shard_version_request.h +++ b/src/mongo/s/set_shard_version_request.h @@ -38,7 +38,6 @@ namespace mongo { class BSONObj; -struct ChunkVersion; template <typename T> class StatusWith; @@ -67,7 +66,7 @@ public: const std::string& shardName, const ConnectionString& shard, const NamespaceString& nss, - const ChunkVersion& nssVersion, + const ChunkVersionAndOpTime& nssVersion, bool isAuthoritative); /** @@ -77,12 +76,13 @@ public: * connection WILL NOT be marked as "versioned". DO NOT USE unless the command will be sent * through the TaskExecutor. */ - static SetShardVersionRequest makeForVersioningNoPersist(const ConnectionString& configServer, - const std::string& shardName, - const ConnectionString& shard, - const NamespaceString& nss, - const ChunkVersion& nssVersion, - bool isAuthoritative); + static SetShardVersionRequest makeForVersioningNoPersist( + const ConnectionString& configServer, + const std::string& shardName, + const ConnectionString& shard, + const NamespaceString& nss, + const ChunkVersionAndOpTime& nssVersion, + bool isAuthoritative); /** * Parses an SSV request from a set shard version command. @@ -153,7 +153,7 @@ private: std::string shardName, ConnectionString shardConnectionString, NamespaceString nss, - ChunkVersion nssVersion, + ChunkVersionAndOpTime version, bool isAuthoritative); SetShardVersionRequest(); @@ -169,7 +169,7 @@ private: // These values are only set if _init is false boost::optional<NamespaceString> _nss; - boost::optional<ChunkVersion> _version; + boost::optional<ChunkVersionAndOpTime> _version; }; } // namespace mongo diff --git a/src/mongo/s/set_shard_version_request_test.cpp b/src/mongo/s/set_shard_version_request_test.cpp index 6328a9763c2..ab370835f7a 100644 --- a/src/mongo/s/set_shard_version_request_test.cpp +++ b/src/mongo/s/set_shard_version_request_test.cpp @@ -215,7 +215,8 @@ TEST(SetShardVersionRequest, ToSSVCommandInit) { } TEST(SetShardVersionRequest, ToSSVCommandFull) { - const ChunkVersion chunkVersion(1, 2, OID::gen()); + const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()), + repl::OpTime(Timestamp(10), 20LL)); SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning( configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, false); @@ -228,7 +229,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFull) { ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString()); ASSERT_EQ(ssv.getNS().ns(), "db.coll"); ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"), - chunkVersion.toBSONWithPrefix("version")); + chunkVersion.getVersion().toBSONWithPrefix("version")); ASSERT_EQ(ssv.toBSON(), BSON("setShardVersion" @@ -237,11 +238,13 @@ TEST(SetShardVersionRequest, ToSSVCommandFull) { << configCS.toString() << "shard" << "TestShard" << "shardHost" << shardCS.toString() << "version" - << Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch())); + << Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch" + << chunkVersion.getVersion().epoch() << "ts" << Timestamp(10) << "t" << 20LL)); } TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) { - const ChunkVersion chunkVersion(1, 2, OID::gen()); + const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()), + repl::OpTime(Timestamp(10), 20LL)); SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning( configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, true); @@ -254,7 +257,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) { ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString()); ASSERT_EQ(ssv.getNS().ns(), "db.coll"); ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"), - chunkVersion.toBSONWithPrefix("version")); + chunkVersion.getVersion().toBSONWithPrefix("version")); ASSERT_EQ(ssv.toBSON(), BSON("setShardVersion" @@ -263,11 +266,13 @@ TEST(SetShardVersionRequest, ToSSVCommandFullAuthoritative) { << configCS.toString() << "shard" << "TestShard" << "shardHost" << shardCS.toString() << "version" - << Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch())); + << Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch" + << chunkVersion.getVersion().epoch() << "ts" << Timestamp(10) << "t" << 20LL)); } TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) { - const ChunkVersion chunkVersion(1, 2, OID::gen()); + const ChunkVersionAndOpTime chunkVersion(ChunkVersion(1, 2, OID::gen()), + repl::OpTime(Timestamp(10), 20LL)); SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( configCS, "TestShard", shardCS, NamespaceString("db.coll"), chunkVersion, true); @@ -280,7 +285,7 @@ TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) { ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString()); ASSERT_EQ(ssv.getNS().ns(), "db.coll"); ASSERT_EQ(ssv.getNSVersion().toBSONWithPrefix("version"), - chunkVersion.toBSONWithPrefix("version")); + chunkVersion.getVersion().toBSONWithPrefix("version")); ASSERT_EQ(ssv.toBSON(), BSON("setShardVersion" @@ -289,7 +294,8 @@ TEST(SetShardVersionRequest, ToSSVCommandFullNoConnectionVersioning) { << configCS.toString() << "shard" << "TestShard" << "shardHost" << shardCS.toString() << "version" - << Timestamp(chunkVersion.toLong()) << "versionEpoch" << chunkVersion.epoch() + << Timestamp(chunkVersion.getVersion().toLong()) << "versionEpoch" + << chunkVersion.getVersion().epoch() << "ts" << Timestamp(10) << "t" << 20LL << "noConnectionVersioning" << true)); } diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index a77243627d5..4d4f87a3159 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -1,32 +1,30 @@ -// @file version_manager.cpp - /** -* Copyright (C) 2010 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects -* for all of the code used other than as permitted herein. If you modify -* file(s) with this exception, you may extend this exception to your -* version of the file(s), but you are not obligated to do so. If you do not -* wish to do so, delete this exception statement from your version. If you -* delete this exception statement from all source files in the program, -* then also delete it in the license file. -*/ + * Copyright (C) 2010-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding @@ -45,6 +43,7 @@ #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/mongos_options.h" +#include "mongo/s/set_shard_version_request.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -111,34 +110,35 @@ private: */ bool setShardVersion(DBClientBase& conn, const string& ns, - const string& configServerPrimary, + const ConnectionString& configServer, ChunkVersion version, ChunkManager* manager, bool authoritative, BSONObj& result) { - BSONObjBuilder cmdBuilder; - cmdBuilder.append("setShardVersion", ns); - cmdBuilder.append("configdb", configServerPrimary); - ShardId shardId; + ConnectionString shardCS; { const auto shard = grid.shardRegistry()->getShard(conn.getServerAddress()); shardId = shard->getId(); - cmdBuilder.append("shard", shardId); - cmdBuilder.append("shardHost", shard->getConnString().toString()); + shardCS = shard->getConnString(); } - if (ns.size() > 0) { - version.addToBSON(cmdBuilder); + BSONObj cmd; + + if (ns.empty()) { + SetShardVersionRequest ssv = + SetShardVersionRequest::makeForInit(configServer, shardId, shardCS); + cmd = ssv.toBSON(); } else { - cmdBuilder.append("init", true); - } + const ChunkVersionAndOpTime verAndOpT = manager + ? ChunkVersionAndOpTime(version, manager->getConfigOpTime()) + : ChunkVersionAndOpTime(version); - if (authoritative) { - cmdBuilder.appendBool("authoritative", 1); - } + SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioning( + configServer, shardId, shardCS, NamespaceString(ns), verAndOpT, authoritative); - BSONObj cmd = cmdBuilder.obj(); + cmd = ssv.toBSON(); + } LOG(1) << " setShardVersion " << shardId << " " << conn.getServerAddress() << " " << ns << " " << cmd @@ -214,7 +214,7 @@ bool initShardVersionEmptyNS(OperationContext* txn, DBClientBase* conn_in) { ok = setShardVersion(*conn, "", - grid.shardRegistry()->getConfigServerConnectionString().toString(), + grid.shardRegistry()->getConfigServerConnectionString(), ChunkVersion(), NULL, true, @@ -373,7 +373,7 @@ bool checkShardVersion(OperationContext* txn, BSONObj result; if (setShardVersion(*conn, ns, - grid.shardRegistry()->getConfigServerConnectionString().toString(), + grid.shardRegistry()->getConfigServerConnectionString(), version, manager.get(), authoritative, diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 113dd9133f8..ba7ddae8d95 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -87,11 +87,13 @@ static int compareEndpoints(const ShardEndpoint* endpointA, const ShardEndpoint* if (shardNameDiff != 0) return shardNameDiff; - long shardVersionDiff = endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong(); + long shardVersionDiff = endpointA->shardVersion.getVersion().toLong() - + endpointB->shardVersion.getVersion().toLong(); if (shardVersionDiff != 0) return shardVersionDiff; - int shardEpochDiff = endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()); + int shardEpochDiff = endpointA->shardVersion.getVersion().epoch().compare( + endpointB->shardVersion.getVersion().epoch()); return shardEpochDiff; } @@ -456,7 +458,8 @@ void BatchWriteOp::buildBatchRequest(const TargetedWriteBatch& targetedBatch, unique_ptr<BatchedRequestMetadata> requestMetadata(new BatchedRequestMetadata()); requestMetadata->setShardName(targetedBatch.getEndpoint().shardName); - requestMetadata->setShardVersion(targetedBatch.getEndpoint().shardVersion); + requestMetadata->setShardVersion( + ChunkVersionAndOpTime(targetedBatch.getEndpoint().shardVersion)); requestMetadata->setSession(0); request->setMetadata(requestMetadata.release()); } diff --git a/src/mongo/s/write_ops/batched_delete_request_test.cpp b/src/mongo/s/write_ops/batched_delete_request_test.cpp index 87d883f1b0b..c8f71652599 100644 --- a/src/mongo/s/write_ops/batched_delete_request_test.cpp +++ b/src/mongo/s/write_ops/batched_delete_request_test.cpp @@ -46,32 +46,24 @@ TEST(BatchedDeleteRequest, Basic) { BSON(BatchedDeleteDocument::query(BSON("a" << 1)) << BatchedDeleteDocument::limit(1)) << BSON(BatchedDeleteDocument::query(BSON("b" << 1)) << BatchedDeleteDocument::limit(1))); - BSONObj writeConcernObj = BSON("w" << 1); - - // The BSON_ARRAY macro doesn't support Timestamps. - BSONArrayBuilder arrBuilder; - arrBuilder.append(Timestamp(1, 1)); - arrBuilder.append(OID::gen()); - BSONArray shardVersionArray = arrBuilder.arr(); - - BSONObj origDeleteRequestObj = - BSON(BatchedDeleteRequest::collName("test") - << BatchedDeleteRequest::deletes() << deleteArray - << BatchedDeleteRequest::writeConcern(writeConcernObj) - << BatchedDeleteRequest::ordered(true) << BatchedDeleteRequest::metadata() - << BSON(BatchedRequestMetadata::shardName("shard000") - << BatchedRequestMetadata::shardVersion() << shardVersionArray - << BatchedRequestMetadata::session(0))); + BSONObj origDeleteRequestObj = BSON( + BatchedDeleteRequest::collName("test") + << BatchedDeleteRequest::deletes() << deleteArray + << BatchedDeleteRequest::writeConcern(BSON("w" << 1)) << BatchedDeleteRequest::ordered(true) + << BatchedDeleteRequest::metadata() << BSON("shardName" + << "shard000" + << "shardVersion" + << BSON_ARRAY(Timestamp(1, 2) << OID::gen()) + << "ts" << Timestamp(3, 4) << "t" << 5 + << "session" << 0LL)); string errMsg; BatchedDeleteRequest request; - bool ok = request.parseBSON("foo", origDeleteRequestObj, &errMsg); - ASSERT_TRUE(ok); + ASSERT_TRUE(request.parseBSON("foo", origDeleteRequestObj, &errMsg)); ASSERT_EQ("foo.test", request.getNS().ns()); - BSONObj genDeleteRequestObj = request.toBSON(); - ASSERT_EQUALS(0, genDeleteRequestObj.woCompare(origDeleteRequestObj)); + ASSERT_EQUALS(origDeleteRequestObj, request.toBSON()); } } // namespace diff --git a/src/mongo/s/write_ops/batched_insert_request_test.cpp b/src/mongo/s/write_ops/batched_insert_request_test.cpp index e31503d431f..09e8e950c08 100644 --- a/src/mongo/s/write_ops/batched_insert_request_test.cpp +++ b/src/mongo/s/write_ops/batched_insert_request_test.cpp @@ -45,32 +45,24 @@ namespace { TEST(BatchedInsertRequest, Basic) { BSONArray insertArray = BSON_ARRAY(BSON("a" << 1) << BSON("b" << 1)); - BSONObj writeConcernObj = BSON("w" << 1); - - // The BSON_ARRAY macro doesn't support Timestamps. - BSONArrayBuilder arrBuilder; - arrBuilder.append(Timestamp(1, 1)); - arrBuilder.append(OID::gen()); - BSONArray shardVersionArray = arrBuilder.arr(); - - BSONObj origInsertRequestObj = - BSON(BatchedInsertRequest::collName("test") - << BatchedInsertRequest::documents() << insertArray - << BatchedInsertRequest::writeConcern(writeConcernObj) - << BatchedInsertRequest::ordered(true) << BatchedInsertRequest::metadata() - << BSON(BatchedRequestMetadata::shardName("shard0000") - << BatchedRequestMetadata::shardVersion() << shardVersionArray - << BatchedRequestMetadata::session(0))); + BSONObj origInsertRequestObj = BSON( + BatchedInsertRequest::collName("test") + << BatchedInsertRequest::documents() << insertArray + << BatchedInsertRequest::writeConcern(BSON("w" << 1)) << BatchedInsertRequest::ordered(true) + << BatchedInsertRequest::metadata() << BSON("shardName" + << "shard000" + << "shardVersion" + << BSON_ARRAY(Timestamp(1, 2) << OID::gen()) + << "ts" << Timestamp(3, 4) << "t" << 5 + << "session" << 0LL)); string errMsg; BatchedInsertRequest request; - bool ok = request.parseBSON("foo", origInsertRequestObj, &errMsg); - ASSERT_TRUE(ok); + ASSERT_TRUE(request.parseBSON("foo", origInsertRequestObj, &errMsg)); ASSERT_EQ("foo.test", request.getNS().ns()); - BSONObj genInsertRequestObj = request.toBSON(); - ASSERT_EQUALS(0, genInsertRequestObj.woCompare(origInsertRequestObj)); + ASSERT_EQUALS(origInsertRequestObj, request.toBSON()); } TEST(BatchedInsertRequest, GenIDAll) { diff --git a/src/mongo/s/write_ops/batched_request_metadata.cpp b/src/mongo/s/write_ops/batched_request_metadata.cpp index a2e21f99e4c..78ecefbb9df 100644 --- a/src/mongo/s/write_ops/batched_request_metadata.cpp +++ b/src/mongo/s/write_ops/batched_request_metadata.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/s/write_ops/batched_request_metadata.h" #include "mongo/db/field_parser.h" @@ -55,9 +57,8 @@ BSONObj BatchedRequestMetadata::toBSON() const { if (_isShardNameSet) metadataBuilder << shardName(_shardName); - if (_shardVersion.get()) { - // ChunkVersion wants to be an array. - metadataBuilder.append(shardVersion(), static_cast<BSONArray>(_shardVersion->toBSON())); + if (_shardVersion) { + _shardVersion.get().appendForCommands(&metadataBuilder); } if (_isSessionSet) @@ -80,12 +81,12 @@ bool BatchedRequestMetadata::parseBSON(const BSONObj& source, string* errMsg) { _isShardNameSet = fieldState == FieldParser::FIELD_SET; { - std::unique_ptr<ChunkVersion> tempChunkVersion(new ChunkVersion); - fieldState = FieldParser::extract(source, shardVersion, tempChunkVersion.get(), errMsg); - if (fieldState == FieldParser::FIELD_INVALID) + auto verAndOpTStatus = ChunkVersionAndOpTime::parseFromBSONForCommands(source); + if (!verAndOpTStatus.isOK()) { return false; - if (fieldState == FieldParser::FIELD_SET) - _shardVersion.swap(tempChunkVersion); + } + + _shardVersion = verAndOpTStatus.getValue(); } fieldState = FieldParser::extract(source, session, &_session, errMsg); @@ -113,10 +114,7 @@ string BatchedRequestMetadata::toString() const { void BatchedRequestMetadata::cloneTo(BatchedRequestMetadata* other) const { other->_shardName = _shardName; other->_isShardNameSet = _isShardNameSet; - - if (other->_shardVersion.get()) - _shardVersion->cloneTo(other->_shardVersion.get()); - + other->_shardVersion = _shardVersion; other->_session = _session; other->_isSessionSet = _isSessionSet; } @@ -126,36 +124,21 @@ void BatchedRequestMetadata::setShardName(StringData shardName) { _isShardNameSet = true; } -void BatchedRequestMetadata::unsetShardName() { - _isShardNameSet = false; -} - -bool BatchedRequestMetadata::isShardNameSet() const { - return _isShardNameSet; -} - const string& BatchedRequestMetadata::getShardName() const { dassert(_isShardNameSet); return _shardName; } -void BatchedRequestMetadata::setShardVersion(const ChunkVersion& shardVersion) { - unique_ptr<ChunkVersion> temp(new ChunkVersion); - shardVersion.cloneTo(temp.get()); - _shardVersion.reset(temp.release()); -} - -void BatchedRequestMetadata::unsetShardVersion() { - _shardVersion.reset(); +void BatchedRequestMetadata::setShardVersion(const ChunkVersionAndOpTime& shardVersion) { + _shardVersion = shardVersion; } bool BatchedRequestMetadata::isShardVersionSet() const { - return _shardVersion.get() != NULL; + return _shardVersion.is_initialized(); } const ChunkVersion& BatchedRequestMetadata::getShardVersion() const { - dassert(_shardVersion.get()); - return *_shardVersion; + return _shardVersion.get().getVersion(); } void BatchedRequestMetadata::setSession(long long session) { diff --git a/src/mongo/s/write_ops/batched_request_metadata.h b/src/mongo/s/write_ops/batched_request_metadata.h index 6e31cb60713..2b885e9018d 100644 --- a/src/mongo/s/write_ops/batched_request_metadata.h +++ b/src/mongo/s/write_ops/batched_request_metadata.h @@ -28,34 +28,33 @@ #pragma once +#include <boost/optional.hpp> #include <string> -#include "mongo/base/disallow_copying.h" #include "mongo/db/jsobj.h" #include "mongo/s/bson_serializable.h" #include "mongo/s/chunk_version.h" namespace mongo { -class BatchedRequestMetadata : public BSONSerializable { - MONGO_DISALLOW_COPYING(BatchedRequestMetadata); +class BatchedRequestMetadata { public: static const BSONField<std::string> shardName; static const BSONField<ChunkVersion> shardVersion; static const BSONField<long long> session; BatchedRequestMetadata(); - virtual ~BatchedRequestMetadata(); + ~BatchedRequestMetadata(); // // bson serializable interface implementation // - virtual bool isValid(std::string* errMsg) const; - virtual BSONObj toBSON() const; - virtual bool parseBSON(const BSONObj& source, std::string* errMsg); - virtual void clear(); - virtual std::string toString() const; + bool isValid(std::string* errMsg) const; + BSONObj toBSON() const; + bool parseBSON(const BSONObj& source, std::string* errMsg); + void clear(); + std::string toString() const; void cloneTo(BatchedRequestMetadata* other) const; @@ -64,14 +63,12 @@ public: // void setShardName(StringData shardName); - void unsetShardName(); - bool isShardNameSet() const; const std::string& getShardName() const; - void setShardVersion(const ChunkVersion& shardVersion); - void unsetShardVersion(); + void setShardVersion(const ChunkVersionAndOpTime& shardVersion); bool isShardVersionSet() const; const ChunkVersion& getShardVersion() const; + const repl::OpTime& getOpTime() const; void setSession(long long session); void unsetSession(); @@ -84,10 +81,11 @@ private: bool _isShardNameSet; // (O) version for this collection on a given shard - std::unique_ptr<ChunkVersion> _shardVersion; + boost::optional<ChunkVersionAndOpTime> _shardVersion; // (O) session number the inserts belong to long long _session; bool _isSessionSet; }; -} + +} // namespace mongo diff --git a/src/mongo/s/write_ops/batched_request_metadata_test.cpp b/src/mongo/s/write_ops/batched_request_metadata_test.cpp index f35bcee4ef9..978ee865045 100644 --- a/src/mongo/s/write_ops/batched_request_metadata_test.cpp +++ b/src/mongo/s/write_ops/batched_request_metadata_test.cpp @@ -40,22 +40,16 @@ using std::string; namespace { TEST(BatchedRequestMetadata, Basic) { - // The BSON_ARRAY macro doesn't support Timestamps. - BSONArrayBuilder arrBuilder; - arrBuilder.append(Timestamp(1, 1)); - arrBuilder.append(OID::gen()); - BSONArray shardVersionArray = arrBuilder.arr(); - - BSONObj metadataObj(BSON(BatchedRequestMetadata::shardName("shard0000") - << BatchedRequestMetadata::shardVersion() << shardVersionArray - << BatchedRequestMetadata::session(100))); + BSONObj metadataObj(BSON("shardName" + << "shard0000" + << "shardVersion" << BSON_ARRAY(Timestamp(1, 2) << OID::gen()) << "ts" + << Timestamp(3, 4) << "t" << 5 << "session" << 0LL)); string errMsg; BatchedRequestMetadata metadata; ASSERT_TRUE(metadata.parseBSON(metadataObj, &errMsg)); - BSONObj genMetadataObj = metadata.toBSON(); - ASSERT_EQUALS(metadataObj, genMetadataObj); + ASSERT_EQUALS(metadataObj, metadata.toBSON()); } } // namespace diff --git a/src/mongo/s/write_ops/batched_update_request_test.cpp b/src/mongo/s/write_ops/batched_update_request_test.cpp index 04994733537..af7bb5aadfb 100644 --- a/src/mongo/s/write_ops/batched_update_request_test.cpp +++ b/src/mongo/s/write_ops/batched_update_request_test.cpp @@ -50,32 +50,24 @@ TEST(BatchedUpdateRequest, Basic) { << BatchedUpdateDocument::updateExpr(BSON("$set" << BSON("b" << 2))) << BatchedUpdateDocument::multi(false) << BatchedUpdateDocument::upsert(false))); - BSONObj writeConcernObj = BSON("w" << 1); - - // The BSON_ARRAY macro doesn't support Timestamps. - BSONArrayBuilder arrBuilder; - arrBuilder.append(Timestamp(1, 1)); - arrBuilder.append(OID::gen()); - BSONArray shardVersionArray = arrBuilder.arr(); - - BSONObj origUpdateRequestObj = - BSON(BatchedUpdateRequest::collName("test") - << BatchedUpdateRequest::updates() << updateArray - << BatchedUpdateRequest::writeConcern(writeConcernObj) - << BatchedUpdateRequest::ordered(true) << BatchedUpdateRequest::metadata() - << BSON(BatchedRequestMetadata::shardName("shard0000") - << BatchedRequestMetadata::shardVersion() << shardVersionArray - << BatchedRequestMetadata::session(0))); + BSONObj origUpdateRequestObj = BSON( + BatchedUpdateRequest::collName("test") + << BatchedUpdateRequest::updates() << updateArray + << BatchedUpdateRequest::writeConcern(BSON("w" << 1)) << BatchedUpdateRequest::ordered(true) + << BatchedUpdateRequest::metadata() << BSON("shardName" + << "shard000" + << "shardVersion" + << BSON_ARRAY(Timestamp(1, 2) << OID::gen()) + << "ts" << Timestamp(3, 4) << "t" << 5 + << "session" << 0LL)); string errMsg; BatchedUpdateRequest request; - bool ok = request.parseBSON("foo", origUpdateRequestObj, &errMsg); - ASSERT_TRUE(ok); + ASSERT_TRUE(request.parseBSON("foo", origUpdateRequestObj, &errMsg)); ASSERT_EQ("foo.test", request.getNS().ns()); - BSONObj genUpdateRequestObj = request.toBSON(); - ASSERT_EQUALS(0, genUpdateRequestObj.woCompare(origUpdateRequestObj)); + ASSERT_EQUALS(origUpdateRequestObj, request.toBSON()); } } // namespace diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index 25f9f13b3aa..8cd20fa4865 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -127,7 +127,8 @@ Status WriteOp::targetWrites(OperationContext* txn, if (endpoints.size() == 1u) { targetedWrites->push_back(new TargetedWrite(*endpoint, ref)); } else { - ShardEndpoint broadcastEndpoint(endpoint->shardName, ChunkVersion::IGNORED()); + ShardEndpoint broadcastEndpoint(endpoint->shardName, + ChunkVersionAndOpTime(ChunkVersion::IGNORED())); targetedWrites->push_back(new TargetedWrite(broadcastEndpoint, ref)); } diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp index e488c068e47..233aa824337 100644 --- a/src/mongo/s/write_ops/write_op_test.cpp +++ b/src/mongo/s/write_ops/write_op_test.cpp @@ -218,11 +218,11 @@ TEST(WriteOpTests, TargetMultiAllShards) { ASSERT_EQUALS(targeted.size(), 3u); sortByEndpoint(&targeted); ASSERT_EQUALS(targeted[0]->endpoint.shardName, endpointA.shardName); - ASSERT(ChunkVersion::isIgnoredVersion(targeted[0]->endpoint.shardVersion)); + ASSERT(ChunkVersion::isIgnoredVersion(targeted[0]->endpoint.shardVersion.getVersion())); ASSERT_EQUALS(targeted[1]->endpoint.shardName, endpointB.shardName); - ASSERT(ChunkVersion::isIgnoredVersion(targeted[1]->endpoint.shardVersion)); + ASSERT(ChunkVersion::isIgnoredVersion(targeted[1]->endpoint.shardVersion.getVersion())); ASSERT_EQUALS(targeted[2]->endpoint.shardName, endpointC.shardName); - ASSERT(ChunkVersion::isIgnoredVersion(targeted[2]->endpoint.shardVersion)); + ASSERT(ChunkVersion::isIgnoredVersion(targeted[2]->endpoint.shardVersion.getVersion())); writeOp.noteWriteComplete(*targeted[0]); writeOp.noteWriteComplete(*targeted[1]); |