summaryrefslogtreecommitdiff
path: root/src/mongo/s/write_ops/batch_write_op.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-04-27 16:01:24 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-04-29 08:55:38 -0400
commit2e8e60bfef83ac9c7bf494b0f80977686cb1b772 (patch)
tree32bc8ddb9dcdbc8738db1aa2348c17253f6a11f1 /src/mongo/s/write_ops/batch_write_op.cpp
parentce76085e2464c38119b307851d4c63687d9a581f (diff)
downloadmongo-2e8e60bfef83ac9c7bf494b0f80977686cb1b772.tar.gz
SERVER-28992 Make BatchWriteOp contain a vector of WriteOp
... instead of a dynamically allocated array and in-place construction/destruction.
Diffstat (limited to 'src/mongo/s/write_ops/batch_write_op.cpp')
-rw-r--r--src/mongo/s/write_ops/batch_write_op.cpp158
1 files changed, 64 insertions, 94 deletions
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 879a3fab200..13d088c57aa 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -30,6 +30,8 @@
#include "mongo/s/write_ops/batch_write_op.h"
+#include <numeric>
+
#include "mongo/base/error_codes.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
@@ -44,32 +46,11 @@ using std::vector;
namespace {
-/**
- * Compares endpoints in a map.
- */
-struct EndpointComp {
- bool operator()(const ShardEndpoint* endpointA, const ShardEndpoint* endpointB) const {
- const int shardNameDiff = endpointA->shardName.compare(endpointB->shardName);
- if (shardNameDiff) {
- return shardNameDiff < 0;
- }
-
- const long shardVersionDiff =
- endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong();
- if (shardVersionDiff) {
- return shardVersionDiff < 0;
- }
-
- return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0;
- }
-};
-
struct BatchSize {
int numOps{0};
int sizeBytes{0};
};
-typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap;
typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap;
struct WriteErrorDetailComp {
@@ -190,36 +171,6 @@ int getWriteSizeBytes(const WriteOp& writeOp) {
}
}
-/**
- * Helper function to cancel all the write ops of targeted batches in a map
- */
-void cancelBatches(const WriteErrorDetail& why, WriteOp* writeOps, TargetedBatchMap* batchMap) {
- set<WriteOp*> targetedWriteOps;
-
- // Collect all the writeOps that are currently targeted
- for (TargetedBatchMap::iterator it = batchMap->begin(); it != batchMap->end();) {
- TargetedWriteBatch* batch = it->second;
- const vector<TargetedWrite*>& writes = batch->getWrites();
-
- for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin();
- writeIt != writes.end();
- ++writeIt) {
- TargetedWrite* write = *writeIt;
-
- // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to
- // cancel before erasing the TargetedWrite* (which owns the cancelled targeting
- // info) for reporting reasons.
- writeOps[write->writeOpRef.first].cancelWrites(&why);
- }
-
- // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from
- // the values
- batchMap->erase(it++);
- delete batch;
- }
- batchMap->clear();
-}
-
void cloneCommandErrorTo(const BatchedCommandResponse& batchResp, WriteErrorDetail* details) {
details->setErrCode(batchResp.getErrCode());
details->setErrMessage(batchResp.getErrMessage());
@@ -263,32 +214,17 @@ void trackErrors(const ShardEndpoint& endpoint,
} // namespace
BatchWriteOp::BatchWriteOp(const BatchedCommandRequest& clientRequest)
- : _clientRequest(clientRequest), _writeOps(NULL) {
- dassert(_clientRequest.isValid(NULL));
+ : _clientRequest(clientRequest) {
+ _writeOps.reserve(_clientRequest.sizeWriteOps());
- size_t numWriteOps = _clientRequest.sizeWriteOps();
- _writeOps = static_cast<WriteOp*>(::operator new[](numWriteOps * sizeof(WriteOp)));
-
- for (size_t i = 0; i < numWriteOps; ++i) {
- // Don't want to have to define what an empty WriteOp means, so construct in-place
- new (&_writeOps[i]) WriteOp(BatchItemRef(&_clientRequest, i));
+ for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) {
+ _writeOps.emplace_back(BatchItemRef(&_clientRequest, i));
}
}
BatchWriteOp::~BatchWriteOp() {
// Caller's responsibility to dispose of TargetedBatches
- dassert(_targeted.empty());
-
- if (NULL != _writeOps) {
- size_t numWriteOps = _clientRequest.sizeWriteOps();
- for (size_t i = 0; i < numWriteOps; ++i) {
- // Placement new so manual destruct
- _writeOps[i].~WriteOp();
- }
-
- ::operator delete[](_writeOps);
- _writeOps = NULL;
- }
+ invariant(_targeted.empty());
}
Status BatchWriteOp::targetBatch(OperationContext* opCtx,
@@ -335,7 +271,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
int numTargetErrors = 0;
- size_t numWriteOps = _clientRequest.sizeWriteOps();
+ const size_t numWriteOps = _clientRequest.sizeWriteOps();
+
for (size_t i = 0; i < numWriteOps; ++i) {
WriteOp& writeOp = _writeOps[i];
@@ -359,9 +296,7 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
if (!recordTargetErrors) {
// Cancel current batch state with an error
-
- cancelBatches(targetError, _writeOps, &batchMap);
- dassert(batchMap.empty());
+ _cancelBatches(targetError, std::move(batchMap));
return targetStatus;
} else if (!ordered || batchMap.empty()) {
// Record an error for this batch
@@ -376,9 +311,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
} else {
dassert(ordered && !batchMap.empty());
- // Send out what we have, but don't record an error yet, since there may be an
- // error in the writes before this point.
-
+ // Send out what we have, but don't record an error yet, since there may be an error
+ // in the writes before this point.
writeOp.cancelWrites(&targetError);
break;
}
@@ -457,6 +391,7 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
// Remember targeted batch for reporting
_targeted.insert(batch);
+
// Send the handle back to caller
invariant(targetedBatches->find(batch->getEndpoint().shardName) == targetedBatches->end());
targetedBatches->insert(std::make_pair(batch->getEndpoint().shardName, batch));
@@ -540,7 +475,7 @@ void BatchWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch,
_targeted.erase(&targetedBatch);
// Increment stats for this batch
- _incBatchStats(_clientRequest.getBatchType(), response);
+ _incBatchStats(response);
//
// Assign errors to particular items.
@@ -770,25 +705,17 @@ void BatchWriteOp::buildClientResponse(BatchedCommandResponse* batchResp) {
dassert(batchResp->isValid(NULL));
}
-int BatchWriteOp::numWriteOps() const {
- return static_cast<int>(_clientRequest.sizeWriteOps());
-}
-
int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const {
// TODO: This could be faster, if we tracked this info explicitly
- size_t numWriteOps = _clientRequest.sizeWriteOps();
- int count = 0;
- for (size_t i = 0; i < numWriteOps; ++i) {
- WriteOp& writeOp = _writeOps[i];
- if (writeOp.getWriteState() == opState)
- ++count;
- }
-
- return count;
+ return std::accumulate(
+ _writeOps.begin(), _writeOps.end(), 0, [opState](int sum, const WriteOp& writeOp) {
+ return sum + (writeOp.getWriteState() == opState ? 1 : 0);
+ });
}
-void BatchWriteOp::_incBatchStats(BatchedCommandRequest::BatchType batchType,
- const BatchedCommandResponse& response) {
+void BatchWriteOp::_incBatchStats(const BatchedCommandResponse& response) {
+ const auto batchType = _clientRequest.getBatchType();
+
if (batchType == BatchedCommandRequest::BatchType_Insert) {
_numInserted += response.getN();
} else if (batchType == BatchedCommandRequest::BatchType_Update) {
@@ -811,6 +738,49 @@ void BatchWriteOp::_incBatchStats(BatchedCommandRequest::BatchType batchType,
}
}
+void BatchWriteOp::_cancelBatches(const WriteErrorDetail& why,
+ TargetedBatchMap&& batchMapToCancel) {
+ TargetedBatchMap batchMap(batchMapToCancel);
+
+ // Collect all the writeOps that are currently targeted
+ for (TargetedBatchMap::iterator it = batchMap.begin(); it != batchMap.end();) {
+ TargetedWriteBatch* batch = it->second;
+ const vector<TargetedWrite*>& writes = batch->getWrites();
+
+ for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin();
+ writeIt != writes.end();
+ ++writeIt) {
+ TargetedWrite* write = *writeIt;
+
+ // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to cancel
+ // before erasing the TargetedWrite* (which owns the cancelled targeting info) for
+ // reporting reasons.
+ _writeOps[write->writeOpRef.first].cancelWrites(&why);
+ }
+
+ // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from
+ // the values
+ batchMap.erase(it++);
+ delete batch;
+ }
+}
+
+bool EndpointComp::operator()(const ShardEndpoint* endpointA,
+ const ShardEndpoint* endpointB) const {
+ const int shardNameDiff = endpointA->shardName.compare(endpointB->shardName);
+ if (shardNameDiff) {
+ return shardNameDiff < 0;
+ }
+
+ const long shardVersionDiff =
+ endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong();
+ if (shardVersionDiff) {
+ return shardVersionDiff < 0;
+ }
+
+ return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0;
+}
+
void TrackedErrors::startTracking(int errCode) {
dassert(!isTracking(errCode));
_errorMap.insert(make_pair(errCode, vector<ShardError*>()));