diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-06 18:17:34 -0500 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-13 15:09:46 -0400 |
commit | 965dc76f4b4e27f7a9e3bc7810b608c53085d32f (patch) | |
tree | e5554cdfb59e4df76adc55d4851b2aaa54dabc12 /src/mongo/s | |
parent | 90bd4ed6ba5d0f3353d1af42c667cd6a2c1a540e (diff) | |
download | mongo-965dc76f4b4e27f7a9e3bc7810b608c53085d32f.tar.gz |
SERVER-28164 make ClusterWrite::run path use ARS instead of DBClientMultiCommand
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 283 | ||||
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 201 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_get_last_error_cmd.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 17 | ||||
-rw-r--r-- | src/mongo/s/sharding_test_fixture.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/sharding_test_fixture.h | 11 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.cpp | 185 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.h | 6 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec_test.cpp | 285 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.h | 2 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op_test.cpp | 424 |
13 files changed, 734 insertions, 706 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 1ec65430f75..aa11a3c1216 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -55,12 +55,8 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, executor::TaskExecutor* executor, StringData db, const std::vector<AsyncRequestsSender::Request>& requests, - const ReadPreferenceSetting& readPreference, - bool allowPartialResults) - : _executor(executor), - _db(std::move(db)), - _readPreference(readPreference), - _allowPartialResults(allowPartialResults) { + const ReadPreferenceSetting& readPreference) + : _opCtx(opCtx), _executor(executor), _db(std::move(db)), _readPreference(readPreference) { for (const auto& request : requests) { _remotes.emplace_back(request.shardId, request.cmdObj); } @@ -73,44 +69,35 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, _metadataObj = metadataBuilder.obj(); // Schedule the requests immediately. - _scheduleRequestsIfNeeded(opCtx); + // We must create the notification before scheduling any requests, because the notification is + // signaled both on an error in scheduling the request and a request's callback. Similarly, we + // lock so that no callbacks signal the notification until after we are done scheduling + // requests, to prevent signaling the notification twice. + _notification.emplace(); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _scheduleRequests_inlock(); } - AsyncRequestsSender::~AsyncRequestsSender() { - invariant(_done()); -} - -std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses( - OperationContext* opCtx) { - invariant(!_remotes.empty()); - - // Until all remotes have received a response or error, keep scheduling retries and waiting on - // outstanding requests. - while (!_done()) { - _notification->get(); + // Make sure any pending network I/O has been canceled. + kill(); - // Note: if we have been interrupt()'d or if some remote had a non-retriable error and - // allowPartialResults is false, no retries will be scheduled. - _scheduleRequestsIfNeeded(opCtx); - } - - // Construct the responses. - std::vector<Response> responses; - for (const auto& remote : _remotes) { - invariant(remote.swResponse); - if (remote.swResponse->isOK()) { - invariant(remote.shardHostAndPort); - responses.emplace_back(std::move(remote.swResponse->getValue()), - std::move(*remote.shardHostAndPort)); - } else { - responses.emplace_back(std::move(remote.swResponse->getStatus()), - std::move(remote.shardHostAndPort)); - } + // Wait on remaining callbacks to run. + while (!done()) { + next(); } +} - _remotes.clear(); +AsyncRequestsSender::Response AsyncRequestsSender::next() { + invariant(!done()); - return responses; + // If needed, schedule requests for all remotes which had retriable errors. + // If some remote had success or a non-retriable error, return it. + boost::optional<Response> readyResponse; + while (!(readyResponse = _ready())) { + // Otherwise, wait for some response to be received. + _notification->get(_opCtx); + } + return *readyResponse; } void AsyncRequestsSender::interrupt() { @@ -120,78 +107,112 @@ void AsyncRequestsSender::interrupt() { void AsyncRequestsSender::kill() { stdx::lock_guard<stdx::mutex> lk(_mutex); - _stopRetrying = true; + if (_killed) { + return; + } + _stopRetrying = true; // Cancel all outstanding requests so they return immediately. for (auto& remote : _remotes) { if (remote.cbHandle.isValid()) { _executor->cancel(remote.cbHandle); } } + _killed = true; } -bool AsyncRequestsSender::_done() { +bool AsyncRequestsSender::done() { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _done_inlock(); + return std::all_of( + _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; }); } -bool AsyncRequestsSender::_done_inlock() { - for (const auto& remote : _remotes) { - if (!remote.swResponse) { - return false; - } - } - return true; -} - -/* - * Note: If _scheduleRequestsIfNeeded() does retries, only the remotes with retriable errors will be - * rescheduled because: - * - * 1. Other pending remotes still have callback assigned to them. - * 2. Remotes that already successfully received a response will have a non-empty 'response'. - * 3. Remotes that have reached maximum retries will have an error status. - */ -void AsyncRequestsSender::_scheduleRequestsIfNeeded(OperationContext* opCtx) { +boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() { stdx::lock_guard<stdx::mutex> lk(_mutex); - // We can't make a new notification if there was a previous one that has not been signaled. - invariant(!_notification || *_notification); + _notification.emplace(); - if (_done_inlock()) { - return; + if (!_stopRetrying) { + _scheduleRequests_inlock(); } - _notification.emplace(); - - if (_stopRetrying) { - return; + // Check if any remote is ready. + invariant(!_remotes.empty()); + for (auto& remote : _remotes) { + if (remote.swResponse && !remote.done) { + remote.done = true; + if (remote.swResponse->isOK()) { + invariant(remote.shardHostAndPort); + return Response(std::move(remote.shardId), + std::move(remote.swResponse->getValue()), + std::move(*remote.shardHostAndPort)); + } else { + return Response(std::move(remote.shardId), + std::move(remote.swResponse->getStatus()), + std::move(remote.shardHostAndPort)); + } + } } + // No remotes were ready. + return boost::none; +} +void AsyncRequestsSender::_scheduleRequests_inlock() { + invariant(!_stopRetrying); // Schedule remote work on hosts for which we have not sent a request or need to retry. for (size_t i = 0; i < _remotes.size(); ++i) { auto& remote = _remotes[i]; - // If we have not yet received a response or error for this remote, and we do not have an - // outstanding request for this remote, schedule remote work to send the command. + // First check if the remote had a retriable error, and if so, clear its response field so + // it will be retried. + if (remote.swResponse && !remote.done) { + // We check both the response status and command status for a retriable error. + Status status = remote.swResponse->getStatus(); + if (status.isOK()) { + status = getStatusFromCommandResult(remote.swResponse->getValue().data); + } + + if (!status.isOK()) { + // There was an error with either the response or the command. + auto shard = remote.getShard(); + if (!shard) { + remote.swResponse = + Status(ErrorCodes::ShardNotFound, + str::stream() << "Could not find shard " << remote.shardId); + } else { + if (remote.shardHostAndPort) { + shard->updateReplSetMonitor(*remote.shardHostAndPort, status); + } + if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) && + remote.retryCount < kMaxNumFailedHostRetryAttempts) { + LOG(1) << "Command to remote " << remote.shardId << " at host " + << *remote.shardHostAndPort + << " failed with retriable error and will be retried " + << causedBy(redact(status)); + ++remote.retryCount; + remote.swResponse.reset(); + } + } + } + } + + // If the remote does not have a response or pending request, schedule remote work for it. if (!remote.swResponse && !remote.cbHandle.isValid()) { - auto scheduleStatus = _scheduleRequest_inlock(opCtx, i); + auto scheduleStatus = _scheduleRequest_inlock(i); if (!scheduleStatus.isOK()) { - // Being unable to schedule a request to a remote is a non-retriable error. remote.swResponse = std::move(scheduleStatus); - - // If partial results are not allowed, stop scheduling requests on other remotes and - // just wait for outstanding requests to come back. - if (!_allowPartialResults) { - _stopRetrying = true; - break; + // Signal the notification indicating the remote had an error (we need to do this + // because no request was scheduled, so no callback for this remote will run and + // signal the notification). + if (!*_notification) { + _notification->set(); } } } } } -Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex) { +Status AsyncRequestsSender::_scheduleRequest_inlock(size_t remoteIndex) { auto& remote = _remotes[remoteIndex]; invariant(!remote.cbHandle.isValid()); @@ -203,15 +224,12 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, siz } executor::RemoteCommandRequest request( - remote.getTargetHost(), _db.toString(), remote.cmdObj, _metadataObj, opCtx); - - auto callbackStatus = - _executor->scheduleRemoteCommand(request, - stdx::bind(&AsyncRequestsSender::_handleResponse, - this, - stdx::placeholders::_1, - opCtx, - remoteIndex)); + *remote.shardHostAndPort, _db.toString(), remote.cmdObj, _metadataObj, _opCtx); + + auto callbackStatus = _executor->scheduleRemoteCommand( + request, + stdx::bind( + &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, remoteIndex)); if (!callbackStatus.isOK()) { return callbackStatus.getStatus(); } @@ -221,9 +239,7 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, siz } void AsyncRequestsSender::_handleResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, - OperationContext* opCtx, - size_t remoteIndex) { + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) { stdx::lock_guard<stdx::mutex> lk(_mutex); auto& remote = _remotes[remoteIndex]; @@ -233,94 +249,37 @@ void AsyncRequestsSender::_handleResponse( // 'remote'. remote.cbHandle = executor::TaskExecutor::CallbackHandle(); - // On early return from this point on, signal anyone waiting on the current notification if - // _done() is true, since this might be the last outstanding request. - ScopeGuard signaller = - MakeGuard(&AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock, this); - - // We check both the response status and command status for a retriable error. - Status status = cbData.response.status; - if (status.isOK()) { - status = getStatusFromCommandResult(cbData.response.data); - if (status.isOK()) { - remote.swResponse = std::move(cbData.response); - return; - } - } - - // There was an error with either the response or the command. - - auto shard = remote.getShard(); - if (!shard) { - remote.swResponse = - Status(ErrorCodes::ShardNotFound, - str::stream() << "Could not find shard " << remote.shardId << " containing host " - << remote.getTargetHost().toString()); - return; - } - shard->updateReplSetMonitor(remote.getTargetHost(), status); - - if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) && !_stopRetrying && - remote.retryCount < kMaxNumFailedHostRetryAttempts) { - LOG(1) << "Command to remote " << remote.shardId << " at host " << *remote.shardHostAndPort - << " failed with retriable error and will be retried" << causedBy(redact(status)); - ++remote.retryCount; - - // Even if _done() is not true, signal the thread sleeping in waitForResponses() to make - // it schedule a retry for this remote without waiting for all outstanding requests to - // come back. - signaller.Dismiss(); - _signalCurrentNotification_inlock(); + // Store the response or error. + if (cbData.response.status.isOK()) { + remote.swResponse = std::move(cbData.response); } else { - // Non-retriable error, out of retries, or _stopRetrying is true. - - // Even though we examined the command status to check for retriable errors, we just return - // the response or response status here. It is up to the caller to parse the response as - // a command result. - if (cbData.response.status.isOK()) { - remote.swResponse = std::move(cbData.response); - } else { - remote.swResponse = std::move(cbData.response.status); - } - - // If the caller can't use partial results, there's no point continuing to retry on - // retriable errors for other remotes. - if (!_allowPartialResults) { - _stopRetrying = true; - } + remote.swResponse = std::move(cbData.response.status); } -} -void AsyncRequestsSender::_signalCurrentNotification_inlock() { - // Only signal the notification if it has not already been signalled. + // Signal the notification indicating that the remote received a response. if (!*_notification) { _notification->set(); } } -void AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock() { - if (_done_inlock()) { - _signalCurrentNotification_inlock(); - } -} - AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj) : shardId(shardId), cmdObj(cmdObj) {} -AsyncRequestsSender::Response::Response(executor::RemoteCommandResponse response, HostAndPort hp) - : swResponse(std::move(response)), shardHostAndPort(std::move(hp)) {} +AsyncRequestsSender::Response::Response(ShardId shardId, + executor::RemoteCommandResponse response, + HostAndPort hp) + : shardId(std::move(shardId)), + swResponse(std::move(response)), + shardHostAndPort(std::move(hp)) {} -AsyncRequestsSender::Response::Response(Status status, boost::optional<HostAndPort> hp) - : swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {} +AsyncRequestsSender::Response::Response(ShardId shardId, + Status status, + boost::optional<HostAndPort> hp) + : shardId(std::move(shardId)), swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {} AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj) : shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {} -const HostAndPort& AsyncRequestsSender::RemoteData::getTargetHost() const { - invariant(shardHostAndPort); - return *shardHostAndPort; -} - Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( const ReadPreferenceSetting& readPref) { const auto shard = getShard(); diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index 9f8664f9c2a..ae90e8eaded 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -48,20 +48,35 @@ namespace mongo { /** * The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel and - * automatically retrying on retriable errors according to a RetryPolicy. It can also allow for - * retrieving partial results by ignoring shards that return errors. - * - * Work on remote nodes is accomplished by scheduling remote work in a TaskExecutor's event loop. + * automatically retrying on retriable errors according to a RetryPolicy. Work on remote nodes is + * accomplished by scheduling remote work in a TaskExecutor's event loop. * * Typical usage is: * - * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting); // schedule the - * requests - * auto responses = ars.waitForResponses(opCtx); // wait for responses; retries on retriable erors + * // Add some requests + * std::vector<AsyncRequestSender::Request> requests; + * + * // Creating the ARS schedules the requests immediately + * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting); + * + * while (!ars.done()) { + * // Schedule a round of retries if needed and wait for next response or error + * auto response = ars.next(opCtx); * - * Additionally, you can interrupt() (if you want waitForResponses() to wait for responses for - * outstanding requests but stop scheduling retries) or kill() (if you want to cancel outstanding - * requests) the ARS from another thread. + * if (!response.swResponse.isOK()) { + * // If partial results are tolerable, process the error as needed and continue. + * continue; + * + * // If partial results are not tolerable but you need to retrieve responses for all + * // dispatched requests, use stopRetrying() and continue. + * ars.stopRetrying(); + * continue; + * + * // If partial results are not tolerable and you don't care about dispatched requests, + * // safe to destroy the ARS. It will automatically cancel pending I/O and wait for the + * // outstanding callbacks to complete on destruction. + * } + * } * * Does not throw exceptions. */ @@ -87,10 +102,13 @@ public: */ struct Response { // Constructor for a response that was successfully received. - Response(executor::RemoteCommandResponse response, HostAndPort hp); + Response(ShardId shardId, executor::RemoteCommandResponse response, HostAndPort hp); // Constructor that specifies the reason the response was not successfully received. - Response(Status status, boost::optional<HostAndPort> hp); + Response(ShardId shardId, Status status, boost::optional<HostAndPort> hp); + + // The shard to which the request was sent. + ShardId shardId; // The response or error from the remote. StatusWith<executor::RemoteCommandResponse> swResponse; @@ -101,33 +119,38 @@ public: }; /** - * Constructs a new AsyncRequestsSender. The TaskExecutor* must remain valid for the lifetime of - * the ARS. + * Constructs a new AsyncRequestsSender. The OperationContext* and TaskExecutor* must remain + * valid for the lifetime of the ARS. */ AsyncRequestsSender(OperationContext* opCtx, executor::TaskExecutor* executor, StringData db, const std::vector<AsyncRequestsSender::Request>& requests, - const ReadPreferenceSetting& readPreference, - bool allowPartialResults = false); + const ReadPreferenceSetting& readPreference); + /** + * Ensures pending network I/O for any outstanding requests has been canceled and waits for + * outstanding requests to complete. + */ ~AsyncRequestsSender(); /** - * Returns a vector containing the responses or errors for each remote in the same order as the - * input vector that was passed in the constructor. + * Returns true if responses for all requests have been returned via next(). + */ + bool done(); + + /** + * Returns the next available response or error. * - * If we were killed, returns immediately. - * If we were interrupted, returns when any outstanding requests have completed. - * Otherwise, returns when each remote has received a response or error. + * If neither kill() nor stopRetrying() have been called, schedules retries for any remotes that + * have had a retriable error and have not exhausted their retries. * - * Must only be called once. + * Invalid to call if done() is true. */ - std::vector<Response> waitForResponses(OperationContext* opCtx); + Response next(); /** - * Stops the ARS from retrying requests. Causes waitForResponses() to wait until any outstanding - * requests have received a response or error. + * Stops the ARS from retrying requests. * * Use this if you no longer care about getting success responses, but need to do cleanup based * on responses for requests that have already been dispatched. @@ -135,72 +158,15 @@ public: void interrupt(); /** - * Cancels all outstanding requests and makes waitForResponses() return immediately. + * Cancels all outstanding requests. * * Use this if you no longer care about getting success responses, and don't need to process - * responses for outstanding requests. + * responses for requests that have already been dispatched. */ void kill(); private: /** - * Returns true if each remote has received a response or error. (If kill() has been called, - * the error is the error assigned by the TaskExecutor when a callback is canceled). - */ - bool _done(); - - /** - * Executes the logic of _done(). - */ - bool _done_inlock(); - - /** - * Replaces _notification with a new notification. - * - * If _stopRetrying is false, for each remote that does not have a response or outstanding - * request, schedules work to send the command to the remote. - * - * Invalid to call if there is an existing Notification and it has not yet been signaled. - */ - void _scheduleRequestsIfNeeded(OperationContext* opCtx); - - /** - * Helper to schedule a command to a remote. - * - * The 'remoteIndex' gives the position of the remote node from which we are retrieving the - * batch in '_remotes'. - * - * Returns success if the command to retrieve the next batch was scheduled successfully. - */ - Status _scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex); - - /** - * The callback for a remote command. - * - * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore - * indicates which node the response came from and where the response should be buffered. - * - * On a retriable error, unless _stopRetrying is true, signals the notification so that the - * request can be immediately retried. - * - * On a non-retriable error, if allowPartialResults is false, sets _stopRetrying to true. - */ - void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, - OperationContext* opCtx, - size_t remoteIndex); - - /** - * If the existing notification has not yet been signaled, signals it and marks it as signaled. - */ - void _signalCurrentNotification_inlock(); - - /** - * Wrapper around signalCurrentNotification_inlock(); only signals the notification if _done() - * is true. - */ - void _signalCurrentNotificationIfDone_inlock(); - - /** * We instantiate one of these per remote host. */ struct RemoteData { @@ -210,11 +176,6 @@ private: RemoteData(ShardId shardId, BSONObj cmdObj); /** - * Returns the resolved host and port on which the remote command was or will be run. - */ - const HostAndPort& getTargetHost() const; - - /** * Given a read preference, selects a host on which the command should be run. */ Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref); @@ -225,7 +186,7 @@ private: std::shared_ptr<Shard> getShard(); // ShardId of the shard to which the command will be sent. - const ShardId shardId; + ShardId shardId; // The command object to send to the remote host. BSONObj cmdObj; @@ -243,18 +204,61 @@ private: // The callback handle to an outstanding request for this remote. executor::TaskExecutor::CallbackHandle cbHandle; + + // Whether this remote's result has been returned. + bool done = false; }; /** - * Used internally to determine if the ARS should attempt to retry any requests. Is set to true - * when: - * - interrupt() or kill() is called - * - allowPartialResults is false and some remote has a non-retriable error (or exhausts its - * retries for a retriable error). + * Replaces _notification with a new notification. + * + * If _stopRetrying is false, schedules retries for remotes that have had a retriable error. + * + * If any remote has successfully received a response, returns a Response for it. + * If any remote has an error response that can't be retried, returns a Response for it. + * Otherwise, returns boost::none. + */ + boost::optional<Response> _ready(); + + /** + * For each remote that had a response, checks if it had a retriable error, and clears its + * response if so. + * + * For each remote without a response or pending request, schedules the remote request. + * + * On failure to schedule a request, signals the notification. + */ + void _scheduleRequests_inlock(); + + /** + * Helper to schedule a command to a remote. + * + * The 'remoteIndex' gives the position of the remote node from which we are retrieving the + * batch in '_remotes'. + * + * Returns success if the command was scheduled successfully. */ + Status _scheduleRequest_inlock(size_t remoteIndex); + + /** + * The callback for a remote command. + * + * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore + * indicates which node the response came from and where the response should be buffered. + * + * Stores the response or error in the remote and signals the notification. + */ + void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, + size_t remoteIndex); + + // Used internally to determine if the ARS should attempt to retry any requests. Is set to true + // when stopRetrying() or kill() is called. bool _stopRetrying = false; // Not owned here. + OperationContext* _opCtx; + + // Not owned here. executor::TaskExecutor* _executor; // The metadata obj to pass along with the command remote. Used to indicate that the command is @@ -267,10 +271,6 @@ private: // The readPreference to use for all requests. ReadPreferenceSetting _readPreference; - // If set to true, allows for skipping over hosts that have non-retriable errors or exhaust - // their retries. - bool _allowPartialResults = false; - // Must be acquired before accessing any data members. // Must also be held when calling any of the '_inlock()' helper functions. stdx::mutex _mutex; @@ -281,6 +281,9 @@ private: // A notification that gets signaled when a remote has a retriable error or the last outstanding // response is received. boost::optional<Notification<void>> _notification; + + // Set to true when kill() is called so that it is only executed once. + bool _killed = false; }; } // namespace mongo diff --git a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp index 7a81082597f..733e318208b 100644 --- a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp @@ -111,7 +111,7 @@ Status enforceLegacyWriteConcern(OperationContext* opCtx, requests.emplace_back(swShard.getValue()->getId(), gleCmd); } - // Send the requests and wait to receive all the responses. + // Send the requests. const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet()); AsyncRequestsSender ars(opCtx, @@ -119,12 +119,14 @@ Status enforceLegacyWriteConcern(OperationContext* opCtx, dbName, requests, readPref); - auto responses = ars.waitForResponses(opCtx); - // Parse the responses. + // Receive the responses. vector<Status> failedStatuses; - for (const auto& response : responses) { + while (!ars.done()) { + // Block until a response is available. + auto response = ars.next(); + // Return immediately if we failed to contact a shard. if (!response.shardHostAndPort) { invariant(!response.swResponse.isOK()); diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp index 8b8a3f2e644..774f00a43c2 100644 --- a/src/mongo/s/commands/cluster_write.cpp +++ b/src/mongo/s/commands/cluster_write.cpp @@ -42,7 +42,6 @@ #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/chunk_manager_targeter.h" -#include "mongo/s/commands/dbclient_multi_command.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" @@ -309,8 +308,7 @@ void ClusterWriter::write(OperationContext* opCtx, return; } - DBClientMultiCommand dispatcher; - BatchWriteExec exec(&targeter, &dispatcher); + BatchWriteExec exec(&targeter); exec.executeBatch(opCtx, *request, response, &_stats); } diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 7986195c6f5..07ee899b5a5 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -270,7 +270,7 @@ private: requests.emplace_back(shardStatus.getValue()->getId(), command); } - // Send the requests and wait to receive all the responses. + // Send the requests. const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet()); AsyncRequestsSender ars(opCtx, @@ -278,12 +278,14 @@ private: dbName, requests, readPref); - auto responses = ars.waitForResponses(opCtx); - // Parse the responses. + // Receive the responses. Status dispatchStatus = Status::OK(); - for (const auto& response : responses) { + while (!ars.done()) { + // Block until a response is available. + auto response = ars.next(); + if (!response.swResponse.isOK()) { dispatchStatus = std::move(response.swResponse.getStatus()); break; @@ -295,12 +297,7 @@ private: invariant(response.shardHostAndPort); result.target = ConnectionString(std::move(*response.shardHostAndPort)); - auto shardStatus = shardRegistry->getShard(opCtx, result.target.toString()); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - result.shardTargetId = shardStatus.getValue()->getId(); - + result.shardTargetId = std::move(response.shardId); result.result = std::move(response.swResponse.getValue().data); results->push_back(result); diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 59d3762925f..f912196e595 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -116,7 +116,10 @@ void ShardingTestFixture::setUp() { auto netForPool = stdx::make_unique<executor::NetworkInterfaceMock>(); netForPool->setEgressMetadataHook(stdx::make_unique<ShardingEgressMetadataHookForMongos>()); + auto _mockNetworkForPool = netForPool.get(); auto execForPool = makeThreadPoolTestExecutor(std::move(netForPool)); + _networkTestEnvForPool = + stdx::make_unique<NetworkTestEnv>(execForPool.get(), _mockNetworkForPool); std::vector<std::unique_ptr<executor::TaskExecutor>> executorsForPool; executorsForPool.emplace_back(std::move(execForPool)); @@ -265,6 +268,10 @@ void ShardingTestFixture::onFindWithMetadataCommand( _networkTestEnv->onFindWithMetadataCommand(func); } +void ShardingTestFixture::onCommandForPoolExecutor(NetworkTestEnv::OnCommandFunction func) { + _networkTestEnvForPool->onCommand(func); +} + void ShardingTestFixture::setupShards(const std::vector<ShardType>& shards) { auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); }); diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h index 8bd7ad1844a..050da470678 100644 --- a/src/mongo/s/sharding_test_fixture.h +++ b/src/mongo/s/sharding_test_fixture.h @@ -116,6 +116,12 @@ protected: executor::NetworkTestEnv::OnFindCommandWithMetadataFunction func); /** + * Same as the onCommand* variants, but expects the request to be placed on the arbitrary + * executor of the Grid's executorPool. + */ + void onCommandForPoolExecutor(executor::NetworkTestEnv::OnCommandFunction func); + + /** * Setup the shard registry to contain the given shards until the next reload. */ void setupShards(const std::vector<ShardType>& shards); @@ -214,9 +220,14 @@ private: RemoteCommandTargeterFactoryMock* _targeterFactory; RemoteCommandTargeterMock* _configTargeter; + // For the Grid's fixed executor. executor::NetworkInterfaceMock* _mockNetwork; executor::TaskExecutor* _executor; std::unique_ptr<executor::NetworkTestEnv> _networkTestEnv; + + // For the Grid's arbitrary executor in its executorPool. + std::unique_ptr<executor::NetworkTestEnv> _networkTestEnvForPool; + DistLockManagerMock* _distLockManager = nullptr; ShardingCatalogClientImpl* _catalogClient = nullptr; }; diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 57f84edf800..99f71ddf85a 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -38,7 +38,8 @@ #include "mongo/bson/util/builder.h" #include "mongo/client/connection_string.h" #include "mongo/client/remote_command_targeter.h" -#include "mongo/s/client/multi_command_dispatch.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/async_requests_sender.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batch_write_op.h" @@ -50,9 +51,9 @@ namespace mongo { using std::make_pair; using std::stringstream; using std::vector; +using std::map; -BatchWriteExec::BatchWriteExec(NSTargeter* targeter, MultiCommandDispatch* dispatcher) - : _targeter(targeter), _dispatcher(dispatcher) {} +BatchWriteExec::BatchWriteExec(NSTargeter* targeter) : _targeter(targeter) {} namespace { @@ -62,7 +63,7 @@ namespace { // // TODO: Unordered map? -typedef OwnedPointerMap<ConnectionString, TargetedWriteBatch> OwnedHostBatchMap; +typedef OwnedPointerMap<ShardId, TargetedWriteBatch> OwnedShardBatchMap; } static void buildErrorFrom(const Status& status, WriteErrorDetail* error) { @@ -90,7 +91,6 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, BatchWriteExecStats* stats) { LOG(4) << "starting execution of write batch of size " << static_cast<int>(clientRequest.sizeWriteOps()) << " for " << clientRequest.getNS(); - BatchWriteOp batchOp; batchOp.initClientRequest(&clientRequest); @@ -125,8 +125,8 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, // exactly when the metadata changed. // - OwnedPointerVector<TargetedWriteBatch> childBatchesOwned; - vector<TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> childBatchesOwned; + map<ShardId, TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableMap(); // If we've already had a targeting error, we've refreshed the metadata once and can // record target errors definitively. @@ -149,85 +149,31 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, size_t numToSend = childBatches.size(); while (numSent != numToSend) { // Collect batches out on the network, mapped by endpoint - OwnedHostBatchMap ownedPendingBatches; - OwnedHostBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap(); + OwnedShardBatchMap ownedPendingBatches; + OwnedShardBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap(); // - // Send side + // Construct the requests. // - // Get as many batches as we can at once - for (vector<TargetedWriteBatch*>::iterator it = childBatches.begin(); - it != childBatches.end(); - ++it) { - // - // Collect the info needed to dispatch our targeted batch - // - - TargetedWriteBatch* nextBatch = *it; - // If the batch is NULL, we sent it previously, so skip - if (nextBatch == NULL) - continue; - - // Figure out what host we need to dispatch our targeted batch - const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet()); - auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard( - opCtx, nextBatch->getEndpoint().shardName); - - bool resolvedHost = false; - ConnectionString shardHost; - if (!shardStatus.isOK()) { - Status status(std::move(shardStatus.getStatus())); + vector<AsyncRequestsSender::Request> requests; - // Record a resolve failure - // TODO: It may be necessary to refresh the cache if stale, or maybe just - // cancel and retarget the batch - WriteErrorDetail error; - buildErrorFrom(status, &error); - LOG(4) << "unable to send write batch to " << nextBatch->getEndpoint().shardName - << causedBy(status); - batchOp.noteBatchError(*nextBatch, error); - } else { - auto shard = shardStatus.getValue(); - - auto swHostAndPort = shard->getTargeter()->findHostNoWait(readPref); - if (!swHostAndPort.isOK()) { - - // Record a resolve failure - // TODO: It may be necessary to refresh the cache if stale, or maybe just - // cancel and retarget the batch - WriteErrorDetail error; - buildErrorFrom(swHostAndPort.getStatus(), &error); - LOG(4) << "unable to send write batch to " - << nextBatch->getEndpoint().shardName - << causedBy(swHostAndPort.getStatus()); - batchOp.noteBatchError(*nextBatch, error); - } else { - shardHost = ConnectionString(std::move(swHostAndPort.getValue())); - resolvedHost = true; - } - } + // Get as many batches as we can at once + for (auto it = childBatches.begin(); it != childBatches.end(); ++it) { - if (!resolvedHost) { - ++stats->numResolveErrors; + TargetedWriteBatch* nextBatch = it->second; - // We're done with this batch - // Clean up when we can't resolve a host - delete *it; - *it = NULL; - --numToSend; + // If the batch is NULL, we sent it previously, so skip + if (nextBatch == NULL) continue; - } - // If we already have a batch for this host, wait until the next time - OwnedHostBatchMap::MapType::iterator pendingIt = pendingBatches.find(shardHost); + // If we already have a batch for this shard, wait until the next time + ShardId targetShardId = nextBatch->getEndpoint().shardName; + OwnedShardBatchMap::MapType::iterator pendingIt = + pendingBatches.find(targetShardId); if (pendingIt != pendingBatches.end()) continue; - // - // We now have all the info needed to dispatch the batch - // - BatchedCommandRequest request(clientRequest.getBatchType()); batchOp.buildBatchRequest(*nextBatch, &request); @@ -236,48 +182,90 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, NamespaceString nss(request.getNS()); request.setNS(nss); - LOG(4) << "sending write batch to " << shardHost.toString() << ": " + LOG(4) << "sending write batch to " << targetShardId << ": " << redact(request.toString()); - _dispatcher->addCommand(shardHost, nss.db(), request.toBSON()); + requests.emplace_back(targetShardId, request.toBSON()); // Indicate we're done by setting the batch to NULL // We'll only get duplicate hostEndpoints if we have broadcast and non-broadcast // endpoints for the same host, so this should be pretty efficient without // moving stuff around. - *it = NULL; + it->second = NULL; // Recv-side is responsible for cleaning up the nextBatch when used - pendingBatches.insert(make_pair(shardHost, nextBatch)); + pendingBatches.insert(make_pair(targetShardId, nextBatch)); } - // Send them all out - _dispatcher->sendAll(); + // + // Send the requests. + // + + const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet()); + AsyncRequestsSender ars(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + clientRequest.getTargetingNSS().db(), + requests, + readPref); numSent += pendingBatches.size(); // - // Recv side + // Receive the responses. // - while (_dispatcher->numPending() > 0) { - // Get the response - ConnectionString shardHost; - BatchedCommandResponse response; - Status dispatchStatus = _dispatcher->recvAny(&shardHost, &response); + while (!ars.done()) { + // Block until a response is available. + auto response = ars.next(); // Get the TargetedWriteBatch to find where to put the response - dassert(pendingBatches.find(shardHost) != pendingBatches.end()); - TargetedWriteBatch* batch = pendingBatches.find(shardHost)->second; + dassert(pendingBatches.find(response.shardId) != pendingBatches.end()); + TargetedWriteBatch* batch = pendingBatches.find(response.shardId)->second; + + // First check if we were able to target a shard host. + if (!response.shardHostAndPort) { + invariant(!response.swResponse.isOK()); + + // Record a resolve failure + // TODO: It may be necessary to refresh the cache if stale, or maybe just + // cancel and retarget the batch + LOG(4) << "unable to send write batch to " << batch->getEndpoint().shardName + << causedBy(response.swResponse.getStatus()); + WriteErrorDetail error; + buildErrorFrom(std::move(response.swResponse.getStatus()), &error); + batchOp.noteBatchError(*batch, error); + + // We're done with this batch + // Clean up when we can't resolve a host + auto it = childBatches.find(batch->getEndpoint().shardName); + invariant(it != childBatches.end()); + delete it->second; + it->second = NULL; + continue; + } + ConnectionString shardHost(std::move(*response.shardHostAndPort)); + + + // Then check if we successfully got a response. + Status status = response.swResponse.getStatus(); + BatchedCommandResponse batchedCommandResponse; + if (status.isOK()) { + std::string errMsg; + if (!batchedCommandResponse.parseBSON(response.swResponse.getValue().data, + &errMsg) || + !batchedCommandResponse.isValid(&errMsg)) { + status = {ErrorCodes::FailedToParse, errMsg}; + } + } - if (dispatchStatus.isOK()) { + if (status.isOK()) { TrackedErrors trackedErrors; trackedErrors.startTracking(ErrorCodes::StaleShardVersion); LOG(4) << "write results received from " << shardHost.toString() << ": " - << redact(response.toString()); + << redact(batchedCommandResponse.toString()); // Dispatch was ok, note response - batchOp.noteBatchResponse(*batch, response, &trackedErrors); + batchOp.noteBatchResponse(*batch, batchedCommandResponse, &trackedErrors); // Note if anything was stale const vector<ShardError*>& staleErrors = @@ -291,22 +279,25 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, // Remember that we successfully wrote to this shard // NOTE: This will record lastOps for shards where we actually didn't update // or delete any documents, which preserves old behavior but is conservative - stats->noteWriteAt( - shardHost, - response.isLastOpSet() ? response.getLastOp() : repl::OpTime(), - response.isElectionIdSet() ? response.getElectionId() : OID()); + stats->noteWriteAt(shardHost, + batchedCommandResponse.isLastOpSet() + ? batchedCommandResponse.getLastOp() + : repl::OpTime(), + batchedCommandResponse.isElectionIdSet() + ? batchedCommandResponse.getElectionId() + : OID()); } else { // Error occurred dispatching, note it stringstream msg; msg << "write results unavailable from " << shardHost.toString() - << causedBy(dispatchStatus.toString()); + << causedBy(status.toString()); WriteErrorDetail error; buildErrorFrom(Status(ErrorCodes::RemoteResultsUnavailable, msg.str()), &error); LOG(4) << "unable to receive write results from " << shardHost.toString() - << causedBy(redact(dispatchStatus.toString())); + << causedBy(redact(status.toString())); batchOp.noteBatchError(*batch, error); } diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h index b430e3c5baf..51fd8633207 100644 --- a/src/mongo/s/write_ops/batch_write_exec.h +++ b/src/mongo/s/write_ops/batch_write_exec.h @@ -42,7 +42,6 @@ namespace mongo { class BatchWriteExecStats; -class MultiCommandDispatch; class OperationContext; /** @@ -64,7 +63,7 @@ class BatchWriteExec { MONGO_DISALLOW_COPYING(BatchWriteExec); public: - BatchWriteExec(NSTargeter* targeter, MultiCommandDispatch* dispatcher); + BatchWriteExec(NSTargeter* targeter); /** * Executes a client batch write request by sending child batches to several shard @@ -80,9 +79,6 @@ public: private: // Not owned here NSTargeter* _targeter; - - // Not owned here - MultiCommandDispatch* _dispatcher; }; struct HostOpTime { diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 30833b726eb..73cdd24a006 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -30,11 +30,9 @@ #include "mongo/s/write_ops/batch_write_exec.h" -#include "mongo/base/owned_pointer_vector.h" #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/s/catalog/type_shard.h" -#include "mongo/s/client/mock_multi_write_command.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/sharding_test_fixture.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -54,6 +52,7 @@ namespace { const HostAndPort kTestShardHost = HostAndPort("FakeHost", 12345); const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); const string shardName = "FakeShard"; +const int kMaxRoundsWithoutProgress = 5; /** * Mimics a single shard backend for a particular collection which can be initialized with a @@ -94,18 +93,81 @@ public: nsTargeter.init(mockRanges); // Make the batch write executor use the mock backend. - exec.reset(new BatchWriteExec(&nsTargeter, &dispatcher)); + exec.reset(new BatchWriteExec(&nsTargeter)); } - void setMockResults(const vector<MockWriteResult*>& results) { - dispatcher.init(results); + void expectInsertsReturnSuccess(const std::vector<BSONObj>& expected) { + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQUALS(nss.db(), request.dbname); + + BatchedInsertRequest actualBatchedInsert; + std::string errmsg; + ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); + + ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().toString()); + + auto inserted = actualBatchedInsert.getDocuments(); + ASSERT_EQUALS(expected.size(), inserted.size()); + + auto itInserted = inserted.begin(); + auto itExpected = expected.begin(); + + for (; itInserted != inserted.end(); itInserted++, itExpected++) { + ASSERT_BSONOBJ_EQ(*itExpected, *itInserted); + } + + BatchedCommandResponse response; + response.setOk(true); + + return response.toBSON(); + }); + } + + void expectInsertsReturnStaleVersionErrors(const std::vector<BSONObj>& expected) { + WriteErrorDetail error; + error.setErrCode(ErrorCodes::StaleShardVersion); + error.setErrMessage("mock stale error"); + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQUALS(nss.db(), request.dbname); + + BatchedInsertRequest actualBatchedInsert; + std::string errmsg; + ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); + + ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().toString()); + + auto inserted = actualBatchedInsert.getDocuments(); + ASSERT_EQUALS(expected.size(), inserted.size()); + + auto itInserted = inserted.begin(); + auto itExpected = expected.begin(); + + for (; itInserted != inserted.end(); itInserted++, itExpected++) { + ASSERT_BSONOBJ_EQ(*itExpected, *itInserted); + } + + BatchedCommandResponse staleResponse; + staleResponse.setOk(true); + staleResponse.setN(0); + + // Report a stale version error for each write in the batch. + int i = 0; + for (itInserted = inserted.begin(); itInserted != inserted.end(); ++itInserted) { + WriteErrorDetail* errorCopy = new WriteErrorDetail; + error.cloneTo(errorCopy); + errorCopy->setIndex(i); + staleResponse.addToErrDetails(errorCopy); + ++i; + } + + return staleResponse.toBSON(); + }); } ConnectionString shardHost{kTestShardHost}; NamespaceString nss{"foo.bar"}; MockNSTargeter nsTargeter; - MockMultiWriteCommand dispatcher; unique_ptr<BatchWriteExec> exec; }; @@ -124,14 +186,21 @@ TEST_F(BatchWriteExecTest, SingleOp) { request.setOrdered(false); request.setWriteConcern(BSONObj()); // Do single-target, single doc batch write op - request.getInsertRequest()->addToDocuments(BSON("x" << 1)); + auto objToInsert = BSON("x" << 1); + request.getInsertRequest()->addToDocuments(objToInsert); - BatchedCommandResponse response; - BatchWriteExecStats stats; - exec->executeBatch(operationContext(), request, &response, &stats); - ASSERT(response.getOk()); + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + exec->executeBatch(operationContext(), request, &response, &stats); + ASSERT(response.getOk()); + ASSERT_EQUALS(stats.numRounds, 1); + }); - ASSERT_EQUALS(stats.numRounds, 1); + std::vector<BSONObj> expected{objToInsert}; + expectInsertsReturnSuccess(expected); + + future.timed_get(kFutureTimeout); } TEST_F(BatchWriteExecTest, SingleOpError) { @@ -139,33 +208,57 @@ TEST_F(BatchWriteExecTest, SingleOpError) { // Basic error test // - vector<MockWriteResult*> mockResults; BatchedCommandResponse errResponse; errResponse.setOk(false); errResponse.setErrCode(ErrorCodes::UnknownError); errResponse.setErrMessage("mock error"); - mockResults.push_back(new MockWriteResult(shardHost, errResponse)); - - setMockResults(mockResults); BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert); request.setNS(nss); request.setOrdered(false); request.setWriteConcern(BSONObj()); // Do single-target, single doc batch write op - request.getInsertRequest()->addToDocuments(BSON("x" << 1)); - - BatchedCommandResponse response; - BatchWriteExecStats stats; - exec->executeBatch(operationContext(), request, &response, &stats); - ASSERT(response.getOk()); - ASSERT_EQUALS(response.getN(), 0); - ASSERT(response.isErrDetailsSet()); - ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), errResponse.getErrCode()); - ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(errResponse.getErrMessage()) != - string::npos); - - ASSERT_EQUALS(stats.numRounds, 1); + auto objToInsert = BSON("x" << 1); + request.getInsertRequest()->addToDocuments(objToInsert); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + exec->executeBatch(operationContext(), request, &response, &stats); + ASSERT(response.getOk()); + ASSERT_EQUALS(response.getN(), 0); + ASSERT(response.isErrDetailsSet()); + ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), errResponse.getErrCode()); + ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(errResponse.getErrMessage()) != + string::npos); + + ASSERT_EQUALS(stats.numRounds, 1); + }); + + std::vector<BSONObj> expected{objToInsert}; + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQUALS(nss.db(), request.dbname); + + BatchedInsertRequest actualBatchedInsert; + std::string errmsg; + ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); + + ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().toString()); + + auto inserted = actualBatchedInsert.getDocuments(); + ASSERT_EQUALS(expected.size(), inserted.size()); + + auto itInserted = inserted.begin(); + auto itExpected = expected.begin(); + + for (; itInserted != inserted.end(); itInserted++, itExpected++) { + ASSERT_BSONOBJ_EQ(*itExpected, *itInserted); + } + + return errResponse.toBSON(); + }); + + future.timed_get(kFutureTimeout); } // @@ -183,23 +276,24 @@ TEST_F(BatchWriteExecTest, StaleOp) { request.setOrdered(false); request.setWriteConcern(BSONObj()); // Do single-target, single doc batch write op - request.getInsertRequest()->addToDocuments(BSON("x" << 1)); + auto objToInsert = BSON("x" << 1); + request.getInsertRequest()->addToDocuments(objToInsert); - vector<MockWriteResult*> mockResults; - WriteErrorDetail error; - error.setErrCode(ErrorCodes::StaleShardVersion); - error.setErrMessage("mock stale error"); - mockResults.push_back(new MockWriteResult(shardHost, error)); + // Execute request + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + exec->executeBatch(operationContext(), request, &response, &stats); + ASSERT(response.getOk()); - setMockResults(mockResults); + ASSERT_EQUALS(stats.numStaleBatches, 1); + }); - // Execute request - BatchedCommandResponse response; - BatchWriteExecStats stats; - exec->executeBatch(operationContext(), request, &response, &stats); - ASSERT(response.getOk()); + std::vector<BSONObj> expected{objToInsert}; + expectInsertsReturnStaleVersionErrors(expected); + expectInsertsReturnSuccess(expected); - ASSERT_EQUALS(stats.numStaleBatches, 1); + future.timed_get(kFutureTimeout); } TEST_F(BatchWriteExecTest, MultiStaleOp) { @@ -213,25 +307,28 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) { request.setOrdered(false); request.setWriteConcern(BSONObj()); // Do single-target, single doc batch write op - request.getInsertRequest()->addToDocuments(BSON("x" << 1)); + auto objToInsert = BSON("x" << 1); + request.getInsertRequest()->addToDocuments(objToInsert); - vector<MockWriteResult*> mockResults; - WriteErrorDetail error; - error.setErrCode(ErrorCodes::StaleShardVersion); - error.setErrMessage("mock stale error"); + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + exec->executeBatch(operationContext(), request, &response, &stats); + ASSERT(response.getOk()); + + ASSERT_EQUALS(stats.numStaleBatches, 3); + }); + + std::vector<BSONObj> expected{objToInsert}; + + // Return multiple StaleShardVersion errors for (int i = 0; i < 3; i++) { - mockResults.push_back(new MockWriteResult(shardHost, error)); + expectInsertsReturnStaleVersionErrors(expected); } - setMockResults(mockResults); + expectInsertsReturnSuccess(expected); - // Execute request - BatchedCommandResponse response; - BatchWriteExecStats stats; - exec->executeBatch(operationContext(), request, &response, &stats); - ASSERT(response.getOk()); - - ASSERT_EQUALS(stats.numStaleBatches, 3); + future.timed_get(kFutureTimeout); } TEST_F(BatchWriteExecTest, TooManyStaleOp) { @@ -247,60 +344,32 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) { request.setOrdered(false); request.setWriteConcern(BSONObj()); // Do single-target, single doc batch write ops - request.getInsertRequest()->addToDocuments(BSON("x" << 1)); - request.getInsertRequest()->addToDocuments(BSON("x" << 2)); - - vector<MockWriteResult*> mockResults; - WriteErrorDetail error; - error.setErrCode(ErrorCodes::StaleShardVersion); - error.setErrMessage("mock stale error"); - for (int i = 0; i < 10; i++) { - mockResults.push_back(new MockWriteResult(shardHost, error, request.sizeWriteOps())); + auto objToInsert1 = BSON("x" << 1); + auto objToInsert2 = BSON("x" << 2); + request.getInsertRequest()->addToDocuments(objToInsert1); + request.getInsertRequest()->addToDocuments(objToInsert2); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + exec->executeBatch(operationContext(), request, &response, &stats); + ASSERT(response.getOk()); + ASSERT_EQUALS(response.getN(), 0); + ASSERT(response.isErrDetailsSet()); + ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), ErrorCodes::NoProgressMade); + ASSERT_EQUALS(response.getErrDetailsAt(1)->getErrCode(), ErrorCodes::NoProgressMade); + + ASSERT_EQUALS(stats.numStaleBatches, (1 + kMaxRoundsWithoutProgress)); + }); + + std::vector<BSONObj> expected{objToInsert1, objToInsert2}; + + // Return multiple StaleShardVersion errors + for (int i = 0; i < (1 + kMaxRoundsWithoutProgress); i++) { + expectInsertsReturnStaleVersionErrors(expected); } - setMockResults(mockResults); - - // Execute request - BatchedCommandResponse response; - BatchWriteExecStats stats; - exec->executeBatch(operationContext(), request, &response, &stats); - ASSERT(response.getOk()); - ASSERT_EQUALS(response.getN(), 0); - ASSERT(response.isErrDetailsSet()); - ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), ErrorCodes::NoProgressMade); - ASSERT_EQUALS(response.getErrDetailsAt(1)->getErrCode(), ErrorCodes::NoProgressMade); -} - -TEST_F(BatchWriteExecTest, ManyStaleOpWithMigration) { - // - // Retry op in exec many times b/c of stale config, but simulate remote migrations occurring - // - - // Insert request - BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert); - request.setNS(nss); - request.setOrdered(false); - request.setWriteConcern(BSONObj()); - // Do single-target, single doc batch write op - request.getInsertRequest()->addToDocuments(BSON("x" << 1)); - - vector<MockWriteResult*> mockResults; - WriteErrorDetail error; - error.setErrCode(ErrorCodes::StaleShardVersion); - error.setErrMessage("mock stale error"); - for (int i = 0; i < 10; i++) { - mockResults.push_back(new MockWriteResult(shardHost, error)); - } - - setMockResults(mockResults); - - // Execute request - BatchedCommandResponse response; - BatchWriteExecStats stats; - exec->executeBatch(operationContext(), request, &response, &stats); - ASSERT(response.getOk()); - - ASSERT_EQUALS(stats.numStaleBatches, 6); + future.timed_get(kFutureTimeout); } } // namespace diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index c1e411cb2ed..be0b6a05286 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -238,7 +238,7 @@ static void cancelBatches(const WriteErrorDetail& why, Status BatchWriteOp::targetBatch(OperationContext* opCtx, const NSTargeter& targeter, bool recordTargetErrors, - vector<TargetedWriteBatch*>* targetedBatches) { + std::map<ShardId, TargetedWriteBatch*>* targetedBatches) { // // Targeting of unordered batches is fairly simple - each remaining write op is targeted, // and each of those targeted writes are grouped into a batch for a particular shard @@ -402,7 +402,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, // Remember targeted batch for reporting _targeted.insert(batch); // Send the handle back to caller - targetedBatches->push_back(batch); + invariant(targetedBatches->find(batch->getEndpoint().shardName) == targetedBatches->end()); + targetedBatches->insert(std::make_pair(batch->getEndpoint().shardName, batch)); } return Status::OK(); diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 3bfbdf39954..5e37c87598c 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -110,7 +110,7 @@ public: Status targetBatch(OperationContext* opCtx, const NSTargeter& targeter, bool recordTargetErrors, - std::vector<TargetedWriteBatch*>* targetedBatches); + std::map<ShardId, TargetedWriteBatch*>* targetedBatches); /** * Fills a BatchCommandRequest from a TargetedWriteBatch for this BatchWriteOp. diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index e5bc9b5a5cd..b34c3a4a20b 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/base/owned_pointer_vector.h" +#include "mongo/base/owned_pointer_map.h" #include "mongo/db/operation_context_noop.h" #include "mongo/s/write_ops/batch_write_op.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -39,8 +39,9 @@ namespace mongo { -using std::unique_ptr; +using std::map; using std::string; +using std::unique_ptr; using std::vector; namespace { @@ -150,19 +151,19 @@ TEST(WriteOpTests, SingleOp) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint); BatchedCommandResponse response; buildResponse(1, &response); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -190,19 +191,19 @@ TEST(WriteOpTests, SingleError) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint); BatchedCommandResponse response; buildErrResponse(ErrorCodes::UnknownError, "message", &response); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -236,8 +237,8 @@ TEST(WriteOpTests, SingleTargetError) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(!status.isOK()); @@ -279,17 +280,17 @@ TEST(WriteOpTests, SingleWriteConcernErrorOrdered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint); BatchedCommandRequest targetBatch(BatchedCommandRequest::BatchType_Insert); - batchOp.buildBatchRequest(*targeted.front(), &targetBatch); + batchOp.buildBatchRequest(*targeted.begin()->second, &targetBatch); ASSERT(targetBatch.getWriteConcern().woCompare(request.getWriteConcern()) == 0); BatchedCommandResponse response; @@ -297,7 +298,7 @@ TEST(WriteOpTests, SingleWriteConcernErrorOrdered) { addWCError(&response); // First stale response comes back, we should retry - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -327,8 +328,8 @@ TEST(WriteOpTests, SingleStaleError) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); BatchedCommandResponse response; @@ -336,14 +337,14 @@ TEST(WriteOpTests, SingleStaleError) { addError(ErrorCodes::StaleShardVersion, "mock stale error", 0, &response); // First stale response comes back, we should retry - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); // Respond again with a stale response - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); @@ -352,7 +353,7 @@ TEST(WriteOpTests, SingleStaleError) { buildResponse(1, &response); // Respond with an 'ok' response - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -366,16 +367,6 @@ TEST(WriteOpTests, SingleStaleError) { // Multi-operation batches // -struct EndpointComp { - bool operator()(const TargetedWriteBatch* writeA, const TargetedWriteBatch* writeB) const { - return writeA->getEndpoint().shardName.compare(writeB->getEndpoint().shardName) < 0; - } -}; - -inline void sortByEndpoint(vector<TargetedWriteBatch*>* writes) { - std::sort(writes->begin(), writes->end(), EndpointComp()); -} - TEST(WriteOpTests, MultiOpSameShardOrdered) { // // Multi-op targeting test (ordered) @@ -398,20 +389,20 @@ TEST(WriteOpTests, MultiOpSameShardOrdered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint); BatchedCommandResponse response; buildResponse(2, &response); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -442,20 +433,20 @@ TEST(WriteOpTests, MultiOpSameShardUnordered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpoint); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint); BatchedCommandResponse response; buildResponse(2, &response); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -488,21 +479,21 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpointA); BatchedCommandResponse response; buildResponse(1, &response); // Respond to first targeted batch - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); @@ -510,11 +501,11 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) { ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointB); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpointB); // Respond to second targeted batch - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -523,6 +514,22 @@ TEST(WriteOpTests, MultiOpTwoShardsOrdered) { ASSERT_EQUALS(clientResponse.getN(), 2); } +void verifyTargetedBatches(map<ShardId, size_t> expected, + const map<ShardId, TargetedWriteBatch*>& targeted) { + // 'expected' contains each ShardId that was expected to be targeted and the size of the batch + // that was expected to be targeted to it. + // We check that each ShardId in 'targeted' corresponds to one in 'expected', in that it + // contains a batch of the correct size. + // Finally, we ensure that no additional ShardIds are present in 'targeted' than 'expected'. + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT_EQUALS(expected[it->second->getEndpoint().shardName], + it->second->getWrites().size()); + ASSERT_EQUALS(ChunkVersion::IGNORED(), it->second->getEndpoint().shardVersion); + expected.erase(expected.find(it->second->getEndpoint().shardName)); + } + ASSERT(expected.empty()); +} + TEST(WriteOpTests, MultiOpTwoShardsUnordered) { // // Multi-op, multi-endpoint targeting test (unordered) @@ -547,26 +554,23 @@ TEST(WriteOpTests, MultiOpTwoShardsUnordered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); + verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted); BatchedCommandResponse response; buildResponse(1, &response); // Respond to both targeted batches - batchOp.noteBatchResponse(*targeted.front(), response, NULL); - ASSERT(!batchOp.isFinished()); - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT(!batchOp.isFinished()); + batchOp.noteBatchResponse(*it->second, response, NULL); + } ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -601,26 +605,23 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); + verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted); BatchedCommandResponse response; buildResponse(1, &response); // Respond to both targeted batches for first multi-delete - batchOp.noteBatchResponse(*targeted.front(), response, NULL); - ASSERT(!batchOp.isFinished()); - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT(!batchOp.isFinished()); + batchOp.noteBatchResponse(*it->second, response, NULL); + } ASSERT(!batchOp.isFinished()); targetedOwned.clear(); @@ -628,16 +629,13 @@ TEST(WriteOpTests, MultiOpTwoShardsEachOrdered) { ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); + verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted); // Respond to second targeted batches for second multi-delete - batchOp.noteBatchResponse(*targeted.front(), response, NULL); - ASSERT(!batchOp.isFinished()); - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT(!batchOp.isFinished()); + batchOp.noteBatchResponse(*it->second, response, NULL); + } ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -672,26 +670,23 @@ TEST(WriteOpTests, MultiOpTwoShardsEachUnordered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 2u); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); + verifyTargetedBatches({{endpointA.shardName, 2u}, {endpointB.shardName, 2u}}, targeted); BatchedCommandResponse response; buildResponse(2, &response); // Respond to both targeted batches, each containing two ops - batchOp.noteBatchResponse(*targeted.front(), response, NULL); - ASSERT(!batchOp.isFinished()); - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT(!batchOp.isFinished()); + batchOp.noteBatchResponse(*it->second, response, NULL); + } ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -734,22 +729,22 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpointA); BatchedCommandResponse response; // Emulate one-write-per-delete-per-host buildResponse(2, &response); // Respond to first targeted batch containing the two single-host deletes - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); @@ -758,19 +753,16 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); + verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted); // Emulate one-write-per-delete-per-host buildResponse(1, &response); // Respond to two targeted batches for first multi-delete - batchOp.noteBatchResponse(*targeted.front(), response, NULL); - ASSERT(!batchOp.isFinished()); - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT(!batchOp.isFinished()); + batchOp.noteBatchResponse(*it->second, response, NULL); + } ASSERT(!batchOp.isFinished()); targetedOwned.clear(); @@ -779,16 +771,13 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); + verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted); // Respond to two targeted batches for second multi-delete - batchOp.noteBatchResponse(*targeted.front(), response, NULL); - ASSERT(!batchOp.isFinished()); - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT(!batchOp.isFinished()); + batchOp.noteBatchResponse(*it->second, response, NULL); + } ASSERT(!batchOp.isFinished()); targetedOwned.clear(); @@ -797,14 +786,14 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsOrdered) { ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpointB); // Emulate one-write-per-delete-per-host buildResponse(2, &response); // Respond to final targeted batch containing the last two single-host deletes - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -846,27 +835,24 @@ TEST(WriteOpTests, MultiOpOneOrTwoShardsUnordered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 4u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 4u); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); + verifyTargetedBatches({{endpointA.shardName, 4u}, {endpointB.shardName, 4u}}, targeted); BatchedCommandResponse response; // Emulate one-write-per-delete-per-host buildResponse(4, &response); // Respond to first targeted batch containing the two single-host deletes - batchOp.noteBatchResponse(*targeted.front(), response, NULL); - ASSERT(!batchOp.isFinished()); - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT(!batchOp.isFinished()); + batchOp.noteBatchResponse(*it->second, response, NULL); + } ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -899,32 +885,33 @@ TEST(WriteOpTests, MultiOpSingleShardErrorUnordered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u); + verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted); BatchedCommandResponse response; buildResponse(1, &response); + // Respond to batches. + auto targetedIt = targeted.begin(); + // No error on first shard - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(!batchOp.isFinished()); buildResponse(0, &response); addError(ErrorCodes::UnknownError, "mock error", 0, &response); // Error on second write on second shard - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + ++targetedIt; + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(batchOp.isFinished()); + ASSERT(++targetedIt == targeted.end()); BatchedCommandResponse clientResponse; batchOp.buildClientResponse(&clientResponse); @@ -963,29 +950,24 @@ TEST(WriteOpTests, MultiOpTwoShardErrorsUnordered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u); + verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted); BatchedCommandResponse response; buildResponse(0, &response); addError(ErrorCodes::UnknownError, "mock error", 0, &response); - // Error on first write on first shard - batchOp.noteBatchResponse(*targeted.front(), response, NULL); - ASSERT(!batchOp.isFinished()); - - // Error on second write on second shard - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + // Error on first write on first shard and second write on second shard. + for (auto it = targeted.begin(); it != targeted.end(); ++it) { + ASSERT(!batchOp.isFinished()); + batchOp.noteBatchResponse(*it->second, response, NULL); + } ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -1032,32 +1014,33 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorUnordered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 2u); + verifyTargetedBatches({{endpointA.shardName, 2u}, {endpointB.shardName, 2u}}, targeted); + + // Respond to batches. + auto targetedIt = targeted.begin(); BatchedCommandResponse response; buildResponse(2, &response); // No errors on first shard - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(!batchOp.isFinished()); buildResponse(1, &response); addError(ErrorCodes::UnknownError, "mock error", 1, &response); // Error on second write on second shard - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + ++targetedIt; + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(batchOp.isFinished()); + ASSERT(++targetedIt == targeted.end()); BatchedCommandResponse clientResponse; batchOp.buildClientResponse(&clientResponse); @@ -1099,32 +1082,33 @@ TEST(WriteOpTests, MultiOpPartialSingleShardErrorOrdered) { batchOp.initClientRequest(&request); ASSERT(!batchOp.isFinished()); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 2u); - sortByEndpoint(&targeted); - assertEndpointsEqual(targeted.front()->getEndpoint(), endpointA); - assertEndpointsEqual(targeted.back()->getEndpoint(), endpointB); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); - ASSERT_EQUALS(targeted.back()->getWrites().size(), 1u); + verifyTargetedBatches({{endpointA.shardName, 1u}, {endpointB.shardName, 1u}}, targeted); + + // Respond to batches. + auto targetedIt = targeted.begin(); BatchedCommandResponse response; buildResponse(1, &response); // No errors on first shard - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(!batchOp.isFinished()); buildResponse(0, &response); addError(ErrorCodes::UnknownError, "mock error", 0, &response); // Error on second write on second shard - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + ++targetedIt; + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(batchOp.isFinished()); + ASSERT(++targetedIt == targeted.end()); BatchedCommandResponse clientResponse; batchOp.buildClientResponse(&clientResponse); @@ -1167,8 +1151,8 @@ TEST(WriteOpTests, MultiOpErrorAndWriteConcernErrorUnordered) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); BatchedCommandResponse response; @@ -1177,7 +1161,7 @@ TEST(WriteOpTests, MultiOpErrorAndWriteConcernErrorUnordered) { addWCError(&response); // First stale response comes back, we should retry - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); // Unordered reports write concern error @@ -1212,24 +1196,29 @@ TEST(WriteOpTests, SingleOpErrorAndWriteConcernErrorOrdered) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); + // Respond to batches. + auto targetedIt = targeted.begin(); + BatchedCommandResponse response; buildResponse(1, &response); addWCError(&response); // First response comes back with write concern error - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(!batchOp.isFinished()); buildResponse(0, &response); addError(ErrorCodes::UnknownError, "mock error", 0, &response); // Second response comes back with write error - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + ++targetedIt; + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(batchOp.isFinished()); + ASSERT(++targetedIt == targeted.end()); // Ordered doesn't report write concern error BatchedCommandResponse clientResponse; @@ -1263,8 +1252,8 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); // First targeting round fails since we may be stale @@ -1278,13 +1267,13 @@ TEST(WriteOpTests, MultiOpFailedTargetOrdered) { ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u); BatchedCommandResponse response; buildResponse(1, &response); // First response ok - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); @@ -1327,8 +1316,8 @@ TEST(WriteOpTests, MultiOpFailedTargetUnordered) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); // First targeting round fails since we may be stale @@ -1342,13 +1331,13 @@ TEST(WriteOpTests, MultiOpFailedTargetUnordered) { ASSERT(status.isOK()); ASSERT(!batchOp.isFinished()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u); BatchedCommandResponse response; buildResponse(2, &response); // Response is ok for first and third write - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -1382,15 +1371,15 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(1, &response); // First shard batch is ok - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); @@ -1399,7 +1388,7 @@ TEST(WriteOpTests, MultiOpFailedBatchOrdered) { buildErrResponse(ErrorCodes::UnknownError, "mock error", &response); // Second shard batch fails - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); // We should have recorded an error for the second write @@ -1436,22 +1425,27 @@ TEST(WriteOpTests, MultiOpFailedBatchUnordered) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); + // Respond to batches. + auto targetedIt = targeted.begin(); + BatchedCommandResponse response; buildResponse(1, &response); // First shard batch is ok - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(!batchOp.isFinished()); buildErrResponse(ErrorCodes::UnknownError, "mock error", &response); // Second shard batch fails - batchOp.noteBatchResponse(*targeted.back(), response, NULL); + ++targetedIt; + batchOp.noteBatchResponse(*targetedIt->second, response, NULL); ASSERT(batchOp.isFinished()); + ASSERT(++targetedIt == targeted.end()); // We should have recorded an error for the second and third write BatchedCommandResponse clientResponse; @@ -1488,15 +1482,15 @@ TEST(WriteOpTests, MultiOpAbortOrdered) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); BatchedCommandResponse response; buildResponse(1, &response); // First shard batch is ok - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); WriteErrorDetail abortError; @@ -1579,8 +1573,8 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); BatchedCommandResponse response; @@ -1588,14 +1582,14 @@ TEST(WriteOpTests, MultiOpTwoWCErrors) { addWCError(&response); // First shard write write concern fails. - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); status = batchOp.targetBatch(&opCtx, targeter, true, &targeted); // Second shard write write concern fails. - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); BatchedCommandResponse clientResponse; @@ -1632,8 +1626,8 @@ TEST(WriteOpLimitTests, OneBigDoc) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); @@ -1641,7 +1635,7 @@ TEST(WriteOpLimitTests, OneBigDoc) { BatchedCommandResponse response; buildResponse(1, &response); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); } @@ -1669,26 +1663,26 @@ TEST(WriteOpLimitTests, OneBigOneSmall) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u); BatchedCommandResponse response; buildResponse(1, &response); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1u); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); } @@ -1714,26 +1708,26 @@ TEST(WriteOpLimitTests, TooManyOps) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 1000u); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 1000u); BatchedCommandResponse response; buildResponse(1, &response); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); + ASSERT_EQUALS(targeted.begin()->second->getWrites().size(), 2u); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); } @@ -1778,34 +1772,34 @@ TEST(WriteOpLimitTests, UpdateOverheadIncluded) { BatchWriteOp batchOp; batchOp.initClientRequest(&request); - OwnedPointerVector<TargetedWriteBatch> targetedOwned; - vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); Status status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u); + ASSERT_LESS_THAN(targeted.begin()->second->getWrites().size(), 1000u); BatchedCommandRequest childRequest(BatchedCommandRequest::BatchType_Update); - batchOp.buildBatchRequest(*targeted.front(), &childRequest); + batchOp.buildBatchRequest(*targeted.begin()->second, &childRequest); ASSERT_LESS_THAN(childRequest.toBSON().objsize(), BSONObjMaxInternalSize); BatchedCommandResponse response; buildResponse(1, &response); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(!batchOp.isFinished()); targetedOwned.clear(); status = batchOp.targetBatch(&opCtx, targeter, false, &targeted); ASSERT(status.isOK()); ASSERT_EQUALS(targeted.size(), 1u); - ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u); + ASSERT_LESS_THAN(targeted.begin()->second->getWrites().size(), 1000u); childRequest.clear(); - batchOp.buildBatchRequest(*targeted.front(), &childRequest); + batchOp.buildBatchRequest(*targeted.begin()->second, &childRequest); ASSERT_LESS_THAN(childRequest.toBSON().objsize(), BSONObjMaxInternalSize); - batchOp.noteBatchResponse(*targeted.front(), response, NULL); + batchOp.noteBatchResponse(*targeted.begin()->second, response, NULL); ASSERT(batchOp.isFinished()); } |