diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-04-27 16:01:24 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-04-29 08:55:38 -0400 |
commit | 2e8e60bfef83ac9c7bf494b0f80977686cb1b772 (patch) | |
tree | 32bc8ddb9dcdbc8738db1aa2348c17253f6a11f1 /src/mongo/s/write_ops/batch_write_op.cpp | |
parent | ce76085e2464c38119b307851d4c63687d9a581f (diff) | |
download | mongo-2e8e60bfef83ac9c7bf494b0f80977686cb1b772.tar.gz |
SERVER-28992 Make BatchWriteOp contain a vector of WriteOp
... instead of a dynamically allocated array and in-place
construction/destruction.
Diffstat (limited to 'src/mongo/s/write_ops/batch_write_op.cpp')
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 158 |
1 file changed, 64 insertions, 94 deletions
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 879a3fab200..13d088c57aa 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -30,6 +30,8 @@ #include "mongo/s/write_ops/batch_write_op.h" +#include <numeric> + #include "mongo/base/error_codes.h" #include "mongo/stdx/memory.h" #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" @@ -44,32 +46,11 @@ using std::vector; namespace { -/** - * Compares endpoints in a map. - */ -struct EndpointComp { - bool operator()(const ShardEndpoint* endpointA, const ShardEndpoint* endpointB) const { - const int shardNameDiff = endpointA->shardName.compare(endpointB->shardName); - if (shardNameDiff) { - return shardNameDiff < 0; - } - - const long shardVersionDiff = - endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong(); - if (shardVersionDiff) { - return shardVersionDiff < 0; - } - - return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0; - } -}; - struct BatchSize { int numOps{0}; int sizeBytes{0}; }; -typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap; typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap; struct WriteErrorDetailComp { @@ -190,36 +171,6 @@ int getWriteSizeBytes(const WriteOp& writeOp) { } } -/** - * Helper function to cancel all the write ops of targeted batches in a map - */ -void cancelBatches(const WriteErrorDetail& why, WriteOp* writeOps, TargetedBatchMap* batchMap) { - set<WriteOp*> targetedWriteOps; - - // Collect all the writeOps that are currently targeted - for (TargetedBatchMap::iterator it = batchMap->begin(); it != batchMap->end();) { - TargetedWriteBatch* batch = it->second; - const vector<TargetedWrite*>& writes = batch->getWrites(); - - for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin(); - writeIt != writes.end(); - ++writeIt) { - TargetedWrite* 
write = *writeIt; - - // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to - // cancel before erasing the TargetedWrite* (which owns the cancelled targeting - // info) for reporting reasons. - writeOps[write->writeOpRef.first].cancelWrites(&why); - } - - // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from - // the values - batchMap->erase(it++); - delete batch; - } - batchMap->clear(); -} - void cloneCommandErrorTo(const BatchedCommandResponse& batchResp, WriteErrorDetail* details) { details->setErrCode(batchResp.getErrCode()); details->setErrMessage(batchResp.getErrMessage()); @@ -263,32 +214,17 @@ void trackErrors(const ShardEndpoint& endpoint, } // namespace BatchWriteOp::BatchWriteOp(const BatchedCommandRequest& clientRequest) - : _clientRequest(clientRequest), _writeOps(NULL) { - dassert(_clientRequest.isValid(NULL)); + : _clientRequest(clientRequest) { + _writeOps.reserve(_clientRequest.sizeWriteOps()); - size_t numWriteOps = _clientRequest.sizeWriteOps(); - _writeOps = static_cast<WriteOp*>(::operator new[](numWriteOps * sizeof(WriteOp))); - - for (size_t i = 0; i < numWriteOps; ++i) { - // Don't want to have to define what an empty WriteOp means, so construct in-place - new (&_writeOps[i]) WriteOp(BatchItemRef(&_clientRequest, i)); + for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) { + _writeOps.emplace_back(BatchItemRef(&_clientRequest, i)); } } BatchWriteOp::~BatchWriteOp() { // Caller's responsibility to dispose of TargetedBatches - dassert(_targeted.empty()); - - if (NULL != _writeOps) { - size_t numWriteOps = _clientRequest.sizeWriteOps(); - for (size_t i = 0; i < numWriteOps; ++i) { - // Placement new so manual destruct - _writeOps[i].~WriteOp(); - } - - ::operator delete[](_writeOps); - _writeOps = NULL; - } + invariant(_targeted.empty()); } Status BatchWriteOp::targetBatch(OperationContext* opCtx, @@ -335,7 +271,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, 
int numTargetErrors = 0; - size_t numWriteOps = _clientRequest.sizeWriteOps(); + const size_t numWriteOps = _clientRequest.sizeWriteOps(); + for (size_t i = 0; i < numWriteOps; ++i) { WriteOp& writeOp = _writeOps[i]; @@ -359,9 +296,7 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, if (!recordTargetErrors) { // Cancel current batch state with an error - - cancelBatches(targetError, _writeOps, &batchMap); - dassert(batchMap.empty()); + _cancelBatches(targetError, std::move(batchMap)); return targetStatus; } else if (!ordered || batchMap.empty()) { // Record an error for this batch @@ -376,9 +311,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, } else { dassert(ordered && !batchMap.empty()); - // Send out what we have, but don't record an error yet, since there may be an - // error in the writes before this point. - + // Send out what we have, but don't record an error yet, since there may be an error + // in the writes before this point. writeOp.cancelWrites(&targetError); break; } @@ -457,6 +391,7 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx, // Remember targeted batch for reporting _targeted.insert(batch); + // Send the handle back to caller invariant(targetedBatches->find(batch->getEndpoint().shardName) == targetedBatches->end()); targetedBatches->insert(std::make_pair(batch->getEndpoint().shardName, batch)); @@ -540,7 +475,7 @@ void BatchWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch, _targeted.erase(&targetedBatch); // Increment stats for this batch - _incBatchStats(_clientRequest.getBatchType(), response); + _incBatchStats(response); // // Assign errors to particular items. 
@@ -770,25 +705,17 @@ void BatchWriteOp::buildClientResponse(BatchedCommandResponse* batchResp) { dassert(batchResp->isValid(NULL)); } -int BatchWriteOp::numWriteOps() const { - return static_cast<int>(_clientRequest.sizeWriteOps()); -} - int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const { // TODO: This could be faster, if we tracked this info explicitly - size_t numWriteOps = _clientRequest.sizeWriteOps(); - int count = 0; - for (size_t i = 0; i < numWriteOps; ++i) { - WriteOp& writeOp = _writeOps[i]; - if (writeOp.getWriteState() == opState) - ++count; - } - - return count; + return std::accumulate( + _writeOps.begin(), _writeOps.end(), 0, [opState](int sum, const WriteOp& writeOp) { + return sum + (writeOp.getWriteState() == opState ? 1 : 0); + }); } -void BatchWriteOp::_incBatchStats(BatchedCommandRequest::BatchType batchType, - const BatchedCommandResponse& response) { +void BatchWriteOp::_incBatchStats(const BatchedCommandResponse& response) { + const auto batchType = _clientRequest.getBatchType(); + if (batchType == BatchedCommandRequest::BatchType_Insert) { _numInserted += response.getN(); } else if (batchType == BatchedCommandRequest::BatchType_Update) { @@ -811,6 +738,49 @@ void BatchWriteOp::_incBatchStats(BatchedCommandRequest::BatchType batchType, } } +void BatchWriteOp::_cancelBatches(const WriteErrorDetail& why, + TargetedBatchMap&& batchMapToCancel) { + TargetedBatchMap batchMap(batchMapToCancel); + + // Collect all the writeOps that are currently targeted + for (TargetedBatchMap::iterator it = batchMap.begin(); it != batchMap.end();) { + TargetedWriteBatch* batch = it->second; + const vector<TargetedWrite*>& writes = batch->getWrites(); + + for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin(); + writeIt != writes.end(); + ++writeIt) { + TargetedWrite* write = *writeIt; + + // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to cancel + // before erasing the TargetedWrite* (which owns the 
cancelled targeting info) for + // reporting reasons. + _writeOps[write->writeOpRef.first].cancelWrites(&why); + } + + // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from + // the values + batchMap.erase(it++); + delete batch; + } +} + +bool EndpointComp::operator()(const ShardEndpoint* endpointA, + const ShardEndpoint* endpointB) const { + const int shardNameDiff = endpointA->shardName.compare(endpointB->shardName); + if (shardNameDiff) { + return shardNameDiff < 0; + } + + const long shardVersionDiff = + endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong(); + if (shardVersionDiff) { + return shardVersionDiff < 0; + } + + return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0; +} + void TrackedErrors::startTracking(int errCode) { dassert(!isTracking(errCode)); _errorMap.insert(make_pair(errCode, vector<ShardError*>())); |