From c2cc425b9d2b23eead06ecbfd996375e47c81baa Mon Sep 17 00:00:00 2001 From: Kevin Pulo Date: Thu, 1 Nov 2018 21:17:00 +0000 Subject: SERVER-36411 include shard id/name in changelog/actionlog entries --- src/mongo/db/s/SConscript | 26 +++ src/mongo/db/s/balancer/balancer.cpp | 5 +- .../s/config/configsvr_drop_database_command.cpp | 15 +- .../sharding_catalog_manager_chunk_operations.cpp | 7 +- ...rding_catalog_manager_collection_operations.cpp | 39 ++-- .../sharding_catalog_manager_shard_operations.cpp | 7 +- src/mongo/db/s/migration_source_manager.cpp | 9 +- src/mongo/db/s/move_primary_source_manager.cpp | 9 +- src/mongo/db/s/move_timing_helper.cpp | 11 +- src/mongo/db/s/sharding_logging.cpp | 197 ++++++++++++++++++ src/mongo/db/s/sharding_logging.h | 110 ++++++++++ src/mongo/db/s/sharding_logging_test.cpp | 231 +++++++++++++++++++++ src/mongo/db/s/sharding_state_recovery.cpp | 13 +- src/mongo/db/s/shardsvr_shard_collection.cpp | 25 +-- 14 files changed, 639 insertions(+), 65 deletions(-) create mode 100644 src/mongo/db/s/sharding_logging.cpp create mode 100644 src/mongo/db/s/sharding_logging.h create mode 100644 src/mongo/db/s/sharding_logging_test.cpp (limited to 'src/mongo/db/s') diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index b91d82a2691..2e91314615d 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -82,6 +82,7 @@ env.Library( 'migration_types', 'sharding_api_d', 'sharding_catalog_manager', + 'sharding_logging', ], ) @@ -189,6 +190,7 @@ env.Library( '$BUILD_DIR/mongo/s/catalog/dist_lock_manager', '$BUILD_DIR/mongo/s/client/sharding_client', '$BUILD_DIR/mongo/s/coreshard', + 'sharding_logging', ], ) @@ -426,3 +428,27 @@ env.CppUnitTest( '$BUILD_DIR/mongo/util/version_impl', ] ) + +env.Library( + target='sharding_logging', + source=[ + 'sharding_logging.cpp', + ], + LIBDEPS=[ + 'sharding_api_d', + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', + ], +) + +env.CppUnitTest( + 
target='sharding_logging_test', + source=[ + 'sharding_logging_test.cpp', + ], + LIBDEPS=[ + 'sharding_runtime_d', + 'sharding_logging', + '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/s/sharding_router_test_fixture', + ] +) diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 7807503b473..f5570a52507 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -45,6 +45,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h" #include "mongo/db/s/balancer/cluster_statistics_impl.h" +#include "mongo/db/s/sharding_logging.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog_cache.h" @@ -381,7 +382,7 @@ void Balancer::_mainThread() { roundDetails.setSucceeded(static_cast(candidateChunks.size()), _balancedLastTime); - shardingContext->catalogClient() + ShardingLogging::get(opCtx.get()) ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON()) .transitional_ignore(); } @@ -401,7 +402,7 @@ void Balancer::_mainThread() { // This round failed, tell the world! 
roundDetails.setFailed(e.what()); - shardingContext->catalogClient() + ShardingLogging::get(opCtx.get()) ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON()) .transitional_ignore(); diff --git a/src/mongo/db/s/config/configsvr_drop_database_command.cpp b/src/mongo/db/s/config/configsvr_drop_database_command.cpp index c775ad19e04..c9d49c974b9 100644 --- a/src/mongo/db/s/config/configsvr_drop_database_command.cpp +++ b/src/mongo/db/s/config/configsvr_drop_database_command.cpp @@ -36,6 +36,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/sharding_logging.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog_cache.h" @@ -137,12 +138,12 @@ public: // error. auto dbType = uassertStatusOK(dbInfo).value; - uassertStatusOK( - catalogClient->logChangeChecked(opCtx, - "dropDatabase.start", - dbname, - BSONObj(), - ShardingCatalogClient::kMajorityWriteConcern)); + uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked( + opCtx, + "dropDatabase.start", + dbname, + BSONObj(), + ShardingCatalogClient::kMajorityWriteConcern)); // Drop the database's collections. 
for (const auto& nss : catalogClient->getAllShardedCollectionsForDb( @@ -174,7 +175,7 @@ public: uassertStatusOKWithContext( status, str::stream() << "Could not remove database '" << dbname << "' from metadata"); - catalogClient->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "dropDatabase", dbname, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern); result.append("dropped", dbname); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 040d4cc03d0..a8635e61bb2 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -43,6 +43,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/s/sharding_logging.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" @@ -456,7 +457,7 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx, appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]); appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]); - Grid::get(opCtx)->catalogClient()->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "split", nss.ns(), logDetail.obj(), WriteConcernOptions()); } else { BSONObj beforeDetailObj = logDetail.obj(); @@ -470,7 +471,7 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx, chunkDetail.append("of", newChunksSize); appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]); - Grid::get(opCtx)->catalogClient()->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "multi-split", nss.ns(), chunkDetail.obj(), WriteConcernOptions()); } } @@ -583,7 +584,7 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx, 
collVersion.appendLegacyWithField(&logDetail, "prevShardVersion"); mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion"); - Grid::get(opCtx)->catalogClient()->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions()); return Status::OK(); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index 3d0eaddfa1d..b280c157b74 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -52,6 +52,7 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/config/initial_split_policy.h" +#include "mongo/db/s/sharding_logging.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -166,17 +167,17 @@ void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) } Status ShardingCatalogManager::dropCollection(OperationContext* opCtx, const NamespaceString& nss) { - const auto catalogClient = Grid::get(opCtx)->catalogClient(); const Status logStatus = - catalogClient->logChangeChecked(opCtx, - "dropCollection.start", - nss.ns(), - BSONObj(), - ShardingCatalogClient::kMajorityWriteConcern); + ShardingLogging::get(opCtx)->logChangeChecked(opCtx, + "dropCollection.start", + nss.ns(), + BSONObj(), + ShardingCatalogClient::kMajorityWriteConcern); if (!logStatus.isOK()) { return logStatus; } + const auto catalogClient = Grid::get(opCtx)->catalogClient(); const auto shardsStatus = catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kLocalReadConcern); if (!shardsStatus.isOK()) { @@ -350,7 +351,7 @@ Status ShardingCatalogManager::dropCollection(OperationContext* opCtx, const Nam LOG(1) << "dropCollection " << 
nss.ns() << " completed"; - catalogClient->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "dropCollection", nss.ns(), BSONObj(), ShardingCatalogClient::kMajorityWriteConcern); return Status::OK(); @@ -365,7 +366,6 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, const vector& splitPoints, const bool distributeInitialChunks, const ShardId& dbPrimaryShardId) { - const auto catalogClient = Grid::get(opCtx)->catalogClient(); const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId)); @@ -383,12 +383,12 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, } collectionDetail.append("primary", primaryShard->toString()); collectionDetail.append("numChunks", static_cast(splitPoints.size() + 1)); - uassertStatusOK( - catalogClient->logChangeChecked(opCtx, - "shardCollection.start", - nss.ns(), - collectionDetail.obj(), - ShardingCatalogClient::kMajorityWriteConcern)); + uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked( + opCtx, + "shardCollection.start", + nss.ns(), + collectionDetail.obj(), + ShardingCatalogClient::kMajorityWriteConcern)); } // Construct the collection default collator. 
@@ -445,11 +445,12 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, << dbPrimaryShardId << causedBy(redact(status)); } - catalogClient->logChange(opCtx, - "shardCollection.end", - nss.ns(), - BSON("version" << initialChunks.collVersion().toString()), - ShardingCatalogClient::kMajorityWriteConcern); + ShardingLogging::get(opCtx)->logChange( + opCtx, + "shardCollection.end", + nss.ns(), + BSON("version" << initialChunks.collVersion().toString()), + ShardingCatalogClient::kMajorityWriteConcern); } void ShardingCatalogManager::generateUUIDsForExistingShardedCollections(OperationContext* opCtx) { diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp index ac56b8e9d14..76b1ba10087 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp @@ -57,6 +57,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/add_shard_cmd_gen.h" #include "mongo/db/s/add_shard_util.h" +#include "mongo/db/s/sharding_logging.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/wire_version.h" #include "mongo/executor/task_executor.h" @@ -759,7 +760,7 @@ StatusWith ShardingCatalogManager::addShard( shardDetails.append("name", shardType.getName()); shardDetails.append("host", shardConnectionString.toString()); - Grid::get(opCtx)->catalogClient()->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "addShard", "", shardDetails.obj(), ShardingCatalogClient::kMajorityWriteConcern); // Ensure the added shard is visible to this process. 
@@ -812,7 +813,7 @@ StatusWith ShardingCatalogManager::removeShard(OperationCon log() << "going to start draining shard: " << name; // Record start in changelog - const Status logStatus = Grid::get(opCtx)->catalogClient()->logChangeChecked( + const Status logStatus = ShardingLogging::get(opCtx)->logChangeChecked( opCtx, "removeShard.start", "", @@ -884,7 +885,7 @@ StatusWith ShardingCatalogManager::removeShard(OperationCon shardRegistry->reload(opCtx); // Record finish in changelog - Grid::get(opCtx)->catalogClient()->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "removeShard", "", BSON("shard" << name), ShardingCatalogClient::kLocalWriteConcern); return ShardDrainingStatus::COMPLETED; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 799f062f219..d30c52ac2b2 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -45,6 +45,7 @@ #include "mongo/db/s/migration_util.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/shard_metadata_util.h" +#include "mongo/db/s/sharding_logging.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/s/sharding_statistics.h" @@ -225,7 +226,7 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) { auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); _stats.countDonorMoveChunkStarted.addAndFetch(1); - const Status logStatus = Grid::get(opCtx)->catalogClient()->logChangeChecked( + const Status logStatus = ShardingLogging::get(opCtx)->logChangeChecked( opCtx, "moveChunk.start", getNss().ns(), @@ -446,7 +447,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC "against the config server to obtain its latest optime" << causedBy(redact(migrationCommitStatus)); - Status status = Grid::get(opCtx)->catalogClient()->logChangeChecked( + Status status = 
ShardingLogging::get(opCtx)->logChangeChecked( opCtx, "moveChunk.validating", getNss().ns(), @@ -574,7 +575,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC // scheduling orphan cleanup. _cleanup(opCtx); - Grid::get(opCtx)->catalogClient()->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "moveChunk.commit", getNss().ns(), @@ -637,7 +638,7 @@ void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) { return; } - Grid::get(opCtx)->catalogClient()->logChange( + ShardingLogging::get(opCtx)->logChange( opCtx, "moveChunk.error", getNss().ns(), diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp index ef71c904fc4..90f1cc50cce 100644 --- a/src/mongo/db/s/move_primary_source_manager.cpp +++ b/src/mongo/db/s/move_primary_source_manager.cpp @@ -39,6 +39,7 @@ #include "mongo/db/commands.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/s/shard_metadata_util.h" +#include "mongo/db/s/sharding_logging.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/s/sharding_statistics.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -77,7 +78,7 @@ Status MovePrimarySourceManager::clone(OperationContext* opCtx) { log() << "Moving " << _dbname << " primary from: " << _fromShard << " to: " << _toShard; // Record start in changelog - uassertStatusOK(Grid::get(opCtx)->catalogClient()->logChangeChecked( + uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked( opCtx, "movePrimary.start", _dbname.toString(), @@ -225,7 +226,7 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { "against the config server to obtain its latest optime" << causedBy(redact(commitStatus)); - Status validateStatus = Grid::get(opCtx)->catalogClient()->logChangeChecked( + Status validateStatus = ShardingLogging::get(opCtx)->logChangeChecked( opCtx, "movePrimary.validating", getNss().ns(), @@ -287,7 +288,7 @@ Status 
MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { _cleanup(opCtx); - uassertStatusOK(Grid::get(opCtx)->catalogClient()->logChangeChecked( + uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked( opCtx, "movePrimary.commit", _dbname.toString(), @@ -327,7 +328,7 @@ void MovePrimarySourceManager::cleanupOnError(OperationContext* opCtx) { } try { - uassertStatusOK(Grid::get(opCtx)->catalogClient()->logChangeChecked( + uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked( opCtx, "movePrimary.error", _dbname.toString(), diff --git a/src/mongo/db/s/move_timing_helper.cpp b/src/mongo/db/s/move_timing_helper.cpp index 5a588c62c50..37bfa6247d3 100644 --- a/src/mongo/db/s/move_timing_helper.cpp +++ b/src/mongo/db/s/move_timing_helper.cpp @@ -36,6 +36,7 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" +#include "mongo/db/s/sharding_logging.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -84,11 +85,11 @@ MoveTimingHelper::~MoveTimingHelper() { _b.append("errmsg", *_cmdErrmsg); } - Grid::get(_opCtx)->catalogClient()->logChange(_opCtx, - str::stream() << "moveChunk." << _where, - _ns, - _b.obj(), - ShardingCatalogClient::kMajorityWriteConcern); + ShardingLogging::get(_opCtx)->logChange(_opCtx, + str::stream() << "moveChunk." << _where, + _ns, + _b.obj(), + ShardingCatalogClient::kMajorityWriteConcern); } catch (const std::exception& e) { warning() << "couldn't record timing for moveChunk '" << _where << "': " << redact(e.what()); diff --git a/src/mongo/db/s/sharding_logging.cpp b/src/mongo/db/s/sharding_logging.cpp new file mode 100644 index 00000000000..3529a42cfbd --- /dev/null +++ b/src/mongo/db/s/sharding_logging.cpp @@ -0,0 +1,197 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_logging.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/server_options.h" +#include "mongo/executor/network_interface.h" +#include "mongo/s/catalog/type_changelog.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { + +const std::string kActionLogCollectionName("actionlog"); +const int kActionLogCollectionSizeMB = 20 * 1024 * 1024; + +const std::string kChangeLogCollectionName("changelog"); +const int kChangeLogCollectionSizeMB = 200 * 1024 * 1024; + +// Global ShardingLogging instance +const auto shardingLogging = ServiceContext::declareDecoration(); + +} // namespace + +ShardingLogging::ShardingLogging() = default; + +ShardingLogging::~ShardingLogging() = default; + +ShardingLogging* ShardingLogging::get(ServiceContext* serviceContext) { + return &shardingLogging(serviceContext); +} + +ShardingLogging* ShardingLogging::get(OperationContext* operationContext) { + return get(operationContext->getServiceContext()); +} + +Status ShardingLogging::logAction(OperationContext* opCtx, + const StringData what, + const StringData ns, + const BSONObj& detail) { + if (_actionLogCollectionCreated.load() == 0) { + Status result = _createCappedConfigCollection(opCtx, + kActionLogCollectionName, + kActionLogCollectionSizeMB, + ShardingCatalogClient::kMajorityWriteConcern); + if (result.isOK()) { + _actionLogCollectionCreated.store(1); + } else { + log() << "couldn't create config.actionlog collection:" << causedBy(result); + return result; + } + } + + return _log(opCtx, + kActionLogCollectionName, + what, + ns, + detail, + ShardingCatalogClient::kMajorityWriteConcern); +} + +Status ShardingLogging::logChangeChecked(OperationContext* opCtx, + const StringData what, + const StringData ns, + const BSONObj& detail, + const 
WriteConcernOptions& writeConcern) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + writeConcern.wMode == WriteConcernOptions::kMajority); + if (_changeLogCollectionCreated.load() == 0) { + Status result = _createCappedConfigCollection( + opCtx, kChangeLogCollectionName, kChangeLogCollectionSizeMB, writeConcern); + if (result.isOK()) { + _changeLogCollectionCreated.store(1); + } else { + log() << "couldn't create config.changelog collection:" << causedBy(result); + return result; + } + } + + return _log(opCtx, kChangeLogCollectionName, what, ns, detail, writeConcern); +} + +Status ShardingLogging::_log(OperationContext* opCtx, + const StringData logCollName, + const StringData what, + const StringData operationNS, + const BSONObj& detail, + const WriteConcernOptions& writeConcern) { + Date_t now = Grid::get(opCtx)->getNetwork()->now(); + const std::string serverName = str::stream() << Grid::get(opCtx)->getNetwork()->getHostName() + << ":" << serverGlobalParams.port; + const std::string changeId = str::stream() << serverName << "-" << now.toString() << "-" + << OID::gen(); + + ChangeLogType changeLog; + changeLog.setChangeId(changeId); + changeLog.setServer(serverName); + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + changeLog.setShard("config"); + } else { + auto shardingState = ShardingState::get(opCtx); + if (shardingState->enabled()) { + changeLog.setShard(shardingState->shardId().toString()); + } + } + changeLog.setClientAddr(opCtx->getClient()->clientAddress(true)); + changeLog.setTime(now); + changeLog.setNS(operationNS.toString()); + changeLog.setWhat(what.toString()); + changeLog.setDetails(detail); + + BSONObj changeLogBSON = changeLog.toBSON(); + log() << "about to log metadata event into " << logCollName << ": " << redact(changeLogBSON); + + const NamespaceString nss("config", logCollName); + Status result = Grid::get(opCtx)->catalogClient()->insertConfigDocument( + opCtx, nss, changeLogBSON, 
writeConcern); + + if (!result.isOK()) { + warning() << "Error encountered while logging config change with ID [" << changeId + << "] into collection " << logCollName << ": " << redact(result); + } + + return result; +} + +Status ShardingLogging::_createCappedConfigCollection(OperationContext* opCtx, + StringData collName, + int cappedSize, + const WriteConcernOptions& writeConcern) { + BSONObj createCmd = BSON("create" << collName << "capped" << true << "size" << cappedSize + << WriteConcernOptions::kWriteConcernField + << writeConcern.toBSON()); + + auto result = + Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "config", + createCmd, + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + + if (!result.isOK()) { + return result.getStatus(); + } + + if (!result.getValue().commandStatus.isOK()) { + if (result.getValue().commandStatus == ErrorCodes::NamespaceExists) { + if (result.getValue().writeConcernStatus.isOK()) { + return Status::OK(); + } else { + return result.getValue().writeConcernStatus; + } + } else { + return result.getValue().commandStatus; + } + } + + return result.getValue().writeConcernStatus; +} + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_logging.h b/src/mongo/db/s/sharding_logging.h new file mode 100644 index 00000000000..3407809d9a4 --- /dev/null +++ b/src/mongo/db/s/sharding_logging.h @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" + +namespace mongo { + +/** + * Diagnostic logging of sharding metadata events (changelog and actionlog). + */ +class ShardingLogging { + +public: + ShardingLogging(); + ~ShardingLogging(); + + /** + * Retrieves the ShardingLogging instance associated with the current service/operation context. 
+ */ + static ShardingLogging* get(ServiceContext* serviceContext); + static ShardingLogging* get(OperationContext* operationContext); + + Status logAction(OperationContext* opCtx, + const StringData what, + const StringData ns, + const BSONObj& detail); + + Status logChangeChecked(OperationContext* opCtx, + const StringData what, + const StringData ns, + const BSONObj& detail, + const WriteConcernOptions& writeConcern); + + void logChange(OperationContext* const opCtx, + const StringData what, + const StringData ns, + const BSONObj& detail, + const WriteConcernOptions& writeConcern) { + // It is safe to ignore the results of `logChangeChecked` in many cases, as the + // failure to log a change is often of no consequence. + logChangeChecked(opCtx, what, ns, detail, writeConcern).ignore(); + } + +private: + /** + * Creates the specified collection name in the config database. + */ + Status _createCappedConfigCollection(OperationContext* opCtx, + StringData collName, + int cappedSize, + const WriteConcernOptions& writeConcern); + + /** + * Best effort method, which logs diagnostic events on the config server. If the config server + * write fails for any reason a warning will be written to the local service log and the method + * will return a failed status. + * + * @param opCtx Operation context in which the call is running + * @param logCollName Which config collection to write to (excluding the database name) + * @param what E.g. 
"split", "migrate" (not interpreted) + * @param operationNS To which collection the metadata change is being applied (not interpreted) + * @param detail Additional info about the metadata change (not interpreted) + * @param writeConcern Write concern options to use for logging + */ + Status _log(OperationContext* opCtx, + const StringData logCollName, + const StringData what, + const StringData operationNSS, + const BSONObj& detail, + const WriteConcernOptions& writeConcern); + + // Member variable properties: + // (S) Self-synchronizing; access in any way from any context. + + // Whether the logAction call should attempt to create the actionlog collection + AtomicInt32 _actionLogCollectionCreated{0}; // (S) + + // Whether the logChange call should attempt to create the changelog collection + AtomicInt32 _changeLogCollectionCreated{0}; // (S) +}; + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_logging_test.cpp b/src/mongo/db/s/sharding_logging_test.cpp new file mode 100644 index 00000000000..5504d76c0d9 --- /dev/null +++ b/src/mongo/db/s/sharding_logging_test.cpp @@ -0,0 +1,231 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . 
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/s/sharding_logging.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/stdx/chrono.h"
+#include "mongo/stdx/future.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/text.h"
+
+namespace mongo {
+namespace {
+
+using executor::NetworkInterfaceMock;
+using executor::TaskExecutor;
+using stdx::async;
+using unittest::assertGet;
+
+const Seconds kFutureTimeout{5};
+const HostAndPort configHost{"TestHost1"};
+
+class InfoLoggingTest : public ShardingTestFixture {
+public:
+    enum CollType { ActionLog, ChangeLog };
+
+    InfoLoggingTest(CollType configCollType, int cappedSize)
+        : _configCollType(configCollType), _cappedSize(cappedSize) {}
+
+    void setUp() override {
+        ShardingTestFixture::setUp();
+
+        configTargeter()->setFindHostReturnValue(configHost);
+    }
+
+protected:
+    void noRetryAfterSuccessfulCreate() {
+        auto future = launchAsync([this] {
+            log("moved a chunk", "foo.bar", BSON("min" << 3 << "max" << 4)).transitional_ignore();
+        });
+
+        expectConfigCollectionCreate(configHost, getConfigCollName(), _cappedSize, BSON("ok" << 1));
+        expectConfigCollectionInsert(configHost,
+                                     getConfigCollName(),
+                                     network()->now(),
+                                     "moved a chunk",
+                                     "foo.bar",
+                                     BSON("min" << 3 << "max" << 4));
+
+        // Now wait for the logChange call to return
+        future.timed_get(kFutureTimeout);
+
+        // Now log another change and confirm that we don't re-attempt to create the collection
+        future = launchAsync([this] {
+            log("moved a second chunk", "foo.bar", BSON("min" << 4 << "max" << 5))
+                .transitional_ignore();
+        });
+
+        expectConfigCollectionInsert(configHost,
+                                     getConfigCollName(),
+                                     network()->now(),
+                                     "moved a second chunk",
+                                     "foo.bar",
+                                     BSON("min" << 4 << "max" << 5));
+
+        // Now wait for the logChange call to return
+        future.timed_get(kFutureTimeout);
+    }
+
+    void noRetryCreateIfAlreadyExists() {
+        auto future = launchAsync([this] {
+            log("moved a chunk", "foo.bar", BSON("min" << 3 << "max" << 4)).transitional_ignore();
+        });
+
+        BSONObjBuilder createResponseBuilder;
+        CommandHelpers::appendCommandStatusNoThrow(
+            createResponseBuilder, Status(ErrorCodes::NamespaceExists, "coll already exists"));
+        expectConfigCollectionCreate(
+            configHost, getConfigCollName(), _cappedSize, createResponseBuilder.obj());
+        expectConfigCollectionInsert(configHost,
+                                     getConfigCollName(),
+                                     network()->now(),
+                                     "moved a chunk",
+                                     "foo.bar",
+                                     BSON("min" << 3 << "max" << 4));
+
+        // Now wait for the logAction call to return
+        future.timed_get(kFutureTimeout);
+
+        // Now log another change and confirm that we don't re-attempt to create the collection
+        future = launchAsync([this] {
+            log("moved a second chunk", "foo.bar", BSON("min" << 4 << "max" << 5))
+                .transitional_ignore();
+        });
+
+        expectConfigCollectionInsert(configHost,
+                                     getConfigCollName(),
+                                     network()->now(),
+                                     "moved a second chunk",
+                                     "foo.bar",
+                                     BSON("min" << 4 << "max" << 5));
+
+        // Now wait for the logChange call to return
+        future.timed_get(kFutureTimeout);
+    }
+
+    void createFailure() {
+        auto future = launchAsync([this] {
+            log("moved a chunk", "foo.bar", BSON("min" << 3 << "max" << 4)).transitional_ignore();
+        });
+
+        BSONObjBuilder createResponseBuilder;
+        CommandHelpers::appendCommandStatusNoThrow(
+            createResponseBuilder, Status(ErrorCodes::ExceededTimeLimit, "operation timed out"));
+        expectConfigCollectionCreate(
+            configHost, getConfigCollName(), _cappedSize, createResponseBuilder.obj());
+
+        // Now wait for the logAction call to return
+        future.timed_get(kFutureTimeout);
+
+        // Now log another change and confirm that we *do* attempt to create the collection
+        future = launchAsync([this] {
+            log("moved a second chunk", "foo.bar", BSON("min" << 4 << "max" << 5))
+                .transitional_ignore();
+        });
+
+        expectConfigCollectionCreate(configHost, getConfigCollName(), _cappedSize, BSON("ok" << 1));
+        expectConfigCollectionInsert(configHost,
+                                     getConfigCollName(),
+                                     network()->now(),
+                                     "moved a second chunk",
+                                     "foo.bar",
+                                     BSON("min" << 4 << "max" << 5));
+
+        // Now wait for the logChange call to return
+        future.timed_get(kFutureTimeout);
+    }
+
+    std::string getConfigCollName() const {
+        return (_configCollType == ChangeLog ? "changelog" : "actionlog");
+    }
+
+    Status log(const std::string& what, const std::string& ns, const BSONObj& detail) {
+        if (_configCollType == ChangeLog) {
+            return ShardingLogging::get(operationContext())
+                ->logChangeChecked(operationContext(),
+                                   what,
+                                   ns,
+                                   detail,
+                                   ShardingCatalogClient::kMajorityWriteConcern);
+        } else {
+            return ShardingLogging::get(operationContext())
+                ->logAction(operationContext(), what, ns, detail);
+        }
+    }
+
+    const CollType _configCollType;
+    const int _cappedSize;
+};
+
+class ActionLogTest : public InfoLoggingTest {
+public:
+    ActionLogTest() : InfoLoggingTest(ActionLog, 20 * 1024 * 1024) {}
+};
+
+class ChangeLogTest : public InfoLoggingTest {
+public:
+    ChangeLogTest() : InfoLoggingTest(ChangeLog, 200 * 1024 * 1024) {}
+};
+
+TEST_F(ActionLogTest, NoRetryAfterSuccessfulCreate) {
+    noRetryAfterSuccessfulCreate();
+}
+TEST_F(ChangeLogTest, NoRetryAfterSuccessfulCreate) {
+    noRetryAfterSuccessfulCreate();
+}
+
+TEST_F(ActionLogTest, NoRetryCreateIfAlreadyExists) {
+    noRetryCreateIfAlreadyExists();
+}
+TEST_F(ChangeLogTest, NoRetryCreateIfAlreadyExists) {
+    noRetryCreateIfAlreadyExists();
+}
+
+TEST_F(ActionLogTest, CreateFailure) {
+    createFailure();
+}
+TEST_F(ChangeLogTest, CreateFailure) {
+    createFailure();
+}
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp
index 6fc39417200..1ba71a79f59 100644
--- a/src/mongo/db/s/sharding_state_recovery.cpp
+++ b/src/mongo/db/s/sharding_state_recovery.cpp
@@ -47,6 +47,7 @@
 #include "mongo/db/repl/bson_extract_optime.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/sharding_logging.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/db/server_parameters.h"
 #include "mongo/db/write_concern.h"
@@ -253,12 +254,12 @@ Status ShardingStateRecovery::recover(OperationContext* opCtx) {
              "to retrieve the most recent opTime.";
     // Need to fetch the latest uptime from the config server, so do a logging write
-    Status status =
-        grid->catalogClient()->logChangeChecked(opCtx,
-                                                "Sharding minOpTime recovery",
-                                                NamespaceString::kServerConfigurationNamespace.ns(),
-                                                recoveryDocBSON,
-                                                ShardingCatalogClient::kMajorityWriteConcern);
+    Status status = ShardingLogging::get(opCtx)->logChangeChecked(
+        opCtx,
+        "Sharding minOpTime recovery",
+        NamespaceString::kServerConfigurationNamespace.ns(),
+        recoveryDocBSON,
+        ShardingCatalogClient::kMajorityWriteConcern);
 
     if (!status.isOK())
         return status;
 
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index 56e9d516385..c948e0b4eff 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -49,6 +49,7 @@
 #include "mongo/db/s/config/initial_split_policy.h"
 #include "mongo/db/s/config/sharding_catalog_manager.h"
 #include "mongo/db/s/shard_filtering_metadata_refresh.h"
+#include "mongo/db/s/sharding_logging.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/balancer_configuration.h"
@@ -408,7 +409,6 @@ void shardCollection(OperationContext* opCtx,
                      const bool fromMapReduce,
                      const ShardId& dbPrimaryShardId,
                      const int numContiguousChunksPerShard) {
-    const auto catalogClient = Grid::get(opCtx)->catalogClient();
     const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
     const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId));
 
@@ -428,12 +428,12 @@ void shardCollection(OperationContext* opCtx,
         }
         collectionDetail.append("primary", primaryShard->toString());
         collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
-        uassertStatusOK(
-            catalogClient->logChangeChecked(opCtx,
-                                            "shardCollection.start",
-                                            nss.ns(),
-                                            collectionDetail.obj(),
-                                            ShardingCatalogClient::kMajorityWriteConcern));
+        uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked(
+            opCtx,
+            "shardCollection.start",
+            nss.ns(),
+            collectionDetail.obj(),
+            ShardingCatalogClient::kMajorityWriteConcern));
     }
 
     // Construct the collection default collator.
@@ -538,11 +538,12 @@ void shardCollection(OperationContext* opCtx,
         shardsRefreshed.emplace_back(chunk.getShard());
     }
 
-    catalogClient->logChange(opCtx,
-                             "shardCollection.end",
-                             nss.ns(),
-                             BSON("version" << initialChunks.collVersion().toString()),
-                             ShardingCatalogClient::kMajorityWriteConcern);
+    ShardingLogging::get(opCtx)->logChange(
+        opCtx,
+        "shardCollection.end",
+        nss.ns(),
+        BSON("version" << initialChunks.collVersion().toString()),
+        ShardingCatalogClient::kMajorityWriteConcern);
 }
 
 /**
-- 
cgit v1.2.1