summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2018-05-16 11:46:25 -0400
committer Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2019-07-03 05:56:59 -0400
commit 059b8a9dc777e3940caa26f2a909d4988b605645 (patch)
tree 709ccc99b8094354489a55f4a19bf6e88cb65a92
parent de8b171fbd730be5f82d0c4b6a3c378000167246 (diff)
download mongo-059b8a9dc777e3940caa26f2a909d4988b605645.tar.gz
SERVER-41869 On term mismatch do not invoke the getChunksSince callback under mutex
As part of this change, the following cleanups are also backported:
- Use an alias for the callback of CatalogCacheLoader::getChunksSince
- Use StringMap in CollectionShardingState instead of std::unordered_map

(cherry picked from commit cb0393248d26e21e69efde15d9d3965293ead29b)
-rw-r--r-- src/mongo/db/s/catalog_cache_loader_mock.cpp 5
-rw-r--r-- src/mongo/db/s/catalog_cache_loader_mock.h 3
-rw-r--r-- src/mongo/db/s/collection_sharding_state.cpp 1
-rw-r--r-- src/mongo/db/s/read_only_catalog_cache_loader.cpp 4
-rw-r--r-- src/mongo/db/s/read_only_catalog_cache_loader.h 3
-rw-r--r-- src/mongo/db/s/shard_server_catalog_cache_loader.cpp 65
-rw-r--r-- src/mongo/db/s/shard_server_catalog_cache_loader.h 31
-rw-r--r-- src/mongo/s/catalog_cache_loader.h 9
-rw-r--r-- src/mongo/s/config_server_catalog_cache_loader.cpp 7
-rw-r--r-- src/mongo/s/config_server_catalog_cache_loader.h 3
10 files changed, 52 insertions, 79 deletions
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
index 8b6e06723d5..d798d1b137a 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -97,10 +97,7 @@ void CatalogCacheLoaderMock::waitForDatabaseFlush(OperationContext* opCtx, Strin
}
std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
-
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
auto notify = std::make_shared<Notification<void>>();
uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept {
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h
index f6b7b5e2115..76a0fc17644 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.h
+++ b/src/mongo/db/s/catalog_cache_loader_mock.h
@@ -59,8 +59,7 @@ public:
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
- override;
+ GetChunksSinceCallbackFn callbackFn) override;
void getDatabase(
StringData dbName,
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 85cd60486d3..7ba81f81709 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
+#include "mongo/util/string_map.h"
namespace mongo {
namespace {
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
index e730c055694..f7b0996bc7a 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
@@ -46,9 +46,7 @@ void ReadOnlyCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx, S
}
std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
return _configServerLoader.getChunksSince(nss, version, callbackFn);
}
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h
index 49bff910bda..43c8aaf1037 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.h
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.h
@@ -51,8 +51,7 @@ public:
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
- override;
+ GetChunksSinceCallbackFn callbackFn) override;
void getDatabase(
StringData dbName,
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 880ae109ddf..24f42034580 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -360,7 +360,7 @@ void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const Namesp
}
void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_role == ReplicaSetRole::None);
if (isPrimary) {
@@ -371,7 +371,7 @@ void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
}
void ShardServerCatalogCacheLoader::onStepDown() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_role != ReplicaSetRole::None);
_contexts.interrupt(ErrorCodes::PrimarySteppedDown);
++_term;
@@ -379,58 +379,47 @@ void ShardServerCatalogCacheLoader::onStepDown() {
}
void ShardServerCatalogCacheLoader::onStepUp() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_role != ReplicaSetRole::None);
++_term;
_role = ReplicaSetRole::Primary;
}
std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
- long long currentTerm;
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
+ auto notify = std::make_shared<Notification<void>>();
+
bool isPrimary;
- {
- // Take the mutex so that we can discern whether we're primary or secondary and schedule a
- // task with the corresponding _term value.
+ long long term;
+ std::tie(isPrimary, term) = [&] {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_role != ReplicaSetRole::None);
-
- currentTerm = _term;
- isPrimary = (_role == ReplicaSetRole::Primary);
- }
-
- auto notify = std::make_shared<Notification<void>>();
+ return std::make_tuple(_role == ReplicaSetRole::Primary, _term);
+ }();
uassertStatusOK(_threadPool.schedule(
- [ this, nss, version, callbackFn, notify, isPrimary, currentTerm ]() noexcept {
+ [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept {
auto context = _contexts.makeOperationContext(*Client::getCurrent());
+ auto const opCtx = context.opCtx();
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- // We may have missed an OperationContextGroup interrupt since this operation began
- // but before the OperationContext was added to the group. So we'll check that
- // we're still in the same _term.
- if (_term != currentTerm) {
- callbackFn(context.opCtx(),
- Status{ErrorCodes::Interrupted,
- "Unable to refresh routing table because replica set state "
- "changed or node is shutting down."});
- notify->set();
- return;
+ try {
+ {
+ // We may have missed an OperationContextGroup interrupt since this operation
+ // began but before the OperationContext was added to the group. So we'll check
+ // that we're still in the same _term.
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ uassert(ErrorCodes::Interrupted,
+ "Unable to refresh routing table because replica set state changed or "
+ "the node is shutting down.",
+ _term == term);
}
- }
- try {
if (isPrimary) {
- _schedulePrimaryGetChunksSince(
- context.opCtx(), nss, version, currentTerm, callbackFn, notify);
+ _schedulePrimaryGetChunksSince(opCtx, nss, version, term, callbackFn, notify);
} else {
- _runSecondaryGetChunksSince(context.opCtx(), nss, version, callbackFn);
+ _runSecondaryGetChunksSince(opCtx, nss, version, callbackFn, notify);
}
} catch (const DBException& ex) {
- callbackFn(context.opCtx(), ex.toStatus());
+ callbackFn(opCtx, ex.toStatus());
notify->set();
}
}));
@@ -603,13 +592,15 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
+ std::shared_ptr<Notification<void>> notify) {
forcePrimaryCollectionRefreshAndWaitForReplication(opCtx, nss);
// Read the local metadata.
auto swCollAndChunks =
_getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion);
callbackFn(opCtx, std::move(swCollAndChunks));
+ notify->set();
}
void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index ff246c0666e..045e9e0c1aa 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -75,21 +75,10 @@ public:
*/
void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
- /**
- * This must be called serially, never in parallel, including waiting for the returned
- * Notification to be signalled.
- *
- * This function is robust to unexpected version requests from the CatalogCache. Requesting
- * versions with epoches that do not match anything on the config server will not affect or
- * clear the locally persisted metadata. Requesting versions higher than anything previous
- * requested, or versions lower than already requested, will not mess up the locally persisted
- * metadata, and will return what was requested if it exists.
- */
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
- override;
+ GetChunksSinceCallbackFn callbackFn) override;
void getDatabase(
StringData dbName,
@@ -351,7 +340,8 @@ private:
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn);
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
+ std::shared_ptr<Notification<void>> notify);
/**
* Refreshes chunk metadata from the config server's metadata store, and schedules maintenance
@@ -479,21 +469,19 @@ private:
CollectionAndChangedChunks _getCompletePersistedMetadataForSecondarySinceVersion(
OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version);
- // Used by the shard primary to retrieve chunk metadata from the config server.
+ // Loader used by the shard primary to retrieve the authoritative routing metadata from the
+ // config server
const std::unique_ptr<CatalogCacheLoader> _configServerLoader;
- // Thread pool used to load chunk metadata.
+ // Thread pool used to run blocking tasks which perform disk reads and writes
ThreadPool _threadPool;
+ // Registry of notifications for changes happening to the shard's on-disk routing information
NamespaceMetadataChangeNotifications _namespaceNotifications;
- // Protects the class state below.
+ // Protects the class state below
stdx::mutex _mutex;
- // Map to track in progress persisted cache updates on the shard primary.
- CollAndChunkTaskLists _collAndChunkTaskLists;
- DbTaskLists _dbTaskLists;
-
// This value is bumped every time the set of currently scheduled tasks should no longer be
// running. This includes, replica set state transitions and shutdown.
long long _term{0};
@@ -504,6 +492,9 @@ private:
// The collection of operation contexts in use by all threads.
OperationContextGroup _contexts;
+
+ CollAndChunkTaskLists _collAndChunkTaskLists;
+ DbTaskLists _dbTaskLists;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h
index 94d8ea5a93e..ef292552df1 100644
--- a/src/mongo/s/catalog_cache_loader.h
+++ b/src/mongo/s/catalog_cache_loader.h
@@ -68,7 +68,6 @@ public:
static CatalogCacheLoader& get(ServiceContext* serviceContext);
static CatalogCacheLoader& get(OperationContext* opCtx);
-
/**
* Used as a return value for getChunksSince.
*/
@@ -93,6 +92,9 @@ public:
std::vector<ChunkType> changedChunks;
};
+ using GetChunksSinceCallbackFn =
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>;
+
/**
* Initializes internal state. Must be called only once when sharding state is initialized.
*/
@@ -126,10 +128,7 @@ public:
* Notification object can be waited on in order to ensure that.
*/
virtual std::shared_ptr<Notification<void>> getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>
- callbackFn) = 0;
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) = 0;
/**
* Non-blocking call, which requests the most recent db version for the given dbName from the
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 4de200959e9..97b5ee9ca9d 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -27,7 +27,9 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
@@ -172,10 +174,7 @@ void ConfigServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCt
}
std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
-
+ const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
auto notify = std::make_shared<Notification<void>>();
uassertStatusOK(_threadPool.schedule([ nss, version, notify, callbackFn ]() noexcept {
diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h
index 50d771d1681..fd7d104d80a 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.h
+++ b/src/mongo/s/config_server_catalog_cache_loader.h
@@ -53,8 +53,7 @@ public:
std::shared_ptr<Notification<void>> getChunksSince(
const NamespaceString& nss,
ChunkVersion version,
- stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
- override;
+ GetChunksSinceCallbackFn callbackFn) override;
void getDatabase(
StringData dbName,