summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-05-16 11:46:25 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-05-24 13:18:27 -0400
commitcb0393248d26e21e69efde15d9d3965293ead29b (patch)
treefe79feb8446c70c438fd6f6c7bd261a4cb9d7612 /src/mongo
parent2a0465739025e561711180939a76a8c366c2f155 (diff)
downloadmongo-cb0393248d26e21e69efde15d9d3965293ead29b.tar.gz
SERVER-34632 Use alias for the callback of CatalogCacheLoader::getChunksSince
Also use StringMap in CollectionShardingState instead of std::unordered_map.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/catalog_cache_loader_mock.cpp5
-rw-r--r--src/mongo/db/s/catalog_cache_loader_mock.h3
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp8
-rw-r--r--src/mongo/db/s/read_only_catalog_cache_loader.cpp4
-rw-r--r--src/mongo/db/s/read_only_catalog_cache_loader.h3
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.cpp65
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.h31
-rw-r--r--src/mongo/s/catalog_cache_loader.h9
-rw-r--r--src/mongo/s/config_server_catalog_cache_loader.cpp7
-rw-r--r--src/mongo/s/config_server_catalog_cache_loader.h3
10 files changed, 55 insertions, 83 deletions
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
index 0a2685742d2..42aeb43775f 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -95,10 +95,7 @@ void CatalogCacheLoaderMock::waitForDatabaseFlush(OperationContext* opCtx, Strin
}
std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
-
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
auto notify = std::make_shared<Notification<void>>();
uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept {
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h
index 6709d841224..e9b3e3101fb 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.h
+++ b/src/mongo/db/s/catalog_cache_loader_mock.h
@@ -57,8 +57,7 @@ public:
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
- override;
+ GetChunksSinceCallbackFn callbackFn) override;
void getDatabase(
StringData dbName,
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 424ce3ce548..d457d260f09 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -46,6 +46,7 @@
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
+#include "mongo/util/string_map.h"
namespace mongo {
namespace {
@@ -114,9 +115,9 @@ public:
auto it = _collections.find(ns);
if (it == _collections.end()) {
- auto inserted = _collections.emplace(
+ auto inserted = _collections.try_emplace(
ns,
- std::make_unique<CollectionShardingState>(get.owner(this), NamespaceString(ns)));
+ std::make_shared<CollectionShardingState>(get.owner(this), NamespaceString(ns)));
invariant(inserted.second);
it = std::move(inserted.first);
}
@@ -156,8 +157,7 @@ public:
private:
mutable stdx::mutex _mutex;
- using CollectionsMap =
- stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>>;
+ using CollectionsMap = StringMap<std::shared_ptr<CollectionShardingState>>;
CollectionsMap _collections;
};
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
index 4c3c317bf15..9757a731e00 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
@@ -44,9 +44,7 @@ void ReadOnlyCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx, S
}
std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
return _configServerLoader.getChunksSince(nss, version, callbackFn);
}
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h
index 2ff0548aaed..73891e7979a 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.h
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.h
@@ -49,8 +49,7 @@ public:
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
- override;
+ GetChunksSinceCallbackFn callbackFn) override;
void getDatabase(
StringData dbName,
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index ec7bea979b6..b06ae7565e0 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -381,7 +381,7 @@ void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const Namesp
}
void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_role == ReplicaSetRole::None);
if (isPrimary) {
@@ -392,7 +392,7 @@ void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
}
void ShardServerCatalogCacheLoader::onStepDown() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_role != ReplicaSetRole::None);
_contexts.interrupt(ErrorCodes::PrimarySteppedDown);
++_term;
@@ -400,58 +400,47 @@ void ShardServerCatalogCacheLoader::onStepDown() {
}
void ShardServerCatalogCacheLoader::onStepUp() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_role != ReplicaSetRole::None);
++_term;
_role = ReplicaSetRole::Primary;
}
std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
- long long currentTerm;
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
+ auto notify = std::make_shared<Notification<void>>();
+
bool isPrimary;
- {
- // Take the mutex so that we can discern whether we're primary or secondary and schedule a
- // task with the corresponding _term value.
+ long long term;
+ std::tie(isPrimary, term) = [&] {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_role != ReplicaSetRole::None);
-
- currentTerm = _term;
- isPrimary = (_role == ReplicaSetRole::Primary);
- }
-
- auto notify = std::make_shared<Notification<void>>();
+ return std::make_tuple(_role == ReplicaSetRole::Primary, _term);
+ }();
uassertStatusOK(_threadPool.schedule(
- [ this, nss, version, callbackFn, notify, isPrimary, currentTerm ]() noexcept {
+ [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept {
auto context = _contexts.makeOperationContext(*Client::getCurrent());
+ auto const opCtx = context.opCtx();
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- // We may have missed an OperationContextGroup interrupt since this operation began
- // but before the OperationContext was added to the group. So we'll check that
- // we're still in the same _term.
- if (_term != currentTerm) {
- callbackFn(context.opCtx(),
- Status{ErrorCodes::Interrupted,
- "Unable to refresh routing table because replica set state "
- "changed or node is shutting down."});
- notify->set();
- return;
+ try {
+ {
+ // We may have missed an OperationContextGroup interrupt since this operation
+ // began but before the OperationContext was added to the group. So we'll check
+ // that we're still in the same _term.
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ uassert(ErrorCodes::Interrupted,
+ "Unable to refresh routing table because replica set state changed or "
+ "the node is shutting down.",
+ _term == term);
}
- }
- try {
if (isPrimary) {
- _schedulePrimaryGetChunksSince(
- context.opCtx(), nss, version, currentTerm, callbackFn, notify);
+ _schedulePrimaryGetChunksSince(opCtx, nss, version, term, callbackFn, notify);
} else {
- _runSecondaryGetChunksSince(context.opCtx(), nss, version, callbackFn);
+ _runSecondaryGetChunksSince(opCtx, nss, version, callbackFn, notify);
}
} catch (const DBException& ex) {
- callbackFn(context.opCtx(), ex.toStatus());
+ callbackFn(opCtx, ex.toStatus());
notify->set();
}
}));
@@ -614,13 +603,15 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
+ std::shared_ptr<Notification<void>> notify) {
forcePrimaryCollectionRefreshAndWaitForReplication(opCtx, nss);
// Read the local metadata.
auto swCollAndChunks =
_getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion);
callbackFn(opCtx, std::move(swCollAndChunks));
+ notify->set();
}
void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index a3e908d85bf..52802a3c311 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -73,21 +73,10 @@ public:
*/
void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
- /**
- * This must be called serially, never in parallel, including waiting for the returned
- * Notification to be signalled.
- *
- * This function is robust to unexpected version requests from the CatalogCache. Requesting
- * versions with epoches that do not match anything on the config server will not affect or
- * clear the locally persisted metadata. Requesting versions higher than anything previous
- * requested, or versions lower than already requested, will not mess up the locally persisted
- * metadata, and will return what was requested if it exists.
- */
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
- override;
+ GetChunksSinceCallbackFn callbackFn) override;
void getDatabase(
StringData dbName,
@@ -349,7 +338,8 @@ private:
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn);
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
+ std::shared_ptr<Notification<void>> notify);
/**
* Refreshes chunk metadata from the config server's metadata store, and schedules maintenance
@@ -477,21 +467,19 @@ private:
CollectionAndChangedChunks _getCompletePersistedMetadataForSecondarySinceVersion(
OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version);
- // Used by the shard primary to retrieve chunk metadata from the config server.
+ // Loader used by the shard primary to retrieve the authoritative routing metadata from the
+ // config server
const std::unique_ptr<CatalogCacheLoader> _configServerLoader;
- // Thread pool used to load chunk metadata.
+ // Thread pool used to run blocking tasks which perform disk reads and writes
ThreadPool _threadPool;
+ // Registry of notifications for changes happening to the shard's on-disk routing information
NamespaceMetadataChangeNotifications _namespaceNotifications;
- // Protects the class state below.
+ // Protects the class state below
stdx::mutex _mutex;
- // Map to track in progress persisted cache updates on the shard primary.
- CollAndChunkTaskLists _collAndChunkTaskLists;
- DbTaskLists _dbTaskLists;
-
// This value is bumped every time the set of currently scheduled tasks should no longer be
// running. This includes, replica set state transitions and shutdown.
long long _term{0};
@@ -502,6 +490,9 @@ private:
// The collection of operation contexts in use by all threads.
OperationContextGroup _contexts;
+
+ CollAndChunkTaskLists _collAndChunkTaskLists;
+ DbTaskLists _dbTaskLists;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h
index 90b2716fdfb..b5317390c1e 100644
--- a/src/mongo/s/catalog_cache_loader.h
+++ b/src/mongo/s/catalog_cache_loader.h
@@ -63,7 +63,6 @@ public:
static CatalogCacheLoader& get(ServiceContext* serviceContext);
static CatalogCacheLoader& get(OperationContext* opCtx);
-
/**
* Used as a return value for getChunksSince.
*/
@@ -88,6 +87,9 @@ public:
std::vector<ChunkType> changedChunks;
};
+ using GetChunksSinceCallbackFn =
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>;
+
/**
* Initializes internal state. Must be called only once when sharding state is initialized.
*/
@@ -121,10 +123,7 @@ public:
* Notification object can be waited on in order to ensure that.
*/
virtual std::shared_ptr<Notification<void>> getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>
- callbackFn) = 0;
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) = 0;
/**
* Non-blocking call, which requests the most recent db version for the given dbName from the
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index e6c7ecc138c..d174c7b7e32 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -25,7 +25,9 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
@@ -170,10 +172,7 @@ void ConfigServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCt
}
std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
-
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
auto notify = std::make_shared<Notification<void>>();
uassertStatusOK(_threadPool.schedule([ nss, version, notify, callbackFn ]() noexcept {
diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h
index d0ff4e4daba..ad5b8e1b0ab 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.h
+++ b/src/mongo/s/config_server_catalog_cache_loader.h
@@ -51,8 +51,7 @@ public:
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
- override;
+ GetChunksSinceCallbackFn callbackFn) override;
void getDatabase(
StringData dbName,