author     Esha Maharishi <esha.maharishi@mongodb.com>  2017-03-06 18:17:34 -0500
committer  Esha Maharishi <esha.maharishi@mongodb.com>  2017-03-13 15:09:46 -0400
commit     965dc76f4b4e27f7a9e3bc7810b608c53085d32f (patch)
tree       e5554cdfb59e4df76adc55d4851b2aaa54dabc12 /src/mongo/s
parent     90bd4ed6ba5d0f3353d1af42c667cd6a2c1a540e (diff)
SERVER-28164 make ClusterWrite::run path use ARS instead of DBClientMultiCommand
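In broad strokes, this commit replaces the DBClientMultiCommand dispatch/waitForResponses() flow with the AsyncRequestsSender's incremental done()/next() loop, as used by the updated call sites below. A minimal sketch of the new consumption pattern, assuming an OperationContext* opCtx and that the request payload shown (a hypothetical "ping" command to a placeholder shard id) stands in for whatever the caller actually builds:

    // Build the per-shard requests; the shard id and command here are illustrative only.
    std::vector<AsyncRequestsSender::Request> requests;
    requests.emplace_back(ShardId("shard0000"), BSON("ping" << 1));

    const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());

    // Constructing the ARS schedules all requests immediately on the given executor.
    AsyncRequestsSender ars(opCtx,
                            Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
                            "admin",
                            requests,
                            readPref);

    // Responses (or errors) are pulled back one at a time; next() also schedules
    // retries for remotes that hit retriable errors and still have retries left.
    while (!ars.done()) {
        AsyncRequestsSender::Response response = ars.next();
        if (!response.swResponse.isOK()) {
            // Tolerating partial results: note the error and keep draining responses.
            continue;
        }
        // response.shardId and *response.shardHostAndPort identify the responder;
        // response.swResponse.getValue().data carries the command reply BSON.
    }

If the caller cannot tolerate partial results, it can instead stop retrying (or simply destroy the ARS, whose destructor cancels pending I/O and waits for outstanding callbacks), per the usage comment added to async_requests_sender.h in this diff.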
Diffstat (limited to 'src/mongo/s')
-rw-r--r--  src/mongo/s/async_requests_sender.cpp               | 283
-rw-r--r--  src/mongo/s/async_requests_sender.h                 | 201
-rw-r--r--  src/mongo/s/commands/cluster_get_last_error_cmd.cpp |  10
-rw-r--r--  src/mongo/s/commands/cluster_write.cpp              |   4
-rw-r--r--  src/mongo/s/commands/cluster_write_cmd.cpp          |  17
-rw-r--r--  src/mongo/s/sharding_test_fixture.cpp               |   7
-rw-r--r--  src/mongo/s/sharding_test_fixture.h                 |  11
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.cpp          | 185
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.h            |   6
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp     | 285
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp            |   5
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.h              |   2
-rw-r--r--  src/mongo/s/write_ops/batch_write_op_test.cpp       | 424
13 files changed, 734 insertions(+), 706 deletions(-)
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 1ec65430f75..aa11a3c1216 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -55,12 +55,8 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
executor::TaskExecutor* executor,
StringData db,
const std::vector<AsyncRequestsSender::Request>& requests,
- const ReadPreferenceSetting& readPreference,
- bool allowPartialResults)
- : _executor(executor),
- _db(std::move(db)),
- _readPreference(readPreference),
- _allowPartialResults(allowPartialResults) {
+ const ReadPreferenceSetting& readPreference)
+ : _opCtx(opCtx), _executor(executor), _db(std::move(db)), _readPreference(readPreference) {
for (const auto& request : requests) {
_remotes.emplace_back(request.shardId, request.cmdObj);
}
@@ -73,44 +69,35 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
_metadataObj = metadataBuilder.obj();
// Schedule the requests immediately.
- _scheduleRequestsIfNeeded(opCtx);
+ // We must create the notification before scheduling any requests, because the notification is
+ // signaled both on an error in scheduling the request and a request's callback. Similarly, we
+ // lock so that no callbacks signal the notification until after we are done scheduling
+ // requests, to prevent signaling the notification twice.
+ _notification.emplace();
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _scheduleRequests_inlock();
}
-
AsyncRequestsSender::~AsyncRequestsSender() {
- invariant(_done());
-}
-
-std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses(
- OperationContext* opCtx) {
- invariant(!_remotes.empty());
-
- // Until all remotes have received a response or error, keep scheduling retries and waiting on
- // outstanding requests.
- while (!_done()) {
- _notification->get();
+ // Make sure any pending network I/O has been canceled.
+ kill();
- // Note: if we have been interrupt()'d or if some remote had a non-retriable error and
- // allowPartialResults is false, no retries will be scheduled.
- _scheduleRequestsIfNeeded(opCtx);
- }
-
- // Construct the responses.
- std::vector<Response> responses;
- for (const auto& remote : _remotes) {
- invariant(remote.swResponse);
- if (remote.swResponse->isOK()) {
- invariant(remote.shardHostAndPort);
- responses.emplace_back(std::move(remote.swResponse->getValue()),
- std::move(*remote.shardHostAndPort));
- } else {
- responses.emplace_back(std::move(remote.swResponse->getStatus()),
- std::move(remote.shardHostAndPort));
- }
+ // Wait on remaining callbacks to run.
+ while (!done()) {
+ next();
}
+}
- _remotes.clear();
+AsyncRequestsSender::Response AsyncRequestsSender::next() {
+ invariant(!done());
- return responses;
+ // If needed, schedule requests for all remotes which had retriable errors.
+ // If some remote had success or a non-retriable error, return it.
+ boost::optional<Response> readyResponse;
+ while (!(readyResponse = _ready())) {
+ // Otherwise, wait for some response to be received.
+ _notification->get(_opCtx);
+ }
+ return *readyResponse;
}
void AsyncRequestsSender::interrupt() {
@@ -120,78 +107,112 @@ void AsyncRequestsSender::interrupt() {
void AsyncRequestsSender::kill() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _stopRetrying = true;
+ if (_killed) {
+ return;
+ }
+ _stopRetrying = true;
// Cancel all outstanding requests so they return immediately.
for (auto& remote : _remotes) {
if (remote.cbHandle.isValid()) {
_executor->cancel(remote.cbHandle);
}
}
+ _killed = true;
}
-bool AsyncRequestsSender::_done() {
+bool AsyncRequestsSender::done() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _done_inlock();
+ return std::all_of(
+ _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
}
-bool AsyncRequestsSender::_done_inlock() {
- for (const auto& remote : _remotes) {
- if (!remote.swResponse) {
- return false;
- }
- }
- return true;
-}
-
-/*
- * Note: If _scheduleRequestsIfNeeded() does retries, only the remotes with retriable errors will be
- * rescheduled because:
- *
- * 1. Other pending remotes still have callback assigned to them.
- * 2. Remotes that already successfully received a response will have a non-empty 'response'.
- * 3. Remotes that have reached maximum retries will have an error status.
- */
-void AsyncRequestsSender::_scheduleRequestsIfNeeded(OperationContext* opCtx) {
+boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- // We can't make a new notification if there was a previous one that has not been signaled.
- invariant(!_notification || *_notification);
+ _notification.emplace();
- if (_done_inlock()) {
- return;
+ if (!_stopRetrying) {
+ _scheduleRequests_inlock();
}
- _notification.emplace();
-
- if (_stopRetrying) {
- return;
+ // Check if any remote is ready.
+ invariant(!_remotes.empty());
+ for (auto& remote : _remotes) {
+ if (remote.swResponse && !remote.done) {
+ remote.done = true;
+ if (remote.swResponse->isOK()) {
+ invariant(remote.shardHostAndPort);
+ return Response(std::move(remote.shardId),
+ std::move(remote.swResponse->getValue()),
+ std::move(*remote.shardHostAndPort));
+ } else {
+ return Response(std::move(remote.shardId),
+ std::move(remote.swResponse->getStatus()),
+ std::move(remote.shardHostAndPort));
+ }
+ }
}
+ // No remotes were ready.
+ return boost::none;
+}
+void AsyncRequestsSender::_scheduleRequests_inlock() {
+ invariant(!_stopRetrying);
// Schedule remote work on hosts for which we have not sent a request or need to retry.
for (size_t i = 0; i < _remotes.size(); ++i) {
auto& remote = _remotes[i];
- // If we have not yet received a response or error for this remote, and we do not have an
- // outstanding request for this remote, schedule remote work to send the command.
+ // First check if the remote had a retriable error, and if so, clear its response field so
+ // it will be retried.
+ if (remote.swResponse && !remote.done) {
+ // We check both the response status and command status for a retriable error.
+ Status status = remote.swResponse->getStatus();
+ if (status.isOK()) {
+ status = getStatusFromCommandResult(remote.swResponse->getValue().data);
+ }
+
+ if (!status.isOK()) {
+ // There was an error with either the response or the command.
+ auto shard = remote.getShard();
+ if (!shard) {
+ remote.swResponse =
+ Status(ErrorCodes::ShardNotFound,
+ str::stream() << "Could not find shard " << remote.shardId);
+ } else {
+ if (remote.shardHostAndPort) {
+ shard->updateReplSetMonitor(*remote.shardHostAndPort, status);
+ }
+ if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) &&
+ remote.retryCount < kMaxNumFailedHostRetryAttempts) {
+ LOG(1) << "Command to remote " << remote.shardId << " at host "
+ << *remote.shardHostAndPort
+ << " failed with retriable error and will be retried "
+ << causedBy(redact(status));
+ ++remote.retryCount;
+ remote.swResponse.reset();
+ }
+ }
+ }
+ }
+
+ // If the remote does not have a response or pending request, schedule remote work for it.
if (!remote.swResponse && !remote.cbHandle.isValid()) {
- auto scheduleStatus = _scheduleRequest_inlock(opCtx, i);
+ auto scheduleStatus = _scheduleRequest_inlock(i);
if (!scheduleStatus.isOK()) {
- // Being unable to schedule a request to a remote is a non-retriable error.
remote.swResponse = std::move(scheduleStatus);
-
- // If partial results are not allowed, stop scheduling requests on other remotes and
- // just wait for outstanding requests to come back.
- if (!_allowPartialResults) {
- _stopRetrying = true;
- break;
+ // Signal the notification indicating the remote had an error (we need to do this
+ // because no request was scheduled, so no callback for this remote will run and
+ // signal the notification).
+ if (!*_notification) {
+ _notification->set();
}
}
}
}
}
-Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex) {
+Status AsyncRequestsSender::_scheduleRequest_inlock(size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -203,15 +224,12 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, siz
}
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _db.toString(), remote.cmdObj, _metadataObj, opCtx);
-
- auto callbackStatus =
- _executor->scheduleRemoteCommand(request,
- stdx::bind(&AsyncRequestsSender::_handleResponse,
- this,
- stdx::placeholders::_1,
- opCtx,
- remoteIndex));
+ *remote.shardHostAndPort, _db.toString(), remote.cmdObj, _metadataObj, _opCtx);
+
+ auto callbackStatus = _executor->scheduleRemoteCommand(
+ request,
+ stdx::bind(
+ &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, remoteIndex));
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -221,9 +239,7 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, siz
}
void AsyncRequestsSender::_handleResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- OperationContext* opCtx,
- size_t remoteIndex) {
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto& remote = _remotes[remoteIndex];
@@ -233,94 +249,37 @@ void AsyncRequestsSender::_handleResponse(
// 'remote'.
remote.cbHandle = executor::TaskExecutor::CallbackHandle();
- // On early return from this point on, signal anyone waiting on the current notification if
- // _done() is true, since this might be the last outstanding request.
- ScopeGuard signaller =
- MakeGuard(&AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock, this);
-
- // We check both the response status and command status for a retriable error.
- Status status = cbData.response.status;
- if (status.isOK()) {
- status = getStatusFromCommandResult(cbData.response.data);
- if (status.isOK()) {
- remote.swResponse = std::move(cbData.response);
- return;
- }
- }
-
- // There was an error with either the response or the command.
-
- auto shard = remote.getShard();
- if (!shard) {
- remote.swResponse =
- Status(ErrorCodes::ShardNotFound,
- str::stream() << "Could not find shard " << remote.shardId << " containing host "
- << remote.getTargetHost().toString());
- return;
- }
- shard->updateReplSetMonitor(remote.getTargetHost(), status);
-
- if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) && !_stopRetrying &&
- remote.retryCount < kMaxNumFailedHostRetryAttempts) {
- LOG(1) << "Command to remote " << remote.shardId << " at host " << *remote.shardHostAndPort
- << " failed with retriable error and will be retried" << causedBy(redact(status));
- ++remote.retryCount;
-
- // Even if _done() is not true, signal the thread sleeping in waitForResponses() to make
- // it schedule a retry for this remote without waiting for all outstanding requests to
- // come back.
- signaller.Dismiss();
- _signalCurrentNotification_inlock();
+ // Store the response or error.
+ if (cbData.response.status.isOK()) {
+ remote.swResponse = std::move(cbData.response);
} else {
- // Non-retriable error, out of retries, or _stopRetrying is true.
-
- // Even though we examined the command status to check for retriable errors, we just return
- // the response or response status here. It is up to the caller to parse the response as
- // a command result.
- if (cbData.response.status.isOK()) {
- remote.swResponse = std::move(cbData.response);
- } else {
- remote.swResponse = std::move(cbData.response.status);
- }
-
- // If the caller can't use partial results, there's no point continuing to retry on
- // retriable errors for other remotes.
- if (!_allowPartialResults) {
- _stopRetrying = true;
- }
+ remote.swResponse = std::move(cbData.response.status);
}
-}
-void AsyncRequestsSender::_signalCurrentNotification_inlock() {
- // Only signal the notification if it has not already been signalled.
+ // Signal the notification indicating that the remote received a response.
if (!*_notification) {
_notification->set();
}
}
-void AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock() {
- if (_done_inlock()) {
- _signalCurrentNotification_inlock();
- }
-}
-
AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj)
: shardId(shardId), cmdObj(cmdObj) {}
-AsyncRequestsSender::Response::Response(executor::RemoteCommandResponse response, HostAndPort hp)
- : swResponse(std::move(response)), shardHostAndPort(std::move(hp)) {}
+AsyncRequestsSender::Response::Response(ShardId shardId,
+ executor::RemoteCommandResponse response,
+ HostAndPort hp)
+ : shardId(std::move(shardId)),
+ swResponse(std::move(response)),
+ shardHostAndPort(std::move(hp)) {}
-AsyncRequestsSender::Response::Response(Status status, boost::optional<HostAndPort> hp)
- : swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {}
+AsyncRequestsSender::Response::Response(ShardId shardId,
+ Status status,
+ boost::optional<HostAndPort> hp)
+ : shardId(std::move(shardId)), swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {}
AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
: shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
-const HostAndPort& AsyncRequestsSender::RemoteData::getTargetHost() const {
- invariant(shardHostAndPort);
- return *shardHostAndPort;
-}
-
Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
const ReadPreferenceSetting& readPref) {
const auto shard = getShard();
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 9f8664f9c2a..ae90e8eaded 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -48,20 +48,35 @@ namespace mongo {
/**
* The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel and
- * automatically retrying on retriable errors according to a RetryPolicy. It can also allow for
- * retrieving partial results by ignoring shards that return errors.
- *
- * Work on remote nodes is accomplished by scheduling remote work in a TaskExecutor's event loop.
+ * automatically retrying on retriable errors according to a RetryPolicy. Work on remote nodes is
+ * accomplished by scheduling remote work in a TaskExecutor's event loop.
*
* Typical usage is:
*
- * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting); // schedule the
- * requests
- * auto responses = ars.waitForResponses(opCtx); // wait for responses; retries on retriable erors
+ * // Add some requests
+ * std::vector<AsyncRequestSender::Request> requests;
+ *
+ * // Creating the ARS schedules the requests immediately
+ * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting);
+ *
+ * while (!ars.done()) {
+ * // Schedule a round of retries if needed and wait for next response or error
+ * auto response = ars.next(opCtx);
*
- * Additionally, you can interrupt() (if you want waitForResponses() to wait for responses for
- * outstanding requests but stop scheduling retries) or kill() (if you want to cancel outstanding
- * requests) the ARS from another thread.
+ * if (!response.swResponse.isOK()) {
+ * // If partial results are tolerable, process the error as needed and continue.
+ * continue;
+ *
+ * // If partial results are not tolerable but you need to retrieve responses for all
+ * // dispatched requests, use stopRetrying() and continue.
+ * ars.stopRetrying();
+ * continue;
+ *
+ * // If partial results are not tolerable and you don't care about dispatched requests,
+ * // safe to destroy the ARS. It will automatically cancel pending I/O and wait for the
+ * // outstanding callbacks to complete on destruction.
+ * }
+ * }
*
* Does not throw exceptions.
*/
@@ -87,10 +102,13 @@ public:
*/
struct Response {
// Constructor for a response that was successfully received.
- Response(executor::RemoteCommandResponse response, HostAndPort hp);
+ Response(ShardId shardId, executor::RemoteCommandResponse response, HostAndPort hp);
// Constructor that specifies the reason the response was not successfully received.
- Response(Status status, boost::optional<HostAndPort> hp);
+ Response(ShardId shardId, Status status, boost::optional<HostAndPort> hp);
+
+ // The shard to which the request was sent.
+ ShardId shardId;
// The response or error from the remote.
StatusWith<executor::RemoteCommandResponse> swResponse;
@@ -101,33 +119,38 @@ public:
};
/**
- * Constructs a new AsyncRequestsSender. The TaskExecutor* must remain valid for the lifetime of
- * the ARS.
+ * Constructs a new AsyncRequestsSender. The OperationContext* and TaskExecutor* must remain
+ * valid for the lifetime of the ARS.
*/
AsyncRequestsSender(OperationContext* opCtx,
executor::TaskExecutor* executor,
StringData db,
const std::vector<AsyncRequestsSender::Request>& requests,
- const ReadPreferenceSetting& readPreference,
- bool allowPartialResults = false);
+ const ReadPreferenceSetting& readPreference);
+ /**
+ * Ensures pending network I/O for any outstanding requests has been canceled and waits for
+ * outstanding requests to complete.
+ */
~AsyncRequestsSender();
/**
- * Returns a vector containing the responses or errors for each remote in the same order as the
- * input vector that was passed in the constructor.
+ * Returns true if responses for all requests have been returned via next().
+ */
+ bool done();
+
+ /**
+ * Returns the next available response or error.
*
- * If we were killed, returns immediately.
- * If we were interrupted, returns when any outstanding requests have completed.
- * Otherwise, returns when each remote has received a response or error.
+ * If neither kill() nor stopRetrying() have been called, schedules retries for any remotes that
+ * have had a retriable error and have not exhausted their retries.
*
- * Must only be called once.
+ * Invalid to call if done() is true.
*/
- std::vector<Response> waitForResponses(OperationContext* opCtx);
+ Response next();
/**
- * Stops the ARS from retrying requests. Causes waitForResponses() to wait until any outstanding
- * requests have received a response or error.
+ * Stops the ARS from retrying requests.
*
* Use this if you no longer care about getting success responses, but need to do cleanup based
* on responses for requests that have already been dispatched.
@@ -135,72 +158,15 @@ public:
void interrupt();
/**
- * Cancels all outstanding requests and makes waitForResponses() return immediately.
+ * Cancels all outstanding requests.
*
* Use this if you no longer care about getting success responses, and don't need to process
- * responses for outstanding requests.
+ * responses for requests that have already been dispatched.
*/
void kill();
private:
/**
- * Returns true if each remote has received a response or error. (If kill() has been called,
- * the error is the error assigned by the TaskExecutor when a callback is canceled).
- */
- bool _done();
-
- /**
- * Executes the logic of _done().
- */
- bool _done_inlock();
-
- /**
- * Replaces _notification with a new notification.
- *
- * If _stopRetrying is false, for each remote that does not have a response or outstanding
- * request, schedules work to send the command to the remote.
- *
- * Invalid to call if there is an existing Notification and it has not yet been signaled.
- */
- void _scheduleRequestsIfNeeded(OperationContext* opCtx);
-
- /**
- * Helper to schedule a command to a remote.
- *
- * The 'remoteIndex' gives the position of the remote node from which we are retrieving the
- * batch in '_remotes'.
- *
- * Returns success if the command to retrieve the next batch was scheduled successfully.
- */
- Status _scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex);
-
- /**
- * The callback for a remote command.
- *
- * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
- * indicates which node the response came from and where the response should be buffered.
- *
- * On a retriable error, unless _stopRetrying is true, signals the notification so that the
- * request can be immediately retried.
- *
- * On a non-retriable error, if allowPartialResults is false, sets _stopRetrying to true.
- */
- void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- OperationContext* opCtx,
- size_t remoteIndex);
-
- /**
- * If the existing notification has not yet been signaled, signals it and marks it as signaled.
- */
- void _signalCurrentNotification_inlock();
-
- /**
- * Wrapper around signalCurrentNotification_inlock(); only signals the notification if _done()
- * is true.
- */
- void _signalCurrentNotificationIfDone_inlock();
-
- /**
* We instantiate one of these per remote host.
*/
struct RemoteData {
@@ -210,11 +176,6 @@ private:
RemoteData(ShardId shardId, BSONObj cmdObj);
/**
- * Returns the resolved host and port on which the remote command was or will be run.
- */
- const HostAndPort& getTargetHost() const;
-
- /**
* Given a read preference, selects a host on which the command should be run.
*/
Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
@@ -225,7 +186,7 @@ private:
std::shared_ptr<Shard> getShard();
// ShardId of the shard to which the command will be sent.
- const ShardId shardId;
+ ShardId shardId;
// The command object to send to the remote host.
BSONObj cmdObj;
@@ -243,18 +204,61 @@ private:
// The callback handle to an outstanding request for this remote.
executor::TaskExecutor::CallbackHandle cbHandle;
+
+ // Whether this remote's result has been returned.
+ bool done = false;
};
/**
- * Used internally to determine if the ARS should attempt to retry any requests. Is set to true
- * when:
- * - interrupt() or kill() is called
- * - allowPartialResults is false and some remote has a non-retriable error (or exhausts its
- * retries for a retriable error).
+ * Replaces _notification with a new notification.
+ *
+ * If _stopRetrying is false, schedules retries for remotes that have had a retriable error.
+ *
+ * If any remote has successfully received a response, returns a Response for it.
+ * If any remote has an error response that can't be retried, returns a Response for it.
+ * Otherwise, returns boost::none.
+ */
+ boost::optional<Response> _ready();
+
+ /**
+ * For each remote that had a response, checks if it had a retriable error, and clears its
+ * response if so.
+ *
+ * For each remote without a response or pending request, schedules the remote request.
+ *
+ * On failure to schedule a request, signals the notification.
+ */
+ void _scheduleRequests_inlock();
+
+ /**
+ * Helper to schedule a command to a remote.
+ *
+ * The 'remoteIndex' gives the position of the remote node from which we are retrieving the
+ * batch in '_remotes'.
+ *
+ * Returns success if the command was scheduled successfully.
*/
+ Status _scheduleRequest_inlock(size_t remoteIndex);
+
+ /**
+ * The callback for a remote command.
+ *
+ * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
+ * indicates which node the response came from and where the response should be buffered.
+ *
+ * Stores the response or error in the remote and signals the notification.
+ */
+ void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
+ size_t remoteIndex);
+
+ // Used internally to determine if the ARS should attempt to retry any requests. Is set to true
+ // when stopRetrying() or kill() is called.
bool _stopRetrying = false;
// Not owned here.
+ OperationContext* _opCtx;
+
+ // Not owned here.
executor::TaskExecutor* _executor;
// The metadata obj to pass along with the command remote. Used to indicate that the command is
@@ -267,10 +271,6 @@ private:
// The readPreference to use for all requests.
ReadPreferenceSetting _readPreference;
- // If set to true, allows for skipping over hosts that have non-retriable errors or exhaust
- // their retries.
- bool _allowPartialResults = false;
-
// Must be acquired before accessing any data members.
// Must also be held when calling any of the '_inlock()' helper functions.
stdx::mutex _mutex;
@@ -281,6 +281,9 @@ private:
// A notification that gets signaled when a remote has a retriable error or the last outstanding
// response is received.
boost::optional<Notification<void>> _notification;
+
+ // Set to true when kill() is called so that it is only executed once.
+ bool _killed = false;
};
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
index 7a81082597f..733e318208b 100644
--- a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
+++ b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
@@ -111,7 +111,7 @@ Status enforceLegacyWriteConcern(OperationContext* opCtx,
requests.emplace_back(swShard.getValue()->getId(), gleCmd);
}
- // Send the requests and wait to receive all the responses.
+ // Send the requests.
const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
AsyncRequestsSender ars(opCtx,
@@ -119,12 +119,14 @@ Status enforceLegacyWriteConcern(OperationContext* opCtx,
dbName,
requests,
readPref);
- auto responses = ars.waitForResponses(opCtx);
- // Parse the responses.
+ // Receive the responses.
vector<Status> failedStatuses;
- for (const auto& response : responses) {
+ while (!ars.done()) {
+ // Block until a response is available.
+ auto response = ars.next();
+
// Return immediately if we failed to contact a shard.
if (!response.shardHostAndPort) {
invariant(!response.swResponse.isOK());
diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp
index 8b8a3f2e644..774f00a43c2 100644
--- a/src/mongo/s/commands/cluster_write.cpp
+++ b/src/mongo/s/commands/cluster_write.cpp
@@ -42,7 +42,6 @@
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/chunk_manager_targeter.h"
-#include "mongo/s/commands/dbclient_multi_command.h"
#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_util.h"
@@ -309,8 +308,7 @@ void ClusterWriter::write(OperationContext* opCtx,
return;
}
- DBClientMultiCommand dispatcher;
- BatchWriteExec exec(&targeter, &dispatcher);
+ BatchWriteExec exec(&targeter);
exec.executeBatch(opCtx, *request, response, &_stats);
}
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 7986195c6f5..07ee899b5a5 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -270,7 +270,7 @@ private:
requests.emplace_back(shardStatus.getValue()->getId(), command);
}
- // Send the requests and wait to receive all the responses.
+ // Send the requests.
const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
AsyncRequestsSender ars(opCtx,
@@ -278,12 +278,14 @@ private:
dbName,
requests,
readPref);
- auto responses = ars.waitForResponses(opCtx);
- // Parse the responses.
+ // Receive the responses.
Status dispatchStatus = Status::OK();
- for (const auto& response : responses) {
+ while (!ars.done()) {
+ // Block until a response is available.
+ auto response = ars.next();
+
if (!response.swResponse.isOK()) {
dispatchStatus = std::move(response.swResponse.getStatus());
break;
@@ -295,12 +297,7 @@ private:
invariant(response.shardHostAndPort);
result.target = ConnectionString(std::move(*response.shardHostAndPort));
- auto shardStatus = shardRegistry->getShard(opCtx, result.target.toString());
- if (!shardStatus.isOK()) {
- return shardStatus.getStatus();
- }
- result.shardTargetId = shardStatus.getValue()->getId();
-
+ result.shardTargetId = std::move(response.shardId);
result.result = std::move(response.swResponse.getValue().data);
results->push_back(result);
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index 59d3762925f..f912196e595 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -116,7 +116,10 @@ void ShardingTestFixture::setUp() {
auto netForPool = stdx::make_unique<executor::NetworkInterfaceMock>();
netForPool->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHookForMongos>());
+ auto _mockNetworkForPool = netForPool.get();
auto execForPool = makeThreadPoolTestExecutor(std::move(netForPool));
+ _networkTestEnvForPool =
+ stdx::make_unique<NetworkTestEnv>(execForPool.get(), _mockNetworkForPool);
std::vector<std::unique_ptr<executor::TaskExecutor>> executorsForPool;
executorsForPool.emplace_back(std::move(execForPool));
@@ -265,6 +268,10 @@ void ShardingTestFixture::onFindWithMetadataCommand(
_networkTestEnv->onFindWithMetadataCommand(func);
}
+void ShardingTestFixture::onCommandForPoolExecutor(NetworkTestEnv::OnCommandFunction func) {
+ _networkTestEnvForPool->onCommand(func);
+}
+
void ShardingTestFixture::setupShards(const std::vector<ShardType>& shards) {
auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); });
diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h
index 8bd7ad1844a..050da470678 100644
--- a/src/mongo/s/sharding_test_fixture.h
+++ b/src/mongo/s/sharding_test_fixture.h
@@ -116,6 +116,12 @@ protected:
executor::NetworkTestEnv::OnFindCommandWithMetadataFunction func);
/**
+ * Same as the onCommand* variants, but expects the request to be placed on the arbitrary
+ * executor of the Grid's executorPool.
+ */
+ void onCommandForPoolExecutor(executor::NetworkTestEnv::OnCommandFunction func);
+
+ /**
* Setup the shard registry to contain the given shards until the next reload.
*/
void setupShards(const std::vector<ShardType>& shards);
@@ -214,9 +220,14 @@ private:
RemoteCommandTargeterFactoryMock* _targeterFactory;
RemoteCommandTargeterMock* _configTargeter;
+ // For the Grid's fixed executor.
executor::NetworkInterfaceMock* _mockNetwork;
executor::TaskExecutor* _executor;
std::unique_ptr<executor::NetworkTestEnv> _networkTestEnv;
+
+ // For the Grid's arbitrary executor in its executorPool.
+ std::unique_ptr<executor::NetworkTestEnv> _networkTestEnvForPool;
+
DistLockManagerMock* _distLockManager = nullptr;
ShardingCatalogClientImpl* _catalogClient = nullptr;
};
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 57f84edf800..99f71ddf85a 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -38,7 +38,8 @@
#include "mongo/bson/util/builder.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/remote_command_targeter.h"
-#include "mongo/s/client/multi_command_dispatch.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/async_requests_sender.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batch_write_op.h"
@@ -50,9 +51,9 @@ namespace mongo {
using std::make_pair;
using std::stringstream;
using std::vector;
+using std::map;
-BatchWriteExec::BatchWriteExec(NSTargeter* targeter, MultiCommandDispatch* dispatcher)
- : _targeter(targeter), _dispatcher(dispatcher) {}
+BatchWriteExec::BatchWriteExec(NSTargeter* targeter) : _targeter(targeter) {}
namespace {
@@ -62,7 +63,7 @@ namespace {
//
// TODO: Unordered map?
-typedef OwnedPointerMap<ConnectionString, TargetedWriteBatch> OwnedHostBatchMap;
+typedef OwnedPointerMap<ShardId, TargetedWriteBatch> OwnedShardBatchMap;
}
static void buildErrorFrom(const Status& status, WriteErrorDetail* error) {
@@ -90,7 +91,6 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
BatchWriteExecStats* stats) {
LOG(4) << "starting execution of write batch of size "
<< static_cast<int>(clientRequest.sizeWriteOps()) << " for " << clientRequest.getNS();
-
BatchWriteOp batchOp;
batchOp.initClientRequest(&clientRequest);
@@ -125,8 +125,8 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
// exactly when the metadata changed.
//
- OwnedPointerVector<TargetedWriteBatch> childBatchesOwned;
- vector<TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> childBatchesOwned;
+ map<ShardId, TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableMap();
// If we've already had a targeting error, we've refreshed the metadata once and can
// record target errors definitively.
@@ -149,85 +149,31 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
size_t numToSend = childBatches.size();
while (numSent != numToSend) {
// Collect batches out on the network, mapped by endpoint
- OwnedHostBatchMap ownedPendingBatches;
- OwnedHostBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap();
+ OwnedShardBatchMap ownedPendingBatches;
+ OwnedShardBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap();
//
- // Send side
+ // Construct the requests.
//
- // Get as many batches as we can at once
- for (vector<TargetedWriteBatch*>::iterator it = childBatches.begin();
- it != childBatches.end();
- ++it) {
- //
- // Collect the info needed to dispatch our targeted batch
- //
-
- TargetedWriteBatch* nextBatch = *it;
- // If the batch is NULL, we sent it previously, so skip
- if (nextBatch == NULL)
- continue;
-
- // Figure out what host we need to dispatch our targeted batch
- const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
- auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(
- opCtx, nextBatch->getEndpoint().shardName);
-
- bool resolvedHost = false;
- ConnectionString shardHost;
- if (!shardStatus.isOK()) {
- Status status(std::move(shardStatus.getStatus()));
+ vector<AsyncRequestsSender::Request> requests;
- // Record a resolve failure
- // TODO: It may be necessary to refresh the cache if stale, or maybe just
- // cancel and retarget the batch
- WriteErrorDetail error;
- buildErrorFrom(status, &error);
- LOG(4) << "unable to send write batch to " << nextBatch->getEndpoint().shardName
- << causedBy(status);
- batchOp.noteBatchError(*nextBatch, error);
- } else {
- auto shard = shardStatus.getValue();
-
- auto swHostAndPort = shard->getTargeter()->findHostNoWait(readPref);
- if (!swHostAndPort.isOK()) {
-
- // Record a resolve failure
- // TODO: It may be necessary to refresh the cache if stale, or maybe just
- // cancel and retarget the batch
- WriteErrorDetail error;
- buildErrorFrom(swHostAndPort.getStatus(), &error);
- LOG(4) << "unable to send write batch to "
- << nextBatch->getEndpoint().shardName
- << causedBy(swHostAndPort.getStatus());
- batchOp.noteBatchError(*nextBatch, error);
- } else {
- shardHost = ConnectionString(std::move(swHostAndPort.getValue()));
- resolvedHost = true;
- }
- }
+ // Get as many batches as we can at once
+ for (auto it = childBatches.begin(); it != childBatches.end(); ++it) {
- if (!resolvedHost) {
- ++stats->numResolveErrors;
+ TargetedWriteBatch* nextBatch = it->second;
- // We're done with this batch
- // Clean up when we can't resolve a host
- delete *it;
- *it = NULL;
- --numToSend;
+ // If the batch is NULL, we sent it previously, so skip
+ if (nextBatch == NULL)
continue;
- }
- // If we already have a batch for this host, wait until the next time
- OwnedHostBatchMap::MapType::iterator pendingIt = pendingBatches.find(shardHost);
+ // If we already have a batch for this shard, wait until the next time
+ ShardId targetShardId = nextBatch->getEndpoint().shardName;
+ OwnedShardBatchMap::MapType::iterator pendingIt =
+ pendingBatches.find(targetShardId);
if (pendingIt != pendingBatches.end())
continue;
- //
- // We now have all the info needed to dispatch the batch
- //
-
BatchedCommandRequest request(clientRequest.getBatchType());
batchOp.buildBatchRequest(*nextBatch, &request);
@@ -236,48 +182,90 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
NamespaceString nss(request.getNS());
request.setNS(nss);
- LOG(4) << "sending write batch to " << shardHost.toString() << ": "
+ LOG(4) << "sending write batch to " << targetShardId << ": "
<< redact(request.toString());
- _dispatcher->addCommand(shardHost, nss.db(), request.toBSON());
+ requests.emplace_back(targetShardId, request.toBSON());
// Indicate we're done by setting the batch to NULL
// We'll only get duplicate hostEndpoints if we have broadcast and non-broadcast
// endpoints for the same host, so this should be pretty efficient without
// moving stuff around.
- *it = NULL;
+ it->second = NULL;
// Recv-side is responsible for cleaning up the nextBatch when used
- pendingBatches.insert(make_pair(shardHost, nextBatch));
+ pendingBatches.insert(make_pair(targetShardId, nextBatch));
}
- // Send them all out
- _dispatcher->sendAll();
+ //
+ // Send the requests.
+ //
+
+ const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
+ AsyncRequestsSender ars(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ clientRequest.getTargetingNSS().db(),
+ requests,
+ readPref);
numSent += pendingBatches.size();
//
- // Recv side
+ // Receive the responses.
//
- while (_dispatcher->numPending() > 0) {
- // Get the response
- ConnectionString shardHost;
- BatchedCommandResponse response;
- Status dispatchStatus = _dispatcher->recvAny(&shardHost, &response);
+ while (!ars.done()) {
+ // Block until a response is available.
+ auto response = ars.next();
// Get the TargetedWriteBatch to find where to put the response
- dassert(pendingBatches.find(shardHost) != pendingBatches.end());
- TargetedWriteBatch* batch = pendingBatches.find(shardHost)->second;
+ dassert(pendingBatches.find(response.shardId) != pendingBatches.end());
+ TargetedWriteBatch* batch = pendingBatches.find(response.shardId)->second;
+
+ // First check if we were able to target a shard host.
+ if (!response.shardHostAndPort) {
+ invariant(!response.swResponse.isOK());
+
+ // Record a resolve failure
+ // TODO: It may be necessary to refresh the cache if stale, or maybe just
+ // cancel and retarget the batch
+ LOG(4) << "unable to send write batch to " << batch->getEndpoint().shardName
+ << causedBy(response.swResponse.getStatus());
+ WriteErrorDetail error;
+ buildErrorFrom(std::move(response.swResponse.getStatus()), &error);
+ batchOp.noteBatchError(*batch, error);
+
+ // We're done with this batch
+ // Clean up when we can't resolve a host
+ auto it = childBatches.find(batch->getEndpoint().shardName);
+ invariant(it != childBatches.end());
+ delete it->second;
+ it->second = NULL;
+ continue;
+ }
+ ConnectionString shardHost(std::move(*response.shardHostAndPort));
+
+
+ // Then check if we successfully got a response.
+ Status status = response.swResponse.getStatus();
+ BatchedCommandResponse batchedCommandResponse;
+ if (status.isOK()) {
+ std::string errMsg;
+ if (!batchedCommandResponse.parseBSON(response.swResponse.getValue().data,
+ &errMsg) ||
+ !batchedCommandResponse.isValid(&errMsg)) {
+ status = {ErrorCodes::FailedToParse, errMsg};
+ }
+ }
- if (dispatchStatus.isOK()) {
+ if (status.isOK()) {
TrackedErrors trackedErrors;
trackedErrors.startTracking(ErrorCodes::StaleShardVersion);
LOG(4) << "write results received from " << shardHost.toString() << ": "
- << redact(response.toString());
+ << redact(batchedCommandResponse.toString());
// Dispatch was ok, note response
- batchOp.noteBatchResponse(*batch, response, &trackedErrors);
+ batchOp.noteBatchResponse(*batch, batchedCommandResponse, &trackedErrors);
// Note if anything was stale
const vector<ShardError*>& staleErrors =
@@ -291,22 +279,25 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
// Remember that we successfully wrote to this shard
// NOTE: This will record lastOps for shards where we actually didn't update
// or delete any documents, which preserves old behavior but is conservative
- stats->noteWriteAt(
- shardHost,
- response.isLastOpSet() ? response.getLastOp() : repl::OpTime(),
- response.isElectionIdSet() ? response.getElectionId() : OID());
+ stats->noteWriteAt(shardHost,
+ batchedCommandResponse.isLastOpSet()
+ ? batchedCommandResponse.getLastOp()
+ : repl::OpTime(),
+ batchedCommandResponse.isElectionIdSet()
+ ? batchedCommandResponse.getElectionId()
+ : OID());
} else {
// Error occurred dispatching, note it
stringstream msg;
msg << "write results unavailable from " << shardHost.toString()
- << causedBy(dispatchStatus.toString());
+ << causedBy(status.toString());
WriteErrorDetail error;
buildErrorFrom(Status(ErrorCodes::RemoteResultsUnavailable, msg.str()), &error);
LOG(4) << "unable to receive write results from " << shardHost.toString()
- << causedBy(redact(dispatchStatus.toString()));
+ << causedBy(redact(status.toString()));
batchOp.noteBatchError(*batch, error);
}
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index b430e3c5baf..51fd8633207 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -42,7 +42,6 @@
namespace mongo {
class BatchWriteExecStats;
-class MultiCommandDispatch;
class OperationContext;
/**
@@ -64,7 +63,7 @@ class BatchWriteExec {
MONGO_DISALLOW_COPYING(BatchWriteExec);
public:
- BatchWriteExec(NSTargeter* targeter, MultiCommandDispatch* dispatcher);
+ BatchWriteExec(NSTargeter* targeter);
/**
* Executes a client batch write request by sending child batches to several shard
@@ -80,9 +79,6 @@ public:
private:
// Not owned here
NSTargeter* _targeter;
-
- // Not owned here
- MultiCommandDispatch* _dispatcher;
};
struct HostOpTime {
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 30833b726eb..73cdd24a006 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -30,11 +30,9 @@
#include "mongo/s/write_ops/batch_write_exec.h"
-#include "mongo/base/owned_pointer_vector.h"
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/client/mock_multi_write_command.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/sharding_test_fixture.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -54,6 +52,7 @@ namespace {
const HostAndPort kTestShardHost = HostAndPort("FakeHost", 12345);
const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
const string shardName = "FakeShard";
+const int kMaxRoundsWithoutProgress = 5;
/**
* Mimics a single shard backend for a particular collection which can be initialized with a
@@ -94,18 +93,81 @@ public:
nsTargeter.init(mockRanges);
// Make the batch write executor use the mock backend.
- exec.reset(new BatchWriteExec(&nsTargeter, &dispatcher));
+ exec.reset(new BatchWriteExec(&nsTargeter));
}
- void setMockResults(const vector<MockWriteResult*>& results) {
- dispatcher.init(results);
+ void expectInsertsReturnSuccess(const std::vector<BSONObj>& expected) {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ BatchedInsertRequest actualBatchedInsert;
+ std::string errmsg;
+ ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
+
+ ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().toString());
+
+ auto inserted = actualBatchedInsert.getDocuments();
+ ASSERT_EQUALS(expected.size(), inserted.size());
+
+ auto itInserted = inserted.begin();
+ auto itExpected = expected.begin();
+
+ for (; itInserted != inserted.end(); itInserted++, itExpected++) {
+ ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
+ }
+
+ BatchedCommandResponse response;
+ response.setOk(true);
+
+ return response.toBSON();
+ });
+ }
+
+ void expectInsertsReturnStaleVersionErrors(const std::vector<BSONObj>& expected) {
+ WriteErrorDetail error;
+ error.setErrCode(ErrorCodes::StaleShardVersion);
+ error.setErrMessage("mock stale error");
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ BatchedInsertRequest actualBatchedInsert;
+ std::string errmsg;
+ ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
+
+ ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().toString());
+
+ auto inserted = actualBatchedInsert.getDocuments();
+ ASSERT_EQUALS(expected.size(), inserted.size());
+
+ auto itInserted = inserted.begin();
+ auto itExpected = expected.begin();
+
+ for (; itInserted != inserted.end(); itInserted++, itExpected++) {
+ ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
+ }
+
+ BatchedCommandResponse staleResponse;
+ staleResponse.setOk(true);
+ staleResponse.setN(0);
+
+ // Report a stale version error for each write in the batch.
+ int i = 0;
+ for (itInserted = inserted.begin(); itInserted != inserted.end(); ++itInserted) {
+ WriteErrorDetail* errorCopy = new WriteErrorDetail;
+ error.cloneTo(errorCopy);
+ errorCopy->setIndex(i);
+ staleResponse.addToErrDetails(errorCopy);
+ ++i;
+ }
+
+ return staleResponse.toBSON();
+ });
}
ConnectionString shardHost{kTestShardHost};
NamespaceString nss{"foo.bar"};
MockNSTargeter nsTargeter;
- MockMultiWriteCommand dispatcher;
unique_ptr<BatchWriteExec> exec;
};
@@ -124,14 +186,21 @@ TEST_F(BatchWriteExecTest, SingleOp) {
request.setOrdered(false);
request.setWriteConcern(BSONObj());
// Do single-target, single doc batch write op
- request.getInsertRequest()->addToDocuments(BSON("x" << 1));
+ auto objToInsert = BSON("x" << 1);
+ request.getInsertRequest()->addToDocuments(objToInsert);
- BatchedCommandResponse response;
- BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
- ASSERT(response.getOk());
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ exec->executeBatch(operationContext(), request, &response, &stats);
+ ASSERT(response.getOk());
+ ASSERT_EQUALS(stats.numRounds, 1);
+ });
- ASSERT_EQUALS(stats.numRounds, 1);
+ std::vector<BSONObj> expected{objToInsert};
+ expectInsertsReturnSuccess(expected);
+
+ future.timed_get(kFutureTimeout);
}
TEST_F(BatchWriteExecTest, SingleOpError) {
@@ -139,33 +208,57 @@ TEST_F(BatchWriteExecTest, SingleOpError) {
// Basic error test
//
- vector<MockWriteResult*> mockResults;
BatchedCommandResponse errResponse;
errResponse.setOk(false);
errResponse.setErrCode(ErrorCodes::UnknownError);
errResponse.setErrMessage("mock error");
- mockResults.push_back(new MockWriteResult(shardHost, errResponse));
-
- setMockResults(mockResults);
BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
request.setNS(nss);
request.setOrdered(false);
request.setWriteConcern(BSONObj());
// Do single-target, single doc batch write op
- request.getInsertRequest()->addToDocuments(BSON("x" << 1));
-
- BatchedCommandResponse response;
- BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
- ASSERT(response.getOk());
- ASSERT_EQUALS(response.getN(), 0);
- ASSERT(response.isErrDetailsSet());
- ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), errResponse.getErrCode());
- ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(errResponse.getErrMessage()) !=
- string::npos);
-
- ASSERT_EQUALS(stats.numRounds, 1);
+ auto objToInsert = BSON("x" << 1);
+ request.getInsertRequest()->addToDocuments(objToInsert);
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ exec->executeBatch(operationContext(), request, &response, &stats);
+ ASSERT(response.getOk());
+ ASSERT_EQUALS(response.getN(), 0);
+ ASSERT(response.isErrDetailsSet());
+ ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), errResponse.getErrCode());
+ ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(errResponse.getErrMessage()) !=
+ string::npos);
+
+ ASSERT_EQUALS(stats.numRounds, 1);
+ });
+
+ std::vector<BSONObj> expected{objToInsert};
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ BatchedInsertRequest actualBatchedInsert;
+ std::string errmsg;
+ ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
+
+ ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().toString());
+
+ auto inserted = actualBatchedInsert.getDocuments();
+ ASSERT_EQUALS(expected.size(), inserted.size());
+
+ auto itInserted = inserted.begin();
+ auto itExpected = expected.begin();
+
+ for (; itInserted != inserted.end(); itInserted++, itExpected++) {
+ ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
+ }
+
+ return errResponse.toBSON();
+ });
+
+ future.timed_get(kFutureTimeout);
}
//
@@ -183,23 +276,24 @@ TEST_F(BatchWriteExecTest, StaleOp) {
request.setOrdered(false);
request.setWriteConcern(BSONObj());
// Do single-target, single doc batch write op
- request.getInsertRequest()->addToDocuments(BSON("x" << 1));
+ auto objToInsert = BSON("x" << 1);
+ request.getInsertRequest()->addToDocuments(objToInsert);
- vector<MockWriteResult*> mockResults;
- WriteErrorDetail error;
- error.setErrCode(ErrorCodes::StaleShardVersion);
- error.setErrMessage("mock stale error");
- mockResults.push_back(new MockWriteResult(shardHost, error));
+ // Execute request
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ exec->executeBatch(operationContext(), request, &response, &stats);
+ ASSERT(response.getOk());
- setMockResults(mockResults);
+ ASSERT_EQUALS(stats.numStaleBatches, 1);
+ });
- // Execute request
- BatchedCommandResponse response;
- BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
- ASSERT(response.getOk());
+ std::vector<BSONObj> expected{objToInsert};
+ expectInsertsReturnStaleVersionErrors(expected);
+ expectInsertsReturnSuccess(expected);
- ASSERT_EQUALS(stats.numStaleBatches, 1);
+ future.timed_get(kFutureTimeout);
}
TEST_F(BatchWriteExecTest, MultiStaleOp) {
@@ -213,25 +307,28 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
request.setOrdered(false);
request.setWriteConcern(BSONObj());
// Do single-target, single doc batch write op
- request.getInsertRequest()->addToDocuments(BSON("x" << 1));
+ auto objToInsert = BSON("x" << 1);
+ request.getInsertRequest()->addToDocuments(objToInsert);
- vector<MockWriteResult*> mockResults;
- WriteErrorDetail error;
- error.setErrCode(ErrorCodes::StaleShardVersion);
- error.setErrMessage("mock stale error");
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ exec->executeBatch(operationContext(), request, &response, &stats);
+ ASSERT(response.getOk());
+
+ ASSERT_EQUALS(stats.numStaleBatches, 3);
+ });
+
+ std::vector<BSONObj> expected{objToInsert};
+
+ // Return multiple StaleShardVersion errors
for (int i = 0; i < 3; i++) {
- mockResults.push_back(new MockWriteResult(shardHost, error));
+ expectInsertsReturnStaleVersionErrors(expected);
}
- setMockResults(mockResults);
+ expectInsertsReturnSuccess(expected);
- // Execute request
- BatchedCommandResponse response;
- BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
- ASSERT(response.getOk());
-
- ASSERT_EQUALS(stats.numStaleBatches, 3);
+ future.timed_get(kFutureTimeout);
}
TEST_F(BatchWriteExecTest, TooManyStaleOp) {
@@ -247,60 +344,32 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
request.setOrdered(false);
request.setWriteConcern(BSONObj());
// Do single-target, single doc batch write ops
- request.getInsertRequest()->addToDocuments(BSON("x" << 1));
- request.getInsertRequest()->addToDocuments(BSON("x" << 2));
-
- vector<MockWriteResult*> mockResults;
- WriteErrorDetail error;
- error.setErrCode(ErrorCodes::StaleShardVersion);
- error.setErrMessage("mock stale error");
- for (int i = 0; i < 10; i++) {
- mockResults.push_back(new MockWriteResult(shardHost, error, request.sizeWriteOps()));
+ auto objToInsert1 = BSON("x" << 1);
+ auto objToInsert2 = BSON("x" << 2);
+ request.getInsertRequest()->addToDocuments(objToInsert1);
+ request.getInsertRequest()->addToDocuments(objToInsert2);
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ exec->executeBatch(operationContext(), request, &response, &stats);
+ ASSERT(response.getOk());
+ ASSERT_EQUALS(response.getN(), 0);
+ ASSERT(response.isErrDetailsSet());
+ ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), ErrorCodes::NoProgressMade);
+ ASSERT_EQUALS(response.getErrDetailsAt(1)->getErrCode(), ErrorCodes::NoProgressMade);
+
+ ASSERT_EQUALS(stats.numStaleBatches, (1 + kMaxRoundsWithoutProgress));
+ });
+
+ std::vector<BSONObj> expected{objToInsert1, objToInsert2};
+
+ // Return multiple StaleShardVersion errors
+ for (int i = 0; i < (1 + kMaxRoundsWithoutProgress); i++) {
+ expectInsertsReturnStaleVersionErrors(expected);
}
- setMockResults(mockResults);
-
- // Execute request
- BatchedCommandResponse response;
- BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
- ASSERT(response.getOk());
- ASSERT_EQUALS(response.getN(), 0);
- ASSERT(response.isErrDetailsSet());
- ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), ErrorCodes::NoProgressMade);
- ASSERT_EQUALS(response.getErrDetailsAt(1)->getErrCode(), ErrorCodes::NoProgressMade);
-}
-
-TEST_F(BatchWriteExecTest, ManyStaleOpWithMigration) {
- //
- // Retry op in exec many times b/c of stale config, but simulate remote migrations occurring
- //
-
- // Insert request
- BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
- request.setNS(nss);
- request.setOrdered(false);
- request.setWriteConcern(BSONObj());
- // Do single-target, single doc batch write op
- request.getInsertRequest()->addToDocuments(BSON("x" << 1));
-
- vector<MockWriteResult*> mockResults;
- WriteErrorDetail error;
- error.setErrCode(ErrorCodes::StaleShardVersion);
- error.setErrMessage("mock stale error");
- for (int i = 0; i < 10; i++) {
- mockResults.push_back(new MockWriteResult(shardHost, error));
- }
-
- setMockResults(mockResults);
-
- // Execute request
- BatchedCommandResponse response;
- BatchWriteExecStats stats;
- exec->executeBatch(operationContext(), request, &response, &stats);
- ASSERT(response.getOk());
-
- ASSERT_EQUALS(stats.numStaleBatches, 6);
+ future.timed_get(kFutureTimeout);
}
} // namespace
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index c1e411cb2ed..be0b6a05286 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -238,7 +238,7 @@ static void cancelBatches(const WriteErrorDetail& why,
Status BatchWriteOp::targetBatch(OperationContext* opCtx,
const NSTargeter& targeter,
bool recordTargetErrors,
- vector<TargetedWriteBatch*>* targetedBatches) {
+ std::map<ShardId, TargetedWriteBatch*>* targetedBatches) {
//
// Targeting of unordered batches is fairly simple - each remaining write op is targeted,
// and each of those targeted writes are grouped into a batch for a particular shard
@@ -402,7 +402,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
// Remember targeted batch for reporting
_targeted.insert(batch);
// Send the handle back to caller
- targetedBatches->push_back(batch);
+ invariant(targetedBatches->find(batch->getEndpoint().shardName) == targetedBatches->end());
+ targetedBatches->insert(std::make_pair(batch->getEndpoint().shardName, batch));
}
return Status::OK();
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 3bfbdf39954..5e37c87598c 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -110,7 +110,7 @@ public:
Status targetBatch(OperationContext* opCtx,
const NSTargeter& targeter,
bool recordTargetErrors,
- std::vector<TargetedWriteBatch*>* targetedBatches);
+ std::map<ShardId, TargetedWriteBatch*>* targetedBatches);
/**
* Fills a BatchCommandRequest from a TargetedWriteBatch for this BatchWriteOp.
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index e5bc9b5a5cd..b34c3a4a20b 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/base/owned_pointer_map.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -39,8 +39,9 @@
namespace mongo {
-using std::unique_ptr;
+using std::map;
using std::string;
+using std::unique_ptr;
using std::vector;
namespace {
@@ -150,19 +151,19 @@ TEST(WriteOpTests, SingleOp) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint);
BatchedCommandResponse response;
buildResponse(1, &response);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -190,19 +191,19 @@ TEST(WriteOpTests, SingleError) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint);
BatchedCommandResponse response;
buildErrResponse(ErrorCodes::UnknownError, "message", &response);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -236,8 +237,8 @@ TEST(WriteOpTests, SingleTargetError) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(!status.isOK());
@@ -279,17 +280,17 @@ TEST(WriteOpTests, SingleWriteConcernErrorOrdered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint);
BatchedCommandRequest targetBatch(BatchedCommandRequest::BatchType_Insert);
- batchOp.buildBatchRequest(*targeted.front(), &targetBatch);
+ batchOp.buildBatchRequest(*targeted.begin()->second, &targetBatch);
ASSERT(targetBatch.getWriteConcern().woCompare(request.getWriteConcern()) == 0);
BatchedCommandResponse response;
@@ -297,7 +298,7 @@ TEST(WriteOpTests, SingleWriteConcernErrorOrdered) {
addWCError(&response);
// First stale response comes back, we should retry
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -327,8 +328,8 @@ TEST(WriteOpTests, SingleStaleError) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
BatchedCommandResponse response;
@@ -336,14 +337,14 @@ TEST(WriteOpTests, SingleStaleError) {
addError(ErrorCodes::StaleShardVersion, "mock stale error", 0, &response);
// First stale response comes back, we should retry
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
// Respond again with a stale response
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
@@ -352,7 +353,7 @@ TEST(WriteOpTests, SingleStaleError) {
buildResponse(1, &response);
// Respond with an 'ok' response
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -366,16 +367,6 @@ TEST(WriteOpTests, SingleStaleError) {
// Multi-operation batches
//
-struct EndpointComp {
- bool operator()(const TargetedWriteBatch* writeA, const TargetedWriteBatch* writeB) const {
- return writeA->getEndpoint().shardName.compare(writeB->getEndpoint().shardName) < 0;
- }
-};
-
-inline void sortByEndpoint(vector<TargetedWriteBatch*>* writes) {
- std::sort(writes->begin(), writes->end(), EndpointComp());
-}
-
TEST(WriteOpTests, MultiOpSameShardOrdered) {
//
// Multi-op targeting test (ordered)
@@ -398,20 +389,20 @@ TEST(WriteOpTests, MultiOpSameShardOrdered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint);
BatchedCommandResponse response;
buildResponse(2, &response);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -442,20 +433,20 @@ TEST(WriteOpTests, MultiOpSameShardUnordered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint);
BatchedCommandResponse response;
buildResponse(2, &response);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -488,21 +479,21 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpointA);
BatchedCommandResponse response;
buildResponse(1, &response);
// Respond to first targeted batch
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
@@ -510,11 +501,11 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) {
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointB);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpointB);
// Respond to second targeted batch
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -523,6 +514,22 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) {
ASSERT_EQUALS(clientResponse.getN(), 2);
}
+void verifyTargetedBatches(map<ShardId, size_t> expected,
+ const map<ShardId, TargetedWriteBatch*>& targeted) {
+    // 'expected' maps each ShardId that is expected to be targeted to the size of the
+    // batch expected for that shard.
+    // We check that each ShardId in 'targeted' appears in 'expected' and that its batch
+    // has the expected size.
+    // Finally, we ensure that every ShardId in 'expected' was actually targeted.
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT_EQUALS(expected[it->second->getEndpoint().shardName],
+ it->second->getWrites().size());
+ ASSERT_EQUALS(ChunkVersion::IGNORED(), it->second->getEndpoint().shardVersion);
+ expected.erase(expected.find(it->second->getEndpoint().shardName));
+ }
+ ASSERT(expected.empty());
+}
+
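With the targeted batches held in a std::map, iteration is in ShardId order, so the removed sortByEndpoint() helper is no longer needed; the two-shard tests below walk the map with begin()/++ and presume endpointA's shard id sorts before endpointB's. A minimal sketch of that access pattern (illustrative, with hypothetical response names, using the fixtures of these tests):

    // Illustrative only: responses are noted per shard in ascending ShardId order.
    auto targetedIt = targeted.begin();
    batchOp.noteBatchResponse(*targetedIt->second, responseForShardA, NULL);  // endpointA's batch
    ++targetedIt;
    batchOp.noteBatchResponse(*targetedIt->second, responseForShardB, NULL);  // endpointB's batch
    ASSERT(++targetedIt == targeted.end());  // exactly the two expected shards were targeted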
TEST(WriteOpTests, MultiOpTwoShardsUnordered) {
//
// Multi-op, multi-endpoint targeting test (unordered)
@@ -547,26 +554,23 @@ TEST(WriteOpTests, MultiOpTwoShardsUnordered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
+ verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
// Respond to both targeted batches
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
- ASSERT(!batchOp.isFinished());
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT(!batchOp.isFinished());
+ batchOp.noteBatchResponse(*it->second, response, NULL);
+ }
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -601,26 +605,23 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
+ verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
// Respond to both targeted batches for first multi-delete
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
- ASSERT(!batchOp.isFinished());
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT(!batchOp.isFinished());
+ batchOp.noteBatchResponse(*it->second, response, NULL);
+ }
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
@@ -628,16 +629,13 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) {
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
+ verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted);
// Respond to second targeted batches for second multi-delete
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
- ASSERT(!batchOp.isFinished());
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT(!batchOp.isFinished());
+ batchOp.noteBatchResponse(*it->second, response, NULL);
+ }
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -672,26 +670,23 @@ TEST(WriteOpTests, MultiOpTwoShardsEachUnordered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 2u);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
+ verifyTargetedBatches({{endpointA.shardName, 2u}, {endpointB.shardName, 2u}}, targeted);
BatchedCommandResponse response;
buildResponse(2, &response);
// Respond to both targeted batches, each containing two ops
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
- ASSERT(!batchOp.isFinished());
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT(!batchOp.isFinished());
+ batchOp.noteBatchResponse(*it->second, response, NULL);
+ }
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -734,22 +729,22 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpointA);
BatchedCommandResponse response;
// Emulate one-write-per-delete-per-host
buildResponse(2, &response);
// Respond to first targeted batch containing the two single-host deletes
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
@@ -758,19 +753,16 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
+ verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted);
// Emulate one-write-per-delete-per-host
buildResponse(1, &response);
// Respond to two targeted batches for first multi-delete
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
- ASSERT(!batchOp.isFinished());
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT(!batchOp.isFinished());
+ batchOp.noteBatchResponse(*it->second, response, NULL);
+ }
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
@@ -779,16 +771,13 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
+ verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted);
// Respond to two targeted batches for second multi-delete
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
- ASSERT(!batchOp.isFinished());
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT(!batchOp.isFinished());
+ batchOp.noteBatchResponse(*it->second, response, NULL);
+ }
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
@@ -797,14 +786,14 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) {
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpointB);
// Emulate one-write-per-delete-per-host
buildResponse(2, &response);
// Respond to final targeted batch containing the last two single-host deletes
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -846,27 +835,24 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsUnordered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 4u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 4u);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
+ verifyTargetedBatches({{endpointA.shardName, 4u}, {endpointB.shardName, 4u}}, targeted);
BatchedCommandResponse response;
// Emulate one-write-per-delete-per-host
buildResponse(4, &response);
// Respond to first targeted batch containing the two single-host deletes
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
- ASSERT(!batchOp.isFinished());
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT(!batchOp.isFinished());
+ batchOp.noteBatchResponse(*it->second, response, NULL);
+ }
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -899,32 +885,33 @@ TEST(WriteOpTests, MultiOpSingleShardErrorUnordered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u);
+ verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
+ // Respond to batches.
+ auto targetedIt = targeted.begin();
+
// No error on first shard
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(!batchOp.isFinished());
buildResponse(0, &response);
addError(ErrorCodes::UnknownError, "mock error", 0, &response);
// Error on second write on second shard
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ ++targetedIt;
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(batchOp.isFinished());
+ ASSERT(++targetedIt == targeted.end());
BatchedCommandResponse clientResponse;
batchOp.buildClientResponse(&clientResponse);
@@ -963,29 +950,24 @@ TEST(WriteOpTests, MultiOpTwoShardErrorsUnordered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u);
+ verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted);
BatchedCommandResponse response;
buildResponse(0, &response);
addError(ErrorCodes::UnknownError, "mock error", 0, &response);
- // Error on first write on first shard
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
- ASSERT(!batchOp.isFinished());
-
- // Error on second write on second shard
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ // Error on first write on first shard and second write on second shard.
+ for (auto it = targeted.begin(); it != targeted.end(); ++it) {
+ ASSERT(!batchOp.isFinished());
+ batchOp.noteBatchResponse(*it->second, response, NULL);
+ }
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -1032,32 +1014,33 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorUnordered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 2u);
+ verifyTargetedBatches({{endpointA.shardName, 2u}, {endpointB.shardName, 2u}}, targeted);
+
+ // Respond to batches.
+ auto targetedIt = targeted.begin();
BatchedCommandResponse response;
buildResponse(2, &response);
// No errors on first shard
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(!batchOp.isFinished());
buildResponse(1, &response);
addError(ErrorCodes::UnknownError, "mock error", 1, &response);
// Error on second write on second shard
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ ++targetedIt;
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(batchOp.isFinished());
+ ASSERT(++targetedIt == targeted.end());
BatchedCommandResponse clientResponse;
batchOp.buildClientResponse(&clientResponse);
@@ -1099,32 +1082,33 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorOrdered) {
batchOp.initClientRequest(&request);
ASSERT(!batchOp.isFinished());
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 2u);
- sortByEndpoint(&targeted);
- assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA);
- assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
- ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u);
+ verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted);
+
+ // Respond to batches.
+ auto targetedIt = targeted.begin();
BatchedCommandResponse response;
buildResponse(1, &response);
// No errors on first shard
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(!batchOp.isFinished());
buildResponse(0, &response);
addError(ErrorCodes::UnknownError, "mock error", 0, &response);
// Error on second write on second shard
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ ++targetedIt;
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(batchOp.isFinished());
+ ASSERT(++targetedIt == targeted.end());
BatchedCommandResponse clientResponse;
batchOp.buildClientResponse(&clientResponse);
@@ -1167,8 +1151,8 @@ TEST(WriteOpTests, MultiOpErrorAndWriteConcernErrorUnordered) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
BatchedCommandResponse response;
@@ -1177,7 +1161,7 @@ TEST(WriteOpTests, MultiOpErrorAndWriteConcernErrorUnordered) {
addWCError(&response);
// First stale response comes back, we should retry
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
// Unordered reports write concern error
@@ -1212,24 +1196,29 @@ TEST(WriteOpTests, SingleOpErrorAndWriteConcernErrorOrdered) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
+ // Respond to batches.
+ auto targetedIt = targeted.begin();
+
BatchedCommandResponse response;
buildResponse(1, &response);
addWCError(&response);
// First response comes back with write concern error
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(!batchOp.isFinished());
buildResponse(0, &response);
addError(ErrorCodes::UnknownError, "mock error", 0, &response);
// Second response comes back with write error
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ ++targetedIt;
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(batchOp.isFinished());
+ ASSERT(++targetedIt == targeted.end());
// Ordered doesn't report write concern error
BatchedCommandResponse clientResponse;
@@ -1263,8 +1252,8 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
// First targeting round fails since we may be stale
@@ -1278,13 +1267,13 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) {
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u);
BatchedCommandResponse response;
buildResponse(1, &response);
// First response ok
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
@@ -1327,8 +1316,8 @@ TEST(WriteOpTests, MultiOpFailedTargetUnordered) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
// First targeting round fails since we may be stale
@@ -1342,13 +1331,13 @@ TEST(WriteOpTests, MultiOpFailedTargetUnordered) {
ASSERT(status.isOK());
ASSERT(!batchOp.isFinished());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u);
BatchedCommandResponse response;
buildResponse(2, &response);
// Response is ok for first and third write
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -1382,15 +1371,15 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
// First shard batch is ok
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
@@ -1399,7 +1388,7 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) {
buildErrResponse(ErrorCodes::UnknownError, "mock error", &response);
// Second shard batch fails
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
// We should have recorded an error for the second write
@@ -1436,22 +1425,27 @@ TEST(WriteOpTests, MultiOpFailedBatchUnordered) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
+ // Respond to batches.
+ auto targetedIt = targeted.begin();
+
BatchedCommandResponse response;
buildResponse(1, &response);
// First shard batch is ok
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(!batchOp.isFinished());
buildErrResponse(ErrorCodes::UnknownError, "mock error", &response);
// Second shard batch fails
- batchOp.noteBatchResponse(*targeted.back(), response, NULL);
+ ++targetedIt;
+ batchOp.noteBatchResponse(*targetedIt->second, response, NULL);
ASSERT(batchOp.isFinished());
+ ASSERT(++targetedIt == targeted.end());
// We should have recorded an error for the second and third write
BatchedCommandResponse clientResponse;
@@ -1488,15 +1482,15 @@ TEST(WriteOpTests, MultiOpAbortOrdered) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
BatchedCommandResponse response;
buildResponse(1, &response);
// First shard batch is ok
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
WriteErrorDetail abortError;
@@ -1579,8 +1573,8 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
BatchedCommandResponse response;
@@ -1588,14 +1582,14 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) {
addWCError(&response);
// First shard write write concern fails.
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
status = batchOp.targetBatch(&opCtx, targeter, true, &targeted);
// Second shard write write concern fails.
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
BatchedCommandResponse clientResponse;
@@ -1632,8 +1626,8 @@ TEST(WriteOpLimitTests, OneBigDoc) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
@@ -1641,7 +1635,7 @@ TEST(WriteOpLimitTests, OneBigDoc) {
BatchedCommandResponse response;
buildResponse(1, &response);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
}
@@ -1669,26 +1663,26 @@ TEST(WriteOpLimitTests, OneBigOneSmall) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u);
BatchedCommandResponse response;
buildResponse(1, &response);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
}
@@ -1714,26 +1708,26 @@ TEST(WriteOpLimitTests, TooManyOps) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 1000u);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1000u);
BatchedCommandResponse response;
buildResponse(1, &response);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
+ ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
}
@@ -1778,34 +1772,34 @@ TEST(WriteOpLimitTests, UpdateOverheadIncluded) {
BatchWriteOp batchOp;
batchOp.initClientRequest(&request);
- OwnedPointerVector<TargetedWriteBatch> targetedOwned;
- vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u);
+ ASSERT_LESS_THAN(targeted.begin()->second->getWrites().size(), 1000u);
BatchedCommandRequest childRequest(BatchedCommandRequest::BatchType_Update);
- batchOp.buildBatchRequest(*targeted.front(), &childRequest);
+ batchOp.buildBatchRequest(*targeted.begin()->second, &childRequest);
ASSERT_LESS_THAN(childRequest.toBSON().objsize(), BSONObjMaxInternalSize);
BatchedCommandResponse response;
buildResponse(1, &response);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(!batchOp.isFinished());
targetedOwned.clear();
status = batchOp.targetBatch(&opCtx, targeter, false, &targeted);
ASSERT(status.isOK());
ASSERT_EQUALS(targeted.size(), 1u);
- ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u);
+ ASSERT_LESS_THAN(targeted.begin()->second->getWrites().size(), 1000u);
childRequest.clear();
- batchOp.buildBatchRequest(*targeted.front(), &childRequest);
+ batchOp.buildBatchRequest(*targeted.begin()->second, &childRequest);
ASSERT_LESS_THAN(childRequest.toBSON().objsize(), BSONObjMaxInternalSize);
- batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL);
ASSERT(batchOp.isFinished());
}