summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-04-27 16:01:24 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-04-29 08:55:38 -0400
commit2e8e60bfef83ac9c7bf494b0f80977686cb1b772 (patch)
tree32bc8ddb9dcdbc8738db1aa2348c17253f6a11f1
parentce76085e2464c38119b307851d4c63687d9a581f (diff)
downloadmongo-2e8e60bfef83ac9c7bf494b0f80977686cb1b772.tar.gz
SERVER-28992 Make BatchWriteOp contain a vector of WriteOp
... instead of a dynamically allocated array and in-place construction/destruction.
-rw-r--r--src/mongo/s/write_ops/batch_write_op.cpp158
-rw-r--r--src/mongo/s/write_ops/batch_write_op.h29
-rw-r--r--src/mongo/s/write_ops/write_op.cpp80
-rw-r--r--src/mongo/s/write_ops/write_op.h23
4 files changed, 125 insertions, 165 deletions
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 879a3fab200..13d088c57aa 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -30,6 +30,8 @@
#include "mongo/s/write_ops/batch_write_op.h"
+#include <numeric>
+
#include "mongo/base/error_codes.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
@@ -44,32 +46,11 @@ using std::vector;
namespace {
-/**
- * Compares endpoints in a map.
- */
-struct EndpointComp {
- bool operator()(const ShardEndpoint* endpointA, const ShardEndpoint* endpointB) const {
- const int shardNameDiff = endpointA->shardName.compare(endpointB->shardName);
- if (shardNameDiff) {
- return shardNameDiff < 0;
- }
-
- const long shardVersionDiff =
- endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong();
- if (shardVersionDiff) {
- return shardVersionDiff < 0;
- }
-
- return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0;
- }
-};
-
struct BatchSize {
int numOps{0};
int sizeBytes{0};
};
-typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap;
typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap;
struct WriteErrorDetailComp {
@@ -190,36 +171,6 @@ int getWriteSizeBytes(const WriteOp& writeOp) {
}
}
-/**
- * Helper function to cancel all the write ops of targeted batches in a map
- */
-void cancelBatches(const WriteErrorDetail& why, WriteOp* writeOps, TargetedBatchMap* batchMap) {
- set<WriteOp*> targetedWriteOps;
-
- // Collect all the writeOps that are currently targeted
- for (TargetedBatchMap::iterator it = batchMap->begin(); it != batchMap->end();) {
- TargetedWriteBatch* batch = it->second;
- const vector<TargetedWrite*>& writes = batch->getWrites();
-
- for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin();
- writeIt != writes.end();
- ++writeIt) {
- TargetedWrite* write = *writeIt;
-
- // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to
- // cancel before erasing the TargetedWrite* (which owns the cancelled targeting
- // info) for reporting reasons.
- writeOps[write->writeOpRef.first].cancelWrites(&why);
- }
-
- // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from
- // the values
- batchMap->erase(it++);
- delete batch;
- }
- batchMap->clear();
-}
-
void cloneCommandErrorTo(const BatchedCommandResponse& batchResp, WriteErrorDetail* details) {
details->setErrCode(batchResp.getErrCode());
details->setErrMessage(batchResp.getErrMessage());
@@ -263,32 +214,17 @@ void trackErrors(const ShardEndpoint& endpoint,
} // namespace
BatchWriteOp::BatchWriteOp(const BatchedCommandRequest& clientRequest)
- : _clientRequest(clientRequest), _writeOps(NULL) {
- dassert(_clientRequest.isValid(NULL));
+ : _clientRequest(clientRequest) {
+ _writeOps.reserve(_clientRequest.sizeWriteOps());
- size_t numWriteOps = _clientRequest.sizeWriteOps();
- _writeOps = static_cast<WriteOp*>(::operator new[](numWriteOps * sizeof(WriteOp)));
-
- for (size_t i = 0; i < numWriteOps; ++i) {
- // Don't want to have to define what an empty WriteOp means, so construct in-place
- new (&_writeOps[i]) WriteOp(BatchItemRef(&_clientRequest, i));
+ for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) {
+ _writeOps.emplace_back(BatchItemRef(&_clientRequest, i));
}
}
BatchWriteOp::~BatchWriteOp() {
// Caller's responsibility to dispose of TargetedBatches
- dassert(_targeted.empty());
-
- if (NULL != _writeOps) {
- size_t numWriteOps = _clientRequest.sizeWriteOps();
- for (size_t i = 0; i < numWriteOps; ++i) {
- // Placement new so manual destruct
- _writeOps[i].~WriteOp();
- }
-
- ::operator delete[](_writeOps);
- _writeOps = NULL;
- }
+ invariant(_targeted.empty());
}
Status BatchWriteOp::targetBatch(OperationContext* opCtx,
@@ -335,7 +271,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
int numTargetErrors = 0;
- size_t numWriteOps = _clientRequest.sizeWriteOps();
+ const size_t numWriteOps = _clientRequest.sizeWriteOps();
+
for (size_t i = 0; i < numWriteOps; ++i) {
WriteOp& writeOp = _writeOps[i];
@@ -359,9 +296,7 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
if (!recordTargetErrors) {
// Cancel current batch state with an error
-
- cancelBatches(targetError, _writeOps, &batchMap);
- dassert(batchMap.empty());
+ _cancelBatches(targetError, std::move(batchMap));
return targetStatus;
} else if (!ordered || batchMap.empty()) {
// Record an error for this batch
@@ -376,9 +311,8 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
} else {
dassert(ordered && !batchMap.empty());
- // Send out what we have, but don't record an error yet, since there may be an
- // error in the writes before this point.
-
+ // Send out what we have, but don't record an error yet, since there may be an error
+ // in the writes before this point.
writeOp.cancelWrites(&targetError);
break;
}
@@ -457,6 +391,7 @@ Status BatchWriteOp::targetBatch(OperationContext* opCtx,
// Remember targeted batch for reporting
_targeted.insert(batch);
+
// Send the handle back to caller
invariant(targetedBatches->find(batch->getEndpoint().shardName) == targetedBatches->end());
targetedBatches->insert(std::make_pair(batch->getEndpoint().shardName, batch));
@@ -540,7 +475,7 @@ void BatchWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch,
_targeted.erase(&targetedBatch);
// Increment stats for this batch
- _incBatchStats(_clientRequest.getBatchType(), response);
+ _incBatchStats(response);
//
// Assign errors to particular items.
@@ -770,25 +705,17 @@ void BatchWriteOp::buildClientResponse(BatchedCommandResponse* batchResp) {
dassert(batchResp->isValid(NULL));
}
-int BatchWriteOp::numWriteOps() const {
- return static_cast<int>(_clientRequest.sizeWriteOps());
-}
-
int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const {
// TODO: This could be faster, if we tracked this info explicitly
- size_t numWriteOps = _clientRequest.sizeWriteOps();
- int count = 0;
- for (size_t i = 0; i < numWriteOps; ++i) {
- WriteOp& writeOp = _writeOps[i];
- if (writeOp.getWriteState() == opState)
- ++count;
- }
-
- return count;
+ return std::accumulate(
+ _writeOps.begin(), _writeOps.end(), 0, [opState](int sum, const WriteOp& writeOp) {
+ return sum + (writeOp.getWriteState() == opState ? 1 : 0);
+ });
}
-void BatchWriteOp::_incBatchStats(BatchedCommandRequest::BatchType batchType,
- const BatchedCommandResponse& response) {
+void BatchWriteOp::_incBatchStats(const BatchedCommandResponse& response) {
+ const auto batchType = _clientRequest.getBatchType();
+
if (batchType == BatchedCommandRequest::BatchType_Insert) {
_numInserted += response.getN();
} else if (batchType == BatchedCommandRequest::BatchType_Update) {
@@ -811,6 +738,49 @@ void BatchWriteOp::_incBatchStats(BatchedCommandRequest::BatchType batchType,
}
}
+void BatchWriteOp::_cancelBatches(const WriteErrorDetail& why,
+ TargetedBatchMap&& batchMapToCancel) {
+ TargetedBatchMap batchMap(batchMapToCancel);
+
+ // Collect all the writeOps that are currently targeted
+ for (TargetedBatchMap::iterator it = batchMap.begin(); it != batchMap.end();) {
+ TargetedWriteBatch* batch = it->second;
+ const vector<TargetedWrite*>& writes = batch->getWrites();
+
+ for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin();
+ writeIt != writes.end();
+ ++writeIt) {
+ TargetedWrite* write = *writeIt;
+
+ // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to cancel
+ // before erasing the TargetedWrite* (which owns the cancelled targeting info) for
+ // reporting reasons.
+ _writeOps[write->writeOpRef.first].cancelWrites(&why);
+ }
+
+ // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from
+ // the values
+ batchMap.erase(it++);
+ delete batch;
+ }
+}
+
+bool EndpointComp::operator()(const ShardEndpoint* endpointA,
+ const ShardEndpoint* endpointB) const {
+ const int shardNameDiff = endpointA->shardName.compare(endpointB->shardName);
+ if (shardNameDiff) {
+ return shardNameDiff < 0;
+ }
+
+ const long shardVersionDiff =
+ endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong();
+ if (shardVersionDiff) {
+ return shardVersionDiff < 0;
+ }
+
+ return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0;
+}
+
void TrackedErrors::startTracking(int errCode) {
dassert(!isTracking(errCode));
_errorMap.insert(make_pair(errCode, vector<ShardError*>()));
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 6230fdee1f0..1ac8d8345de 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -28,7 +28,6 @@
#pragma once
-#include <memory>
#include <set>
#include <vector>
@@ -52,6 +51,15 @@ struct ShardWCError;
class TrackedErrors;
/**
+ * Compares endpoints in a map.
+ */
+struct EndpointComp {
+ bool operator()(const ShardEndpoint* endpointA, const ShardEndpoint* endpointB) const;
+};
+
+using TargetedBatchMap = std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp>;
+
+/**
* The BatchWriteOp class manages the lifecycle of a batched write received by mongos. Each
* item in a batch is tracked via a WriteOp, and the function of the BatchWriteOp is to
* aggregate the dispatched requests and responses for the underlying WriteOps.
@@ -147,19 +155,28 @@ public:
*/
void buildClientResponse(BatchedCommandResponse* batchResp);
- int numWriteOps() const;
-
+ /**
+ * Returns the number of write operations which are in the specified state. Runs in O(number of
+ * write operations).
+ */
int numWriteOpsIn(WriteOpState state) const;
private:
- void _incBatchStats(BatchedCommandRequest::BatchType batchType,
- const BatchedCommandResponse& response);
+ /**
+ * Maintains the batch execution statistics when a response is received.
+ */
+ void _incBatchStats(const BatchedCommandResponse& response);
+
+ /**
+ * Helper function to cancel all the write ops of targeted batches in a map.
+ */
+ void _cancelBatches(const WriteErrorDetail& why, TargetedBatchMap&& batchMapToCancel);
// The incoming client request
const BatchedCommandRequest& _clientRequest;
// Array of ops being processed from the client request
- WriteOp* _writeOps;
+ std::vector<WriteOp> _writeOps;
// Current outstanding batch op write requests
// Not owned here but tracked for reporting
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index 919118bed4e..aafe20ecd29 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -30,8 +30,6 @@
#include "mongo/s/write_ops/write_op.h"
-#include "mongo/base/error_codes.h"
-#include "mongo/base/owned_pointer_vector.h"
#include "mongo/util/assert_util.h"
namespace mongo {
@@ -39,19 +37,6 @@ namespace mongo {
using std::stringstream;
using std::vector;
-static void clear(vector<ChildWriteOp*>* childOps) {
- for (vector<ChildWriteOp*>::const_iterator it = childOps->begin(); it != childOps->end();
- ++it) {
- delete *it;
- }
- childOps->clear();
-}
-
-WriteOp::~WriteOp() {
- clear(&_childOps);
- clear(&_history);
-}
-
const BatchItemRef& WriteOp::getWriteItem() const {
return _itemRef;
}
@@ -118,7 +103,7 @@ Status WriteOp::targetWrites(OperationContext* opCtx,
for (auto it = endpoints.begin(); it != endpoints.end(); ++it) {
ShardEndpoint* endpoint = it->get();
- _childOps.push_back(new ChildWriteOp(this));
+ _childOps.emplace_back(this);
WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1);
@@ -130,8 +115,8 @@ Status WriteOp::targetWrites(OperationContext* opCtx,
targetedWrites->push_back(new TargetedWrite(broadcastEndpoint, ref));
}
- _childOps.back()->pendingWrite = targetedWrites->back();
- _childOps.back()->state = WriteOpState_Pending;
+ _childOps.back().pendingWrite = targetedWrites->back();
+ _childOps.back().state = WriteOpState_Pending;
}
_state = WriteOpState_Pending;
@@ -147,7 +132,7 @@ static bool isRetryErrCode(int errCode) {
}
// Aggregate a bunch of errors for a single op together
-static void combineOpErrors(const vector<ChildWriteOp*>& errOps, WriteErrorDetail* error) {
+static void combineOpErrors(const vector<ChildWriteOp const*>& errOps, WriteErrorDetail* error) {
// Special case single response
if (errOps.size() == 1) {
errOps.front()->error->cloneTo(error);
@@ -161,7 +146,8 @@ static void combineOpErrors(const vector<ChildWriteOp*>& errOps, WriteErrorDetai
msg << "multiple errors for op : ";
BSONArrayBuilder errB;
- for (vector<ChildWriteOp*>::const_iterator it = errOps.begin(); it != errOps.end(); ++it) {
+ for (vector<ChildWriteOp const*>::const_iterator it = errOps.begin(); it != errOps.end();
+ ++it) {
const ChildWriteOp* errOp = *it;
if (it != errOps.begin())
msg << " :: and :: ";
@@ -178,29 +164,29 @@ static void combineOpErrors(const vector<ChildWriteOp*>& errOps, WriteErrorDetai
* This is the core function which aggregates all the results of a write operation on multiple
* shards and updates the write operation's state.
*/
-void WriteOp::updateOpState() {
- vector<ChildWriteOp*> childErrors;
+void WriteOp::_updateOpState() {
+ std::vector<ChildWriteOp const*> childErrors;
bool isRetryError = true;
- for (vector<ChildWriteOp*>::iterator it = _childOps.begin(); it != _childOps.end(); it++) {
- ChildWriteOp* childOp = *it;
-
+ for (const auto& childOp : _childOps) {
// Don't do anything till we have all the info
- if (childOp->state != WriteOpState_Completed && childOp->state != WriteOpState_Error) {
+ if (childOp.state != WriteOpState_Completed && childOp.state != WriteOpState_Error) {
return;
}
- if (childOp->state == WriteOpState_Error) {
- childErrors.push_back(childOp);
+ if (childOp.state == WriteOpState_Error) {
+ childErrors.push_back(&childOp);
+
// Any non-retry error aborts all
- if (!isRetryErrCode(childOp->error->getErrCode()))
+ if (!isRetryErrCode(childOp.error->getErrCode())) {
isRetryError = false;
+ }
}
}
if (!childErrors.empty() && isRetryError) {
// Since we're using broadcast mode for multi-shard writes, which cannot SCE
- dassert(childErrors.size() == 1u);
+ invariant(childErrors.size() == 1u);
_state = WriteOpState_Ready;
} else if (!childErrors.empty()) {
_error.reset(new WriteErrorDetail);
@@ -210,48 +196,42 @@ void WriteOp::updateOpState() {
_state = WriteOpState_Completed;
}
- // Now that we're done with the child ops, do something with them
- // TODO: Don't store unlimited history?
- dassert(_state != WriteOpState_Pending);
- _history.insert(_history.end(), _childOps.begin(), _childOps.end());
+ invariant(_state != WriteOpState_Pending);
_childOps.clear();
}
void WriteOp::cancelWrites(const WriteErrorDetail* why) {
- dassert(_state == WriteOpState_Pending || _state == WriteOpState_Ready);
- for (vector<ChildWriteOp*>::iterator it = _childOps.begin(); it != _childOps.end(); ++it) {
- ChildWriteOp* childOp = *it;
+ invariant(_state == WriteOpState_Pending || _state == WriteOpState_Ready);
- if (childOp->state == WriteOpState_Pending) {
- childOp->endpoint.reset(new ShardEndpoint(childOp->pendingWrite->endpoint));
+ for (auto& childOp : _childOps) {
+ if (childOp.state == WriteOpState_Pending) {
+ childOp.endpoint.reset(new ShardEndpoint(childOp.pendingWrite->endpoint));
if (why) {
- childOp->error.reset(new WriteErrorDetail);
- why->cloneTo(childOp->error.get());
+ childOp.error.reset(new WriteErrorDetail);
+ why->cloneTo(childOp.error.get());
}
- childOp->state = WriteOpState_Cancelled;
+
+ childOp.state = WriteOpState_Cancelled;
}
}
- _history.insert(_history.end(), _childOps.begin(), _childOps.end());
- _childOps.clear();
-
_state = WriteOpState_Ready;
+ _childOps.clear();
}
void WriteOp::noteWriteComplete(const TargetedWrite& targetedWrite) {
const WriteOpRef& ref = targetedWrite.writeOpRef;
- dassert(static_cast<size_t>(ref.second) < _childOps.size());
- ChildWriteOp& childOp = *_childOps.at(ref.second);
+ auto& childOp = _childOps[ref.second];
childOp.pendingWrite = NULL;
childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint));
childOp.state = WriteOpState_Completed;
- updateOpState();
+ _updateOpState();
}
void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, const WriteErrorDetail& error) {
const WriteOpRef& ref = targetedWrite.writeOpRef;
- ChildWriteOp& childOp = *_childOps.at(ref.second);
+ auto& childOp = _childOps[ref.second];
childOp.pendingWrite = NULL;
childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint));
@@ -260,7 +240,7 @@ void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, const WriteErro
dassert(ref.first == _itemRef.getItemIndex());
childOp.error->setIndex(_itemRef.getItemIndex());
childOp.state = WriteOpState_Error;
- updateOpState();
+ _updateOpState();
}
void WriteOp::setOpError(const WriteErrorDetail& error) {
diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h
index 00dd9abe7a9..759652f98e3 100644
--- a/src/mongo/s/write_ops/write_op.h
+++ b/src/mongo/s/write_ops/write_op.h
@@ -30,8 +30,6 @@
#include <vector>
-#include "mongo/base/string_data.h"
-#include "mongo/bson/bsonobj.h"
#include "mongo/s/ns_targeter.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/write_error_detail.h"
@@ -90,9 +88,7 @@ enum WriteOpState {
*/
class WriteOp {
public:
- WriteOp(const BatchItemRef& itemRef) : _itemRef(itemRef), _state(WriteOpState_Ready) {}
-
- ~WriteOp();
+ WriteOp(BatchItemRef itemRef) : _itemRef(std::move(itemRef)) {}
/**
* Returns the write item for this operation
@@ -166,22 +162,19 @@ private:
/**
* Updates the op state after new information is received.
*/
- void updateOpState();
+ void _updateOpState();
// Owned elsewhere, reference to a batch with a write item
const BatchItemRef _itemRef;
// What stage of the operation we are at
- WriteOpState _state;
+ WriteOpState _state{WriteOpState_Ready};
// filled when state == _Pending
- std::vector<ChildWriteOp*> _childOps;
+ std::vector<ChildWriteOp> _childOps;
// filled when state == _Error
std::unique_ptr<WriteErrorDetail> _error;
-
- // Finished child operations, for debugging
- std::vector<ChildWriteOp*> _history;
};
/**
@@ -192,15 +185,15 @@ private:
* (_Error) state.
*/
struct ChildWriteOp {
- ChildWriteOp(WriteOp* const parent)
- : parentOp(parent), state(WriteOpState_Ready), pendingWrite(NULL) {}
+ ChildWriteOp(WriteOp* const parent) : parentOp(parent) {}
const WriteOp* const parentOp;
- WriteOpState state;
+
+ WriteOpState state{WriteOpState_Ready};
// non-zero when state == _Pending
// Not owned here but tracked for reporting
- TargetedWrite* pendingWrite;
+ TargetedWrite* pendingWrite{nullptr};
// filled when state > _Pending
std::unique_ptr<ShardEndpoint> endpoint;