author     Blake Oler <blake.oler@mongodb.com>    2019-04-09 18:08:53 -0400
committer  Blake Oler <blake.oler@mongodb.com>    2019-04-16 15:57:30 -0400
commit     29ef1a415c74c883746325f13a8eaaa1831f8102 (patch)
tree       13ce885258c58747b604a226215bdaca535361a5 /src
parent     e984f9781d2947e3b1fc10ae8535d630c49b5e94 (diff)
download   mongo-29ef1a415c74c883746325f13a8eaaa1831f8102.tar.gz
SERVER-40346 Use AlternativeSessionRegion to insert config documents as retryable write
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.h                                       |   6
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.cpp                             |  11
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.h                               |  10
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp  |  13
-rw-r--r--  src/mongo/db/s/shardsvr_shard_collection.cpp                               |  18
-rw-r--r--  src/mongo/s/catalog/SConscript                                             |   3
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client.h                              |  13
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_impl.cpp                       | 106
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_impl.h                         |   5
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.cpp                       |   6
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.h                         |   5
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp                                   |   4
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.h                                     |   5
13 files changed, 179 insertions, 26 deletions
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index 77ccb46eded..f116b79b63e 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -40,7 +40,7 @@ namespace write_ops {
// Conservative per array element overhead. This value was calculated as 1 byte (element type) + 5
// bytes (max string encoding of the array index encoded as string and the maximum key is 99999) + 1
// byte (zero terminator) = 7 bytes
-constexpr int kBSONArrayPerElementOverheadBytes = 7;
+constexpr int kWriteCommandBSONArrayPerElementOverheadBytes = 7;
/**
* Parses the 'limit' property of a delete entry, which has inverted meaning from the 'multi'
@@ -90,10 +90,10 @@ public:
int size = 0;
std::for_each(_pipeline->begin(), _pipeline->end(), [&size](const BSONObj& obj) {
- size += obj.objsize() + kBSONArrayPerElementOverheadBytes;
+ size += obj.objsize() + kWriteCommandBSONArrayPerElementOverheadBytes;
});
- return size + kBSONArrayPerElementOverheadBytes;
+ return size + kWriteCommandBSONArrayPerElementOverheadBytes;
}
Type type() const {
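
The renamed constant encodes the same conservative estimate as before: every element of a BSON array costs up to 1 byte for the type tag, 5 bytes for the stringified array index (the largest key being "99999"), and 1 byte for the zero terminator, i.e. 7 bytes on top of the element itself. A minimal sketch of that sizing rule, mirroring the pipeline-sizing code in the hunk above (the helper name and the plain std::vector input are illustrative, and BSONObj is assumed from the surrounding codebase):

    // 1 (type tag) + 5 (stringified index, at most "99999") + 1 (zero terminator) = 7 bytes
    constexpr int kWriteCommandBSONArrayPerElementOverheadBytes = 7;

    // Conservatively estimate the bytes a documents array contributes to a write command.
    int estimateDocumentsArraySize(const std::vector<BSONObj>& docs) {
        int size = 0;
        for (const auto& obj : docs) {
            size += obj.objsize() + kWriteCommandBSONArrayPerElementOverheadBytes;
        }
        return size;
    }
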
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 61baebf030f..bd8ec905907 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -382,17 +382,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
return initialChunks;
}
-void InitialSplitPolicy::writeFirstChunksToConfig(
- OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks) {
- for (const auto& chunk : initialChunks.chunks) {
- uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
- opCtx,
- ChunkType::ConfigNS,
- chunk.toConfigBSON(),
- ShardingCatalogClient::kMajorityWriteConcern));
- }
-}
-
boost::optional<CollectionType> InitialSplitPolicy::checkIfCollectionAlreadyShardedWithSameOptions(
OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index ab44a85216c..feeb2ca2902 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -136,16 +136,10 @@ public:
int numContiguousChunksPerShard = 1);
/**
- * Writes to the config server the first chunks for a newly sharded collection.
- */
- static void writeFirstChunksToConfig(
- OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks);
-
- /**
* Throws an exception if the collection is already sharded with different options.
*
- * If the collection is already sharded with the same options, returns the existing collection's
- * full spec, else returns boost::none.
+ * If the collection is already sharded with the same options, returns the existing
+ * collection's full spec, else returns boost::none.
*/
static boost::optional<CollectionType> checkIfCollectionAlreadyShardedWithSameOptions(
OperationContext* opCtx,
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index ddc0b54f29a..b1220cee3fc 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -128,6 +128,17 @@ boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx,
return uassertStatusOK(UUID::parse(collectionInfo["uuid"]));
}
+void writeFirstChunksForShardCollection(
+ OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks) {
+ for (const auto& chunk : initialChunks.chunks) {
+ uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
+ opCtx,
+ ChunkType::ConfigNS,
+ chunk.toConfigBSON(),
+ ShardingCatalogClient::kMajorityWriteConcern));
+ }
+}
+
} // namespace
void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) {
@@ -414,7 +425,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
treatAsNoZonesDefined,
treatAsEmpty);
- InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks);
+ writeFirstChunksForShardCollection(opCtx, initialChunks);
{
CollectionType coll;
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index 48b48dc7532..f8d4877b58c 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -415,6 +415,22 @@ void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss)
numChunks == 0);
}
+void writeFirstChunksToConfig(OperationContext* opCtx,
+ const InitialSplitPolicy::ShardCollectionConfig& initialChunks) {
+
+ std::vector<BSONObj> chunkObjs;
+ chunkObjs.reserve(initialChunks.chunks.size());
+ for (const auto& chunk : initialChunks.chunks) {
+ chunkObjs.push_back(chunk.toConfigBSON());
+ }
+
+ Grid::get(opCtx)->catalogClient()->insertConfigDocumentsAsRetryableWrite(
+ opCtx,
+ ChunkType::ConfigNS,
+ std::move(chunkObjs),
+ ShardingCatalogClient::kMajorityWriteConcern);
+}
+
void shardCollection(OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<UUID> uuid,
@@ -522,7 +538,7 @@ void shardCollection(OperationContext* opCtx,
}
// Insert chunk documents to config.chunks on the config server.
- InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks);
+ writeFirstChunksToConfig(opCtx, initialChunks);
{
CollectionType coll;
diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript
index 72d095847c9..f4618bcd300 100644
--- a/src/mongo/s/catalog/SConscript
+++ b/src/mongo/s/catalog/SConscript
@@ -84,6 +84,9 @@ env.Library(
'dist_lock_manager',
'sharding_catalog_client',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/logical_session_id_helpers'
+ ]
)
env.Library(
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index cb16b5d26f9..febbf82fe06 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -322,6 +322,19 @@ public:
const WriteConcernOptions& writeConcern) = 0;
/**
+ * Directly inserts documents in the specified namespace on the config server. Inserts said
+ * documents using a retryable write. Underneath, a session is created and destroyed -- this
+ * ad-hoc session creation strategy should never be used outside of specific, non-performant
+ * code paths.
+ *
+ * Must only be used for insertions in the 'config' database.
+ */
+ virtual void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ const WriteConcernOptions& writeConcern) = 0;
+
+ /**
* Updates a single document in the specified namespace on the config server. The document must
* have an _id index. Must only be used for updates to the 'config' database.
*
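
Note that the new method takes the documents vector by value, so callers are expected to build the batch up front and std::move it in. The intended call pattern, condensed from the shardsvr_shard_collection.cpp change earlier in this patch (error handling elided):

    std::vector<BSONObj> chunkObjs;
    chunkObjs.reserve(initialChunks.chunks.size());
    for (const auto& chunk : initialChunks.chunks) {
        chunkObjs.push_back(chunk.toConfigBSON());
    }

    Grid::get(opCtx)->catalogClient()->insertConfigDocumentsAsRetryableWrite(
        opCtx, ChunkType::ConfigNS, std::move(chunkObjs),
        ShardingCatalogClient::kMajorityWriteConcern);
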
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 063069922b0..b64f17d968c 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -42,6 +42,8 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/logical_session_cache.h"
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
@@ -64,6 +66,7 @@
#include "mongo/s/request_types/set_shard_version_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/shard_util.h"
+#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/assert_util.h"
@@ -87,12 +90,39 @@ using str::stream;
namespace {
+class AlternativeSessionRegion {
+public:
+ AlternativeSessionRegion(OperationContext* opCtx)
+ : _alternateClient(opCtx->getServiceContext()->makeClient("alternative-session-region")),
+ _acr(_alternateClient),
+ _newOpCtx(cc().makeOperationContext()),
+ _lsid(makeLogicalSessionId(opCtx)) {
+ _newOpCtx->setLogicalSessionId(_lsid);
+ }
+
+ ~AlternativeSessionRegion() {
+ LogicalSessionCache::get(opCtx())->endSessions({_lsid});
+ }
+
+ OperationContext* opCtx() {
+ return &*_newOpCtx;
+ }
+
+private:
+ ServiceContext::UniqueClient _alternateClient;
+ AlternativeClientRegion _acr;
+ ServiceContext::UniqueOperationContext _newOpCtx;
+ LogicalSessionId _lsid;
+};
+
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred,
TagSet{});
const int kMaxReadRetry = 3;
const int kMaxWriteRetry = 3;
+const int kRetryableBatchWriteBSONSizeOverhead = kWriteCommandBSONArrayPerElementOverheadBytes * 2;
+
const NamespaceString kSettingsNamespace("config", "settings");
void toBatchError(const Status& status, BatchedCommandResponse* response) {
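
AlternativeSessionRegion is what gives the retryable write its session: it switches to a fresh Client, creates a new OperationContext, stamps it with a newly generated logical session id, and ends that session on destruction. A minimal usage sketch, assuming a hypothetical doRetryableWork helper:

    {
        AlternativeSessionRegion asr(opCtx);  // fresh Client + OperationContext with its own lsid
        TxnNumber txnNumber = 0;

        // Run the work on the region's OperationContext so the command carries the session id.
        doRetryableWork(asr.opCtx(), txnNumber);  // hypothetical helper
    }  // destructor ends the session via LogicalSessionCache::endSessions
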
@@ -100,6 +130,38 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
response->setStatus(status);
}
+void sendRetryableWriteBatchRequestToConfig(OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::vector<BSONObj>& docs,
+ TxnNumber txnNumber,
+ const WriteConcernOptions& writeConcern) {
+ auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setDocuments(docs);
+ return insertOp;
+ }());
+ request.setWriteConcern(writeConcern.toBSON());
+
+ BSONObj cmdObj = request.toBSON();
+ BSONObjBuilder bob(cmdObj);
+ bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber);
+
+ BatchedCommandResponse batchResponse;
+ auto response = configShard->runCommand(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ nss.db().toString(),
+ bob.obj(),
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotent);
+
+ auto writeStatus = Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+ uassertStatusOK(batchResponse.toStatus());
+ uassertStatusOK(writeStatus);
+}
+
} // namespace
ShardingCatalogClientImpl::ShardingCatalogClientImpl(
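
The helper makes each batch retryable by appending a txnNumber on top of the already-serialized insert command; the logical session id travels implicitly because the command runs on the AlternativeSessionRegion's OperationContext. Reduced to the core pattern used above (the resulting command is then run with Shard::RetryPolicy::kIdempotent):

    BSONObj cmdObj = request.toBSON();   // { insert: <coll>, documents: [...], writeConcern: {...} }
    BSONObjBuilder bob(cmdObj);          // continue building from the serialized command
    bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber);
    BSONObj retryableCmd = bob.obj();
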
@@ -813,6 +875,50 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
+void ShardingCatalogClientImpl::insertConfigDocumentsAsRetryableWrite(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ const WriteConcernOptions& writeConcern) {
+ invariant(nss.db() == NamespaceString::kAdminDb || nss.db() == NamespaceString::kConfigDb);
+
+ AlternativeSessionRegion asr(opCtx);
+ TxnNumber currentTxnNumber = 0;
+
+ std::vector<BSONObj> workingBatch;
+ size_t workingBatchItemSize = 0;
+
+ int workingBatchDocSize = kRetryableBatchWriteBSONSizeOverhead;
+
+ while (!docs.empty()) {
+ BSONObj toAdd = docs.back();
+ docs.pop_back();
+
+ int docSize = toAdd.objsize();
+ bool batchAtSizeLimit = (workingBatchItemSize + 1 > write_ops::kMaxWriteBatchSize) ||
+ (workingBatchDocSize + docSize > BSONObjMaxUserSize);
+
+ if (batchAtSizeLimit) {
+ sendRetryableWriteBatchRequestToConfig(
+ asr.opCtx(), nss, workingBatch, currentTxnNumber, writeConcern);
+ ++currentTxnNumber;
+
+ workingBatch.clear();
+ workingBatchItemSize = 0;
+ workingBatchDocSize = kRetryableBatchWriteBSONSizeOverhead;
+ }
+
+ workingBatch.push_back(toAdd);
+ ++workingBatchItemSize;
+ workingBatchDocSize += docSize;
+ }
+
+ if (!workingBatch.empty()) {
+ sendRetryableWriteBatchRequestToConfig(
+ asr.opCtx(), nss, workingBatch, currentTxnNumber, writeConcern);
+ }
+}
+
StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument(
OperationContext* opCtx,
const NamespaceString& nss,
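
The batching loop above splits the incoming documents along two limits: a per-batch document count (write_ops::kMaxWriteBatchSize) and a per-command BSON size (BSONObjMaxUserSize), starting each batch's byte counter at kRetryableBatchWriteBSONSizeOverhead = 2 * 7 bytes to reserve array overhead for both the documents array and the statement-ids array. Each flushed batch is sent under the same session with an incremented txnNumber. A self-contained sketch of the same splitting rule with plain integers (the limit values below are illustrative stand-ins, not the real server constants):

    #include <cstddef>
    #include <vector>

    constexpr std::size_t kMaxWriteBatchSize = 100000;     // illustrative: max documents per batch
    constexpr int kBSONObjMaxUserSize = 16 * 1024 * 1024;  // illustrative: max command size in bytes
    constexpr int kPerElementOverhead = 7;
    constexpr int kBatchSizeOverhead = kPerElementOverhead * 2;  // documents + stmtIds arrays

    // Split a list of document sizes into batches the same way the insert loop above does.
    std::vector<std::vector<int>> splitIntoBatches(const std::vector<int>& docSizes) {
        std::vector<std::vector<int>> batches(1);
        int batchBytes = kBatchSizeOverhead;
        for (int docSize : docSizes) {
            const bool atLimit = batches.back().size() + 1 > kMaxWriteBatchSize ||
                batchBytes + docSize > kBSONObjMaxUserSize;
            if (atLimit && !batches.back().empty()) {
                batches.emplace_back();          // flush: start a new batch (new txnNumber)
                batchBytes = kBatchSizeOverhead;
            }
            batches.back().push_back(docSize);
            batchBytes += docSize;
        }
        return batches;
    }
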
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h
index 3af675402fd..ab4e9506594 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h
@@ -143,6 +143,11 @@ public:
const BSONObj& doc,
const WriteConcernOptions& writeConcern) override;
+ void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ const WriteConcernOptions& writeConcern) override;
+
StatusWith<bool> updateConfigDocument(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& query,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 3c4bca683ce..abb2cd48a25 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -163,6 +163,12 @@ Status ShardingCatalogClientMock::insertConfigDocument(OperationContext* opCtx,
return {ErrorCodes::InternalError, "Method not implemented"};
}
+void ShardingCatalogClientMock::insertConfigDocumentsAsRetryableWrite(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ const WriteConcernOptions& writeConcern) {}
+
StatusWith<bool> ShardingCatalogClientMock::updateConfigDocument(
OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 5af200acd7e..f8c84300cc1 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -116,6 +116,11 @@ public:
const BSONObj& doc,
const WriteConcernOptions& writeConcern) override;
+ void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ const WriteConcernOptions& writeConcern) override;
+
StatusWith<bool> updateConfigDocument(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& query,
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index ae2a1832c40..dc5da6c1694 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -369,8 +369,8 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
// Account the array overhead once for the actual updates array and once for the statement
// ids array, if retryable writes are used
const int writeSizeBytes = getWriteSizeBytes(writeOp) +
- write_ops::kBSONArrayPerElementOverheadBytes +
- (_batchTxnNum ? write_ops::kBSONArrayPerElementOverheadBytes + 4 : 0);
+ write_ops::kWriteCommandBSONArrayPerElementOverheadBytes +
+ (_batchTxnNum ? write_ops::kWriteCommandBSONArrayPerElementOverheadBytes + 4 : 0);
if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchMap)) {
invariant(!batchMap.empty());
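
The accounting here: each write pays 7 bytes for its slot in the main writes array and, when a transaction number is set (retryable writes), another 7 bytes for its slot in the statement-ids array plus 4 more bytes, covering the 32-bit statement id value itself. A quick worked sketch of the arithmetic (the 30-byte base size is made up):

    constexpr int kArrayElementOverhead = 7;  // per-element BSON array overhead
    constexpr int kStmtIdSize = 4;            // 32-bit statement id value

    constexpr int writeSizeBytes(int baseWriteSize, bool retryable) {
        return baseWriteSize + kArrayElementOverhead +
            (retryable ? kArrayElementOverhead + kStmtIdSize : 0);
    }

    static_assert(writeSizeBytes(30, /*retryable=*/false) == 37, "");
    static_assert(writeSizeBytes(30, /*retryable=*/true) == 48, "");
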
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index b01814aa138..3c946f0f4fe 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -50,6 +50,11 @@ class OperationContext;
class TargetedWriteBatch;
class TrackedErrors;
+// Conservative overhead per element contained in the write batch. This value was calculated as 1
+// byte (element type) + 5 bytes (max string encoding of the array index encoded as string and the
+// maximum key is 99999) + 1 byte (zero terminator) = 7 bytes
+const int kWriteCommandBSONArrayPerElementOverheadBytes = 7;
+
/**
* Simple struct for storing an error with an endpoint.
*