summaryrefslogtreecommitdiff
path: root/src/mongo/s/write_ops
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2016-05-09 18:59:26 -0400
committerEsha Maharishi <esha.maharishi@mongodb.com>2016-05-09 19:05:17 -0400
commit78c38583efe92db429ed0cb611dd0919f2d890ea (patch)
treea701de0a5ce77c938a1081c9baba0c4c3b09e1b8 /src/mongo/s/write_ops
parent2ebae874958646e2359ebf72a0718c861ee05726 (diff)
downloadmongo-78c38583efe92db429ed0cb611dd0919f2d890ea.tar.gz
Revert "SERVER-23336 replace ShardResolver with RemoteCommandTargeter::findHost"
This reverts commit d1957fd1f86510c37a893ec3c51140cf004407d5.
Diffstat (limited to 'src/mongo/s/write_ops')
-rw-r--r--src/mongo/s/write_ops/SConscript7
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.cpp48
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.h6
-rw-r--r--src/mongo/s/write_ops/batch_write_exec_test.cpp139
4 files changed, 88 insertions, 112 deletions
diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript
index 1e0091f9d3f..92e3427e873 100644
--- a/src/mongo/s/write_ops/SConscript
+++ b/src/mongo/s/write_ops/SConscript
@@ -34,8 +34,6 @@ env.Library(
LIBDEPS=[
'batch_write_types',
'$BUILD_DIR/mongo/client/connection_string',
- '$BUILD_DIR/mongo/s/client/sharding_client',
- '$BUILD_DIR/mongo/s/coreshard',
],
)
@@ -77,7 +75,6 @@ env.CppUnitTest(
'cluster_write_op',
'$BUILD_DIR/mongo/db/range_arithmetic',
'$BUILD_DIR/mongo/db/service_context',
- '$BUILD_DIR/mongo/s/sharding_test_fixture',
]
)
@@ -90,7 +87,5 @@ env.CppUnitTest(
LIBDEPS=[
'cluster_write_op',
'cluster_write_op_conversion',
- '$BUILD_DIR/mongo/s/sharding_test_fixture',
- '$BUILD_DIR/mongo/s/mongoscore',
- ],
+ ]
)
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index d803fbaf32b..e056e05609c 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -37,10 +37,7 @@
#include "mongo/base/status.h"
#include "mongo/bson/util/builder.h"
#include "mongo/client/connection_string.h"
-#include "mongo/client/remote_command_targeter.h"
#include "mongo/s/client/multi_command_dispatch.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/s/write_ops/write_error_detail.h"
#include "mongo/util/log.h"
@@ -51,8 +48,10 @@ using std::make_pair;
using std::stringstream;
using std::vector;
-BatchWriteExec::BatchWriteExec(NSTargeter* targeter, MultiCommandDispatch* dispatcher)
- : _targeter(targeter), _dispatcher(dispatcher) {}
+BatchWriteExec::BatchWriteExec(NSTargeter* targeter,
+ ShardResolver* resolver,
+ MultiCommandDispatch* dispatcher)
+ : _targeter(targeter), _resolver(resolver), _dispatcher(dispatcher) {}
namespace {
@@ -170,43 +169,22 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
continue;
// Figure out what host we need to dispatch our targeted batch
- bool resolvedHost = true;
- const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
- auto shard =
- grid.shardRegistry()->getShard(txn, nextBatch->getEndpoint().shardName);
-
- if (!shard) {
- Status status = Status(ErrorCodes::ShardNotFound,
- str::stream() << "unknown shard name "
- << nextBatch->getEndpoint().shardName);
- resolvedHost = false;
+ ConnectionString shardHost;
+ Status resolveStatus =
+ _resolver->chooseWriteHost(txn, nextBatch->getEndpoint().shardName, &shardHost);
+ if (!resolveStatus.isOK()) {
+ ++stats->numResolveErrors;
// Record a resolve failure
// TODO: It may be necessary to refresh the cache if stale, or maybe just
// cancel and retarget the batch
WriteErrorDetail error;
- buildErrorFrom(status, &error);
- LOG(4) << "unable to send write batch to " << nextBatch->getEndpoint().shardName
- << causedBy(status);
- batchOp.noteBatchError(*nextBatch, error);
- }
+ buildErrorFrom(resolveStatus, &error);
- auto swHostAndPort = shard->getTargeter()->findHost(readPref);
- if (!swHostAndPort.isOK()) {
- resolvedHost = false;
+ LOG(4) << "unable to send write batch to " << shardHost.toString()
+ << causedBy(resolveStatus.toString());
- // Record a resolve failure
- // TODO: It may be necessary to refresh the cache if stale, or maybe just
- // cancel and retarget the batch
- WriteErrorDetail error;
- buildErrorFrom(swHostAndPort.getStatus(), &error);
- LOG(4) << "unable to send write batch to " << nextBatch->getEndpoint().shardName
- << causedBy(swHostAndPort.getStatus());
batchOp.noteBatchError(*nextBatch, error);
- }
-
- if (!resolvedHost) {
- ++stats->numResolveErrors;
// We're done with this batch
// Clean up when we can't resolve a host
@@ -216,8 +194,6 @@ void BatchWriteExec::executeBatch(OperationContext* txn,
continue;
}
- ConnectionString shardHost(swHostAndPort.getValue());
-
// If we already have a batch for this host, wait until the next time
OwnedHostBatchMap::MapType::iterator pendingIt = pendingBatches.find(shardHost);
if (pendingIt != pendingBatches.end())
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index 739e16a046d..e29a80a9a72 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -36,6 +36,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/optime.h"
#include "mongo/s/ns_targeter.h"
+#include "mongo/s/shard_resolver.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -64,7 +65,7 @@ class BatchWriteExec {
MONGO_DISALLOW_COPYING(BatchWriteExec);
public:
- BatchWriteExec(NSTargeter* targeter, MultiCommandDispatch* dispatcher);
+ BatchWriteExec(NSTargeter* targeter, ShardResolver* resolver, MultiCommandDispatch* dispatcher);
/**
* Executes a client batch write request by sending child batches to several shard
@@ -82,6 +83,9 @@ private:
NSTargeter* _targeter;
// Not owned here
+ ShardResolver* _resolver;
+
+ // Not owned here
MultiCommandDispatch* _dispatcher;
};
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 74b84e4503c..bf0e4c8685a 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -28,19 +28,14 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/write_ops/batch_write_exec.h"
-
#include "mongo/base/owned_pointer_vector.h"
-#include "mongo/client/remote_command_targeter_mock.h"
-#include "mongo/client/remote_command_targeter_factory_mock.h"
-#include "mongo/s/catalog/type_shard.h"
+#include "mongo/db/operation_context_noop.h"
#include "mongo/s/client/mock_multi_write_command.h"
-#include "mongo/s/client/shard_registry.h"
#include "mongo/s/mock_ns_targeter.h"
-#include "mongo/s/sharding_test_fixture.h"
+#include "mongo/s/mock_shard_resolver.h"
+#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -51,60 +46,35 @@ using std::vector;
namespace {
-const HostAndPort kTestShardHost = HostAndPort("FakeHost", 12345);
-const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
-const string shardName = "FakeShard";
-
/**
* Mimics a single shard backend for a particular collection which can be initialized with a
* set of write command results to return.
*/
-class BatchWriteExecTest : public ShardingTestFixture {
+class MockSingleShardBackend {
public:
- BatchWriteExecTest() = default;
- ~BatchWriteExecTest() = default;
-
- void setUp() override {
- ShardingTestFixture::setUp();
- getMessagingPort()->setRemote(HostAndPort("ClientHost", 12345));
-
- // Set up the RemoteCommandTargeter for the config shard.
- configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
-
- // Add a RemoteCommandTargeter for the data shard.
- std::unique_ptr<RemoteCommandTargeterMock> targeter(
- stdx::make_unique<RemoteCommandTargeterMock>());
- targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHost));
- targeter->setFindHostReturnValue(kTestShardHost);
- targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHost),
- std::move(targeter));
-
- // Set up the shard registry to contain the fake shard.
- ShardType shardType;
- shardType.setName(shardName);
- shardType.setHost(kTestShardHost.toString());
- std::vector<ShardType> shards{shardType};
- setupShards(shards);
-
- // Set up the namespace targeter to target the fake shard.
- ShardEndpoint endpoint(shardName, ChunkVersion::IGNORED());
+ MockSingleShardBackend(OperationContext* txn, const NamespaceString& nss) {
+ // Initialize targeting to a mock shard
+ ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
vector<MockRange*> mockRanges;
mockRanges.push_back(
new MockRange(endpoint, nss, BSON("x" << MINKEY), BSON("x" << MAXKEY)));
- nsTargeter.init(mockRanges);
+ targeter.init(mockRanges);
+
+ // Get the connection string for the mock shard
+ resolver.chooseWriteHost(txn, mockRanges.front()->endpoint.shardName, &shardHost);
- // Make the batch write executor use the mock backend.
- exec.reset(new BatchWriteExec(&nsTargeter, &dispatcher));
+ // Executor using the mock backend
+ exec.reset(new BatchWriteExec(&targeter, &resolver, &dispatcher));
}
void setMockResults(const vector<MockWriteResult*>& results) {
dispatcher.init(results);
}
- ConnectionString shardHost{kTestShardHost};
- NamespaceString nss{"foo.bar"};
+ ConnectionString shardHost;
- MockNSTargeter nsTargeter;
+ MockNSTargeter targeter;
+ MockShardResolver resolver;
MockMultiWriteCommand dispatcher;
unique_ptr<BatchWriteExec> exec;
@@ -114,11 +84,16 @@ public:
// Tests for the BatchWriteExec
//
-TEST_F(BatchWriteExecTest, SingleOp) {
+TEST(BatchWriteExecTests, SingleOp) {
//
// Basic execution test
//
+ OperationContextNoop txn;
+ NamespaceString nss("foo.bar");
+
+ MockSingleShardBackend backend(&txn, nss);
+
BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
request.setNS(nss);
request.setOrdered(false);
@@ -128,25 +103,30 @@ TEST_F(BatchWriteExecTest, SingleOp) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
+ backend.exec->executeBatch(&txn, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(stats.numRounds, 1);
}
-TEST_F(BatchWriteExecTest, SingleOpError) {
+TEST(BatchWriteExecTests, SingleOpError) {
//
// Basic error test
//
+ OperationContextNoop txn;
+ NamespaceString nss("foo.bar");
+
+ MockSingleShardBackend backend(&txn, nss);
+
vector<MockWriteResult*> mockResults;
BatchedCommandResponse errResponse;
errResponse.setOk(false);
errResponse.setErrCode(ErrorCodes::UnknownError);
errResponse.setErrMessage("mock error");
- mockResults.push_back(new MockWriteResult(shardHost, errResponse));
+ mockResults.push_back(new MockWriteResult(backend.shardHost, errResponse));
- setMockResults(mockResults);
+ backend.setMockResults(mockResults);
BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
request.setNS(nss);
@@ -157,7 +137,7 @@ TEST_F(BatchWriteExecTest, SingleOpError) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
+ backend.exec->executeBatch(&txn, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(response.getN(), 0);
ASSERT(response.isErrDetailsSet());
@@ -172,11 +152,14 @@ TEST_F(BatchWriteExecTest, SingleOpError) {
// Test retryable errors
//
-TEST_F(BatchWriteExecTest, StaleOp) {
+TEST(BatchWriteExecTests, StaleOp) {
//
// Retry op in exec b/c of stale config
//
+ OperationContextNoop txn;
+ NamespaceString nss("foo.bar");
+
// Insert request
BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
request.setNS(nss);
@@ -185,28 +168,33 @@ TEST_F(BatchWriteExecTest, StaleOp) {
// Do single-target, single doc batch write op
request.getInsertRequest()->addToDocuments(BSON("x" << 1));
+ MockSingleShardBackend backend(&txn, nss);
+
vector<MockWriteResult*> mockResults;
WriteErrorDetail error;
error.setErrCode(ErrorCodes::StaleShardVersion);
error.setErrMessage("mock stale error");
- mockResults.push_back(new MockWriteResult(shardHost, error));
+ mockResults.push_back(new MockWriteResult(backend.shardHost, error));
- setMockResults(mockResults);
+ backend.setMockResults(mockResults);
// Execute request
BatchedCommandResponse response;
BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
+ backend.exec->executeBatch(&txn, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(stats.numStaleBatches, 1);
}
-TEST_F(BatchWriteExecTest, MultiStaleOp) {
+TEST(BatchWriteExecTests, MultiStaleOp) {
//
// Retry op in exec multiple times b/c of stale config
//
+ OperationContextNoop txn;
+ NamespaceString nss("foo.bar");
+
// Insert request
BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
request.setNS(nss);
@@ -215,32 +203,37 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
// Do single-target, single doc batch write op
request.getInsertRequest()->addToDocuments(BSON("x" << 1));
+ MockSingleShardBackend backend(&txn, nss);
+
vector<MockWriteResult*> mockResults;
WriteErrorDetail error;
error.setErrCode(ErrorCodes::StaleShardVersion);
error.setErrMessage("mock stale error");
for (int i = 0; i < 3; i++) {
- mockResults.push_back(new MockWriteResult(shardHost, error));
+ mockResults.push_back(new MockWriteResult(backend.shardHost, error));
}
- setMockResults(mockResults);
+ backend.setMockResults(mockResults);
// Execute request
BatchedCommandResponse response;
BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
+ backend.exec->executeBatch(&txn, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(stats.numStaleBatches, 3);
}
-TEST_F(BatchWriteExecTest, TooManyStaleOp) {
+TEST(BatchWriteExecTests, TooManyStaleOp) {
//
// Retry op in exec too many times (without refresh) b/c of stale config
- // (The mock nsTargeter doesn't report progress on refresh)
+ // (The mock targeter doesn't report progress on refresh)
// We should report a no progress error for everything in the batch
//
+ OperationContextNoop txn;
+ NamespaceString nss("foo.bar");
+
// Insert request
BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
request.setNS(nss);
@@ -250,20 +243,23 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
request.getInsertRequest()->addToDocuments(BSON("x" << 1));
request.getInsertRequest()->addToDocuments(BSON("x" << 2));
+ MockSingleShardBackend backend(&txn, nss);
+
vector<MockWriteResult*> mockResults;
WriteErrorDetail error;
error.setErrCode(ErrorCodes::StaleShardVersion);
error.setErrMessage("mock stale error");
for (int i = 0; i < 10; i++) {
- mockResults.push_back(new MockWriteResult(shardHost, error, request.sizeWriteOps()));
+ mockResults.push_back(
+ new MockWriteResult(backend.shardHost, error, request.sizeWriteOps()));
}
- setMockResults(mockResults);
+ backend.setMockResults(mockResults);
// Execute request
BatchedCommandResponse response;
BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
+ backend.exec->executeBatch(&txn, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(response.getN(), 0);
ASSERT(response.isErrDetailsSet());
@@ -271,11 +267,14 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
ASSERT_EQUALS(response.getErrDetailsAt(1)->getErrCode(), ErrorCodes::NoProgressMade);
}
-TEST_F(BatchWriteExecTest, ManyStaleOpWithMigration) {
+TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
//
// Retry op in exec many times b/c of stale config, but simulate remote migrations occurring
//
+ OperationContextNoop txn;
+ NamespaceString nss("foo.bar");
+
// Insert request
BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
request.setNS(nss);
@@ -284,20 +283,22 @@ TEST_F(BatchWriteExecTest, ManyStaleOpWithMigration) {
// Do single-target, single doc batch write op
request.getInsertRequest()->addToDocuments(BSON("x" << 1));
+ MockSingleShardBackend backend(&txn, nss);
+
vector<MockWriteResult*> mockResults;
WriteErrorDetail error;
error.setErrCode(ErrorCodes::StaleShardVersion);
error.setErrMessage("mock stale error");
for (int i = 0; i < 10; i++) {
- mockResults.push_back(new MockWriteResult(shardHost, error));
+ mockResults.push_back(new MockWriteResult(backend.shardHost, error));
}
- setMockResults(mockResults);
+ backend.setMockResults(mockResults);
// Execute request
BatchedCommandResponse response;
BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
+ backend.exec->executeBatch(&txn, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(stats.numStaleBatches, 6);