/** * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/db/s/shard_metadata_util.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/write_concern_options.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/unique_message.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_shard_collection.h" #include "mongo/s/chunk_version.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { namespace shardmetadatautil { namespace { const WriteConcernOptions kLocalWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Milliseconds(0)); /** * Processes a command result for errors, including write concern errors. */ Status getStatusFromWriteCommandResponse(const BSONObj& commandResult) { BatchedCommandResponse batchResponse; std::string errmsg; if (!batchResponse.parseBSON(commandResult, &errmsg)) { return Status(ErrorCodes::FailedToParse, str::stream() << "Failed to parse write response: " << errmsg); } return batchResponse.toStatus(); } } // namespace QueryAndSort createShardChunkDiffQuery(const ChunkVersion& collectionVersion) { return {BSON(ChunkType::lastmod() << BSON("$gte" << Timestamp(collectionVersion.toLong()))), BSON(ChunkType::lastmod() << 1)}; } bool RefreshState::operator==(const RefreshState& other) const { return (other.epoch == epoch) && (other.refreshing == refreshing) && (other.lastRefreshedCollectionVersion == lastRefreshedCollectionVersion); } std::string RefreshState::toString() const { return str::stream() << "epoch: " << epoch << ", refreshing: " << (refreshing ? "true" : "false") << ", lastRefreshedCollectionVersion: " << lastRefreshedCollectionVersion.toString(); } Status setPersistedRefreshFlags(OperationContext* opCtx, const NamespaceString& nss) { // Set 'refreshing' to true. BSONObj update = BSON(ShardCollectionType::refreshing() << true); return updateShardCollectionsEntry( opCtx, BSON(ShardCollectionType::ns() << nss.ns()), update, false /*upsert*/); } Status unsetPersistedRefreshFlags(OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& refreshedVersion) { // Set 'refreshing' to false and update the last refreshed collection version. BSONObjBuilder updateBuilder; updateBuilder.append(ShardCollectionType::refreshing(), false); updateBuilder.appendTimestamp(ShardCollectionType::lastRefreshedCollectionVersion(), refreshedVersion.toLong()); return updateShardCollectionsEntry( opCtx, BSON(ShardCollectionType::ns() << nss.ns()), updateBuilder.obj(), false /*upsert*/); } StatusWith getPersistedRefreshFlags(OperationContext* opCtx, const NamespaceString& nss) { auto statusWithCollectionEntry = readShardCollectionsEntry(opCtx, nss); if (!statusWithCollectionEntry.isOK()) { return statusWithCollectionEntry.getStatus(); } ShardCollectionType entry = statusWithCollectionEntry.getValue(); // Ensure the results have not been incorrectly set somehow. if (entry.hasRefreshing()) { // If 'refreshing' is present and false, a refresh must have occurred (otherwise the field // would never have been added to the document) and there should always be a refresh // version. invariant(entry.getRefreshing() ? true : entry.hasLastRefreshedCollectionVersion()); } else { // If 'refreshing' is not present, no refresh version should exist. invariant(!entry.hasLastRefreshedCollectionVersion()); } return RefreshState{entry.getEpoch(), // If the refreshing field has not yet been added, this means that the first // refresh has started, but no chunks have ever yet been applied, around // which these flags are set. So default to refreshing true because the // chunk metadata is being updated and is not yet ready to be read. entry.hasRefreshing() ? entry.getRefreshing() : true, entry.hasLastRefreshedCollectionVersion() ? entry.getLastRefreshedCollectionVersion() : ChunkVersion(0, 0, entry.getEpoch())}; } StatusWith readShardCollectionsEntry(OperationContext* opCtx, const NamespaceString& nss) { Query fullQuery(BSON(ShardCollectionType::ns() << nss.ns())); try { DBDirectClient client(opCtx); std::unique_ptr cursor = client.query(ShardCollectionType::ConfigNS.c_str(), fullQuery, 1); if (!cursor) { return Status(ErrorCodes::OperationFailed, str::stream() << "Failed to establish a cursor for reading " << ShardCollectionType::ConfigNS << " from local storage"); } if (!cursor->more()) { // The collection has been dropped. return Status(ErrorCodes::NamespaceNotFound, str::stream() << "collection " << nss.ns() << " not found"); } BSONObj document = cursor->nextSafe(); auto statusWithCollectionEntry = ShardCollectionType::fromBSON(document); if (!statusWithCollectionEntry.isOK()) { return statusWithCollectionEntry.getStatus(); } return statusWithCollectionEntry.getValue(); } catch (const DBException& ex) { return {ex.toStatus().code(), str::stream() << "Failed to read the '" << nss.ns() << "' entry locally from config.collections" << causedBy(ex.toStatus())}; } } Status updateShardCollectionsEntry(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, const bool upsert) { invariant(query.hasField("_id")); if (upsert) { // If upserting, this should be an update from the config server that does not have shard // refresh information. invariant(!update.hasField(ShardCollectionType::refreshing())); invariant(!update.hasField(ShardCollectionType::lastRefreshedCollectionVersion())); } try { DBDirectClient client(opCtx); auto commandResponse = client.runCommand([&] { write_ops::Update updateOp(NamespaceString{ShardCollectionType::ConfigNS}); updateOp.setUpdates({[&] { write_ops::UpdateOpEntry entry; entry.setQ(query); // Want to modify the document, not replace it entry.setU(BSON("$set" << update)); entry.setUpsert(upsert); return entry; }()}); return updateOp.serialize({}); }()); uassertStatusOK(getStatusFromWriteCommandResponse(commandResponse->getCommandReply())); return Status::OK(); } catch (const DBException& ex) { return ex.toStatus(); } } StatusWith> readShardChunks(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& query, const BSONObj& sort, boost::optional limit, const OID& epoch) { try { Query fullQuery(query); fullQuery.sort(sort); DBDirectClient client(opCtx); const std::string chunkMetadataNs = ChunkType::ShardNSPrefix + nss.ns(); std::unique_ptr cursor = client.query(chunkMetadataNs, fullQuery, limit.get_value_or(0)); uassert(ErrorCodes::OperationFailed, str::stream() << "Failed to establish a cursor for reading " << chunkMetadataNs << " from local storage", cursor); std::vector chunks; while (cursor->more()) { BSONObj document = cursor->nextSafe().getOwned(); auto statusWithChunk = ChunkType::fromShardBSON(document, epoch); if (!statusWithChunk.isOK()) { return {statusWithChunk.getStatus().code(), str::stream() << "Failed to parse chunk '" << document.toString() << "' due to " << statusWithChunk.getStatus().reason()}; } chunks.push_back(std::move(statusWithChunk.getValue())); } return chunks; } catch (const DBException& ex) { return ex.toStatus(); } } Status updateShardChunks(OperationContext* opCtx, const NamespaceString& nss, const std::vector& chunks, const OID& currEpoch) { invariant(!chunks.empty()); NamespaceString chunkMetadataNss(ChunkType::ShardNSPrefix + nss.ns()); try { DBDirectClient client(opCtx); // This may be the first update, so the first opportunity to create an index. // If the index already exists, this is a no-op. client.createIndex(chunkMetadataNss.ns(), BSON(ChunkType::lastmod() << 1)); /** * Here are examples of the operations that can happen on the config server to update * the config.chunks collection. 'chunks' only includes the chunks that result from the * operations, which can be read from the config server, not any that were removed, so * we must delete any chunks that overlap with the new 'chunks'. * * CollectionVersion = 10.3 * * moveChunk * {_id: 3, max: 5, version: 10.1} --> {_id: 3, max: 5, version: 11.0} * * splitChunk * {_id: 3, max: 9, version 10.3} --> {_id: 3, max: 5, version 10.4} * {_id: 5, max: 8, version 10.5} * {_id: 8, max: 9, version 10.6} * * mergeChunk * {_id: 10, max: 14, version 4.3} --> {_id: 10, max: 22, version 10.4} * {_id: 14, max: 19, version 7.1} * {_id: 19, max: 22, version 2.0} * */ for (auto& chunk : chunks) { invariant(chunk.getVersion().hasEqualEpoch(currEpoch)); // Delete any overlapping chunk ranges. Overlapping chunks will have a min value // ("_id") between (chunk.min, chunk.max]. // // query: { "_id" : {"$gte": chunk.min, "$lt": chunk.max}} auto deleteCommandResponse = client.runCommand([&] { write_ops::Delete deleteOp(chunkMetadataNss); deleteOp.setDeletes({[&] { write_ops::DeleteOpEntry entry; entry.setQ(BSON(ChunkType::minShardID << BSON("$gte" << chunk.getMin() << "$lt" << chunk.getMax()))); entry.setMulti(true); return entry; }()}); return deleteOp.serialize({}); }()); uassertStatusOK( getStatusFromWriteCommandResponse(deleteCommandResponse->getCommandReply())); // Now the document can be expected to cleanly insert without overlap auto insertCommandResponse = client.runCommand([&] { write_ops::Insert insertOp(chunkMetadataNss); insertOp.setDocuments({chunk.toShardBSON()}); return insertOp.serialize({}); }()); uassertStatusOK( getStatusFromWriteCommandResponse(insertCommandResponse->getCommandReply())); } return Status::OK(); } catch (const DBException& ex) { return ex.toStatus(); } } Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const NamespaceString& nss) { try { DBDirectClient client(opCtx); auto deleteCommandResponse = client.runCommand([&] { write_ops::Delete deleteOp( NamespaceString{NamespaceString::kShardConfigCollectionsCollectionName}); deleteOp.setDeletes({[&] { write_ops::DeleteOpEntry entry; entry.setQ(BSON(ShardCollectionType::ns << nss.ns())); entry.setMulti(true); return entry; }()}); return deleteOp.serialize({}); }()); uassertStatusOK( getStatusFromWriteCommandResponse(deleteCommandResponse->getCommandReply())); // Drop the corresponding config.chunks.ns collection BSONObj result; if (!client.dropCollection( ChunkType::ShardNSPrefix + nss.ns(), kLocalWriteConcern, &result)) { Status status = getStatusFromCommandResult(result); if (status != ErrorCodes::NamespaceNotFound) { uassertStatusOK(status); } } LOG(1) << "Successfully cleared persisted chunk metadata for collection '" << nss << "'."; return Status::OK(); } catch (const DBException& ex) { return ex.toStatus(); } } } // namespace shardmetadatautil } // namespace mongo