diff options
field | value | date |
---|---|---|
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-04-27 16:01:24 -0400 |
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-04-29 08:55:38 -0400 |
commit | 2e8e60bfef83ac9c7bf494b0f80977686cb1b772 (patch) | |
tree | 32bc8ddb9dcdbc8738db1aa2348c17253f6a11f1 /src/mongo/s | |
parent | ce76085e2464c38119b307851d4c63687d9a581f (diff) | |
download | mongo-2e8e60bfef83ac9c7bf494b0f80977686cb1b772.tar.gz |
SERVER-28992 Make BatchWriteOp contain a vector of WriteOp
... instead of a dynamically allocated array and in-place
construction/destruction.
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 158 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.h | 29 | ||||
-rw-r--r-- | src/mongo/s/write_ops/write_op.cpp | 80 | ||||
-rw-r--r-- | src/mongo/s/write_ops/write_op.h | 23 |
4 files changed, 125 insertions, 165 deletions
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 879a3fab200..13d088c57aa 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -30,6 +30,8 @@ #include "mongo/s/write_ops/batch_write_op.h" +#include <numeric> + #include "mongo/base/error_codes.h" #include "mongo/stdx/memory.h" #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" @@ -44,32 +46,11 @@ using std::vector; namespace { -/** - * Compares endpoints in a map. - */ -struct EndpointComp { - bool operator()(const ShardEndpoint* endpointA, const ShardEndpoint* endpointB) const { - const int shardNameDiff = endpointA->shardName.compare(endpointB->shardName); - if (shardNameDiff) { - return shardNameDiff < 0; - } - - const long shardVersionDiff = - endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong(); - if (shardVersionDiff) { - return shardVersionDiff < 0; - } - - return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0; - } -}; - struct BatchSize { int numOps{0}; int sizeBytes{0}; }; -typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap; typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap; struct WriteErrorDetailComp { @@ -190,36 +171,6 @@ int getWriteSizeBytes(const WriteOp& writeOp) { } } -/** - * Helper function to cancel all the write ops of targeted batches in a map - */ -void cancelBatches(const WriteErrorDetail& why, WriteOp* writeOps, TargetedBatchMap* batchMap) { - set<WriteOp*> targetedWriteOps; - - // Collect all the writeOps that are currently targeted - for (TargetedBatchMap::iterator it = batchMap->begin(); it != batchMap->end();) { - TargetedWriteBatch* batch = it->second; - const vector<TargetedWrite*>& writes = batch->getWrites(); - - for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin(); - writeIt != writes.end(); - ++writeIt) { - TargetedWrite* 
write = *writeIt; - - // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to - // cancel before erasing the TargetedWrite* (which owns the cancelled targeting - // info) for reporting reasons. - writeOps[write->writeOpRef.first].cancelWrites(&why); - } - - // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from - // the values - batchMap->erase(it++); - delete batch; - } - batchMap->clear(); -} - void cloneCommandErrorTo(const BatchedCommandResponse& batchResp, WriteErrorDetail* details) { details->setErrCode(batchResp.getErrCode()); details->setErrMessage(batchResp.getErrMessage()); @@ -263,32 +214,17 @@ void trackErrors(const ShardEndpoint& endpoint, } // namespace BatchWriteOp::BatchWriteOp(const BatchedCommandRequest& clientRequest) - : _clientRequest(clientRequest), _writeOps(NULL) { - dassert(_clientRequest.isValid(NULL)); + : _clientRequest(clientRequest) { + _writeOps.reserve(_clientRequest.sizeWriteOps()); - size_t numWriteOps = _clientRequest.sizeWriteOps(); - _writeOps = static_cast<WriteOp*>(::operator new[](numWriteOps * sizeof(WriteOp))); - - for (size_t i = 0; i < numWriteOps; ++i) { - // Don't want to have to define what an empty WriteOp means, so construct in-place - new (&_writeOps[i]) WriteOp(BatchItemRef(&_clientRequest, i)); + for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) { + _writeOps.emplace_back(BatchItemRef(&_clientRequest, i)); } } BatchWriteOp::~BatchWriteOp() { // Caller's responsibility to dispose of TargetedBatches - dassert(_targeted.empty()); - - if (NULL != _writeOps) { - size_t numWriteOps = _clientRequest.sizeWriteOps(); - for (size_t i = 0; i < numWriteOps; ++i) { - // Placement new so manual destruct - _writeOps[i].~WriteOp(); - } - - ::operator delete[](_writeOps); - _writeOps = NULL; - } + invariant(_targeted.empty()); } Status BatchWriteOp::targetBatch(OperationContext* opCtx, @@ -335,7 +271,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, 
int numTargetErrors = 0; - size_t numWriteOps = _clientRequest.sizeWriteOps(); + const size_t numWriteOps = _clientRequest.sizeWriteOps(); + for (size_t i = 0; i < numWriteOps; ++i) { WriteOp& writeOp = _writeOps[i]; @@ -359,9 +296,7 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, if (!recordTargetErrors) { // Cancel current batch state with an error - - cancelBatches(targetError, _writeOps, &batchMap); - dassert(batchMap.empty()); + _cancelBatches(targetError, std::move(batchMap)); return targetStatus; } else if (!ordered || batchMap.empty()) { // Record an error for this batch @@ -376,9 +311,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, } else { dassert(ordered && !batchMap.empty()); - // Send out what we have, but don't record an error yet, since there may be an - // error in the writes before this point. - + // Send out what we have, but don't record an error yet, since there may be an error + // in the writes before this point. writeOp.cancelWrites(&targetError); break; } @@ -457,6 +391,7 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, // Remember targeted batch for reporting _targeted.insert(batch); + // Send the handle back to caller invariant(targetedBatches->find(batch->getEndpoint().shardName) == targetedBatches->end()); targetedBatches->insert(std::make_pair(batch->getEndpoint().shardName, batch)); @@ -540,7 +475,7 @@ void BatchWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch, _targeted.erase(&targetedBatch); // Increment stats for this batch - _incBatchStats(_clientRequest.getBatchType(), response); + _incBatchStats(response); // // Assign errors to particular items. 
@@ -770,25 +705,17 @@ void BatchWriteOp::buildClientResponse(BatchedCommandResponse* batchResp) { dassert(batchResp->isValid(NULL)); } -int BatchWriteOp::numWriteOps() const { - return static_cast<int>(_clientRequest.sizeWriteOps()); -} - int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const { // TODO: This could be faster, if we tracked this info explicitly - size_t numWriteOps = _clientRequest.sizeWriteOps(); - int count = 0; - for (size_t i = 0; i < numWriteOps; ++i) { - WriteOp& writeOp = _writeOps[i]; - if (writeOp.getWriteState() == opState) - ++count; - } - - return count; + return std::accumulate( + _writeOps.begin(), _writeOps.end(), 0, [opState](int sum, const WriteOp& writeOp) { + return sum + (writeOp.getWriteState() == opState ? 1 : 0); + }); } -void BatchWriteOp::_incBatchStats(BatchedCommandRequest::BatchType batchType, - const BatchedCommandResponse& response) { +void BatchWriteOp::_incBatchStats(const BatchedCommandResponse& response) { + const auto batchType = _clientRequest.getBatchType(); + if (batchType == BatchedCommandRequest::BatchType_Insert) { _numInserted += response.getN(); } else if (batchType == BatchedCommandRequest::BatchType_Update) { @@ -811,6 +738,49 @@ void BatchWriteOp::_incBatchStats(BatchedCommandRequest::BatchType batchType, } } +void BatchWriteOp::_cancelBatches(const WriteErrorDetail& why, + TargetedBatchMap&& batchMapToCancel) { + TargetedBatchMap batchMap(batchMapToCancel); + + // Collect all the writeOps that are currently targeted + for (TargetedBatchMap::iterator it = batchMap.begin(); it != batchMap.end();) { + TargetedWriteBatch* batch = it->second; + const vector<TargetedWrite*>& writes = batch->getWrites(); + + for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin(); + writeIt != writes.end(); + ++writeIt) { + TargetedWrite* write = *writeIt; + + // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to cancel + // before erasing the TargetedWrite* (which owns the 
cancelled targeting info) for + // reporting reasons. + _writeOps[write->writeOpRef.first].cancelWrites(&why); + } + + // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from + // the values + batchMap.erase(it++); + delete batch; + } +} + +bool EndpointComp::operator()(const ShardEndpoint* endpointA, + const ShardEndpoint* endpointB) const { + const int shardNameDiff = endpointA->shardName.compare(endpointB->shardName); + if (shardNameDiff) { + return shardNameDiff < 0; + } + + const long shardVersionDiff = + endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong(); + if (shardVersionDiff) { + return shardVersionDiff < 0; + } + + return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0; +} + void TrackedErrors::startTracking(int errCode) { dassert(!isTracking(errCode)); _errorMap.insert(make_pair(errCode, vector<ShardError*>())); diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 6230fdee1f0..1ac8d8345de 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -28,7 +28,6 @@ #pragma once -#include <memory> #include <set> #include <vector> @@ -52,6 +51,15 @@ struct ShardWCError; class TrackedErrors; /** + * Compares endpoints in a map. + */ +struct EndpointComp { + bool operator()(const ShardEndpoint* endpointA, const ShardEndpoint* endpointB) const; +}; + +using TargetedBatchMap = std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp>; + +/** * The BatchWriteOp class manages the lifecycle of a batched write received by mongos. Each * item in a batch is tracked via a WriteOp, and the function of the BatchWriteOp is to * aggregate the dispatched requests and responses for the underlying WriteOps. @@ -147,19 +155,28 @@ public: */ void buildClientResponse(BatchedCommandResponse* batchResp); - int numWriteOps() const; - + /** + * Returns the number of write operations which are in the specified state. 
Runs in O(number of + * write operations). + */ int numWriteOpsIn(WriteOpState state) const; private: - void _incBatchStats(BatchedCommandRequest::BatchType batchType, - const BatchedCommandResponse& response); + /** + * Maintains the batch execution statistics when a response is received. + */ + void _incBatchStats(const BatchedCommandResponse& response); + + /** + * Helper function to cancel all the write ops of targeted batches in a map. + */ + void _cancelBatches(const WriteErrorDetail& why, TargetedBatchMap&& batchMapToCancel); // The incoming client request const BatchedCommandRequest& _clientRequest; // Array of ops being processed from the client request - WriteOp* _writeOps; + std::vector<WriteOp> _writeOps; // Current outstanding batch op write requests // Not owned here but tracked for reporting diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index 919118bed4e..aafe20ecd29 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -30,8 +30,6 @@ #include "mongo/s/write_ops/write_op.h" -#include "mongo/base/error_codes.h" -#include "mongo/base/owned_pointer_vector.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -39,19 +37,6 @@ namespace mongo { using std::stringstream; using std::vector; -static void clear(vector<ChildWriteOp*>* childOps) { - for (vector<ChildWriteOp*>::const_iterator it = childOps->begin(); it != childOps->end(); - ++it) { - delete *it; - } - childOps->clear(); -} - -WriteOp::~WriteOp() { - clear(&_childOps); - clear(&_history); -} - const BatchItemRef& WriteOp::getWriteItem() const { return _itemRef; } @@ -118,7 +103,7 @@ Status WriteOp::targetWrites(OperationContext* opCtx, for (auto it = endpoints.begin(); it != endpoints.end(); ++it) { ShardEndpoint* endpoint = it->get(); - _childOps.push_back(new ChildWriteOp(this)); + _childOps.emplace_back(this); WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1); @@ -130,8 +115,8 @@ Status 
WriteOp::targetWrites(OperationContext* opCtx, targetedWrites->push_back(new TargetedWrite(broadcastEndpoint, ref)); } - _childOps.back()->pendingWrite = targetedWrites->back(); - _childOps.back()->state = WriteOpState_Pending; + _childOps.back().pendingWrite = targetedWrites->back(); + _childOps.back().state = WriteOpState_Pending; } _state = WriteOpState_Pending; @@ -147,7 +132,7 @@ static bool isRetryErrCode(int errCode) { } // Aggregate a bunch of errors for a single op together -static void combineOpErrors(const vector<ChildWriteOp*>& errOps, WriteErrorDetail* error) { +static void combineOpErrors(const vector<ChildWriteOp const*>& errOps, WriteErrorDetail* error) { // Special case single response if (errOps.size() == 1) { errOps.front()->error->cloneTo(error); @@ -161,7 +146,8 @@ static void combineOpErrors(const vector<ChildWriteOp*>& errOps, WriteErrorDetai msg << "multiple errors for op : "; BSONArrayBuilder errB; - for (vector<ChildWriteOp*>::const_iterator it = errOps.begin(); it != errOps.end(); ++it) { + for (vector<ChildWriteOp const*>::const_iterator it = errOps.begin(); it != errOps.end(); + ++it) { const ChildWriteOp* errOp = *it; if (it != errOps.begin()) msg << " :: and :: "; @@ -178,29 +164,29 @@ static void combineOpErrors(const vector<ChildWriteOp*>& errOps, WriteErrorDetai * This is the core function which aggregates all the results of a write operation on multiple * shards and updates the write operation's state. 
*/ -void WriteOp::updateOpState() { - vector<ChildWriteOp*> childErrors; +void WriteOp::_updateOpState() { + std::vector<ChildWriteOp const*> childErrors; bool isRetryError = true; - for (vector<ChildWriteOp*>::iterator it = _childOps.begin(); it != _childOps.end(); it++) { - ChildWriteOp* childOp = *it; - + for (const auto& childOp : _childOps) { // Don't do anything till we have all the info - if (childOp->state != WriteOpState_Completed && childOp->state != WriteOpState_Error) { + if (childOp.state != WriteOpState_Completed && childOp.state != WriteOpState_Error) { return; } - if (childOp->state == WriteOpState_Error) { - childErrors.push_back(childOp); + if (childOp.state == WriteOpState_Error) { + childErrors.push_back(&childOp); + // Any non-retry error aborts all - if (!isRetryErrCode(childOp->error->getErrCode())) + if (!isRetryErrCode(childOp.error->getErrCode())) { isRetryError = false; + } } } if (!childErrors.empty() && isRetryError) { // Since we're using broadcast mode for multi-shard writes, which cannot SCE - dassert(childErrors.size() == 1u); + invariant(childErrors.size() == 1u); _state = WriteOpState_Ready; } else if (!childErrors.empty()) { _error.reset(new WriteErrorDetail); @@ -210,48 +196,42 @@ void WriteOp::updateOpState() { _state = WriteOpState_Completed; } - // Now that we're done with the child ops, do something with them - // TODO: Don't store unlimited history? 
- dassert(_state != WriteOpState_Pending); - _history.insert(_history.end(), _childOps.begin(), _childOps.end()); + invariant(_state != WriteOpState_Pending); _childOps.clear(); } void WriteOp::cancelWrites(const WriteErrorDetail* why) { - dassert(_state == WriteOpState_Pending || _state == WriteOpState_Ready); - for (vector<ChildWriteOp*>::iterator it = _childOps.begin(); it != _childOps.end(); ++it) { - ChildWriteOp* childOp = *it; + invariant(_state == WriteOpState_Pending || _state == WriteOpState_Ready); - if (childOp->state == WriteOpState_Pending) { - childOp->endpoint.reset(new ShardEndpoint(childOp->pendingWrite->endpoint)); + for (auto& childOp : _childOps) { + if (childOp.state == WriteOpState_Pending) { + childOp.endpoint.reset(new ShardEndpoint(childOp.pendingWrite->endpoint)); if (why) { - childOp->error.reset(new WriteErrorDetail); - why->cloneTo(childOp->error.get()); + childOp.error.reset(new WriteErrorDetail); + why->cloneTo(childOp.error.get()); } - childOp->state = WriteOpState_Cancelled; + + childOp.state = WriteOpState_Cancelled; } } - _history.insert(_history.end(), _childOps.begin(), _childOps.end()); - _childOps.clear(); - _state = WriteOpState_Ready; + _childOps.clear(); } void WriteOp::noteWriteComplete(const TargetedWrite& targetedWrite) { const WriteOpRef& ref = targetedWrite.writeOpRef; - dassert(static_cast<size_t>(ref.second) < _childOps.size()); - ChildWriteOp& childOp = *_childOps.at(ref.second); + auto& childOp = _childOps[ref.second]; childOp.pendingWrite = NULL; childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint)); childOp.state = WriteOpState_Completed; - updateOpState(); + _updateOpState(); } void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, const WriteErrorDetail& error) { const WriteOpRef& ref = targetedWrite.writeOpRef; - ChildWriteOp& childOp = *_childOps.at(ref.second); + auto& childOp = _childOps[ref.second]; childOp.pendingWrite = NULL; childOp.endpoint.reset(new 
ShardEndpoint(targetedWrite.endpoint)); @@ -260,7 +240,7 @@ void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, const WriteErro dassert(ref.first == _itemRef.getItemIndex()); childOp.error->setIndex(_itemRef.getItemIndex()); childOp.state = WriteOpState_Error; - updateOpState(); + _updateOpState(); } void WriteOp::setOpError(const WriteErrorDetail& error) { diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h index 00dd9abe7a9..759652f98e3 100644 --- a/src/mongo/s/write_ops/write_op.h +++ b/src/mongo/s/write_ops/write_op.h @@ -30,8 +30,6 @@ #include <vector> -#include "mongo/base/string_data.h" -#include "mongo/bson/bsonobj.h" #include "mongo/s/ns_targeter.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/write_error_detail.h" @@ -90,9 +88,7 @@ enum WriteOpState { */ class WriteOp { public: - WriteOp(const BatchItemRef& itemRef) : _itemRef(itemRef), _state(WriteOpState_Ready) {} - - ~WriteOp(); + WriteOp(BatchItemRef itemRef) : _itemRef(std::move(itemRef)) {} /** * Returns the write item for this operation @@ -166,22 +162,19 @@ private: /** * Updates the op state after new information is received. */ - void updateOpState(); + void _updateOpState(); // Owned elsewhere, reference to a batch with a write item const BatchItemRef _itemRef; // What stage of the operation we are at - WriteOpState _state; + WriteOpState _state{WriteOpState_Ready}; // filled when state == _Pending - std::vector<ChildWriteOp*> _childOps; + std::vector<ChildWriteOp> _childOps; // filled when state == _Error std::unique_ptr<WriteErrorDetail> _error; - - // Finished child operations, for debugging - std::vector<ChildWriteOp*> _history; }; /** @@ -192,15 +185,15 @@ private: * (_Error) state. 
*/ struct ChildWriteOp { - ChildWriteOp(WriteOp* const parent) - : parentOp(parent), state(WriteOpState_Ready), pendingWrite(NULL) {} + ChildWriteOp(WriteOp* const parent) : parentOp(parent) {} const WriteOp* const parentOp; - WriteOpState state; + + WriteOpState state{WriteOpState_Ready}; // non-zero when state == _Pending // Not owned here but tracked for reporting - TargetedWrite* pendingWrite; + TargetedWrite* pendingWrite{nullptr}; // filled when state > _Pending std::unique_ptr<ShardEndpoint> endpoint; |