diff options
Diffstat (limited to 'src/mongo/s/write_ops/write_op.cpp')
-rw-r--r-- | src/mongo/s/write_ops/write_op.cpp | 119 |
1 files changed, 64 insertions, 55 deletions
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index f403714a020..c0288b7080a 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -27,57 +27,17 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/s/write_ops/write_op.h" #include "mongo/s/transaction_router.h" #include "mongo/util/assert_util.h" namespace mongo { -namespace { - -bool isRetryErrCode(int errCode) { - return errCode == ErrorCodes::StaleShardVersion || errCode == ErrorCodes::StaleConfig || - errCode == ErrorCodes::StaleDbVersion || - errCode == ErrorCodes::ShardCannotRefreshDueToLocksHeld || - errCode == ErrorCodes::TenantMigrationAborted; -} - -bool errorsAllSame(const std::vector<ChildWriteOp const*>& errOps) { - auto errCode = errOps.front()->error->getStatus().code(); - if (std::all_of(++errOps.begin(), errOps.end(), [errCode](const ChildWriteOp* errOp) { - return errOp->error->getStatus().code() == errCode; - })) { - return true; - } - - return false; -} - -// Aggregate a bunch of errors for a single op together -write_ops::WriteError combineOpErrors(const std::vector<ChildWriteOp const*>& errOps) { - // Special case single response or all errors are the same - if (errOps.size() == 1 || errorsAllSame(errOps)) { - return *errOps.front()->error; - } - // Generate the multi-error message below - std::stringstream msg("multiple errors for op : "); - - BSONArrayBuilder errB; - for (std::vector<ChildWriteOp const*>::const_iterator it = errOps.begin(); it != errOps.end(); - ++it) { - const ChildWriteOp* errOp = *it; - if (it != errOps.begin()) - msg << " :: and :: "; - msg << errOp->error->getStatus().reason(); - errB.append(errOp->error->serialize()); - } - - return write_ops::WriteError(errOps.front()->error->getIndex(), - Status(MultipleErrorsOccurredInfo(errB.arr()), msg.str())); -} - -} // namespace +using std::stringstream; +using std::vector; const BatchItemRef& WriteOp::getWriteItem() const { return _itemRef; @@ -87,7 +47,7 @@ WriteOpState WriteOp::getWriteState() const { return _state; } -const write_ops::WriteError& WriteOp::getOpError() const { +const WriteErrorDetail& WriteOp::getOpError() const { dassert(_state == WriteOpState_Error); return *_error; } @@ -146,6 +106,50 @@ size_t WriteOp::getNumTargeted() { return _childOps.size(); } +static bool isRetryErrCode(int errCode) { + return errCode == ErrorCodes::StaleShardVersion || errCode == ErrorCodes::StaleDbVersion || + errCode == ErrorCodes::ShardCannotRefreshDueToLocksHeld || + errCode == ErrorCodes::TenantMigrationAborted; +} + +static bool errorsAllSame(const vector<ChildWriteOp const*>& errOps) { + auto errCode = errOps.front()->error->toStatus().code(); + if (std::all_of(++errOps.begin(), errOps.end(), [errCode](const ChildWriteOp* errOp) { + return errOp->error->toStatus().code() == errCode; + })) { + return true; + } + + return false; +} + +// Aggregate a bunch of errors for a single op together +static void combineOpErrors(const vector<ChildWriteOp const*>& errOps, WriteErrorDetail* error) { + // Special case single response or all errors are the same + if (errOps.size() == 1 || errorsAllSame(errOps)) { + errOps.front()->error->cloneTo(error); + return; + } + + // Generate the multi-error message below + stringstream msg; + msg << "multiple errors for op : "; + + BSONArrayBuilder errB; + for (vector<ChildWriteOp const*>::const_iterator it = errOps.begin(); it != errOps.end(); + ++it) { + const ChildWriteOp* errOp = *it; + if (it != errOps.begin()) + msg << " :: and :: "; + msg << errOp->error->toStatus().reason(); + errB.append(errOp->error->toBSON()); + } + + error->setErrInfo(BSON("causedBy" << errB.arr())); + error->setIndex(errOps.front()->error->getIndex()); + error->setStatus({ErrorCodes::MultipleErrorsOccurred, msg.str()}); +} + /** * This is the core function which aggregates all the results of a write operation on multiple * shards and updates the write operation's state. @@ -170,7 +174,7 @@ void WriteOp::_updateOpState() { childErrors.push_back(&childOp); // Any non-retry error aborts all - if (_inTxn || !isRetryErrCode(childOp.error->getStatus().code())) { + if (_inTxn || !isRetryErrCode(childOp.error->toStatus().code())) { isRetryError = false; } } @@ -179,7 +183,8 @@ void WriteOp::_updateOpState() { if (!childErrors.empty() && isRetryError) { _state = WriteOpState_Ready; } else if (!childErrors.empty()) { - _error = combineOpErrors(childErrors); + _error.reset(new WriteErrorDetail); + combineOpErrors(childErrors, _error.get()); _state = WriteOpState_Error; } else if (hasPendingChild && _inTxn) { // Return early here since this means that there were no errors while in txn @@ -193,14 +198,17 @@ void WriteOp::_updateOpState() { _childOps.clear(); } -void WriteOp::cancelWrites(const write_ops::WriteError* why) { +void WriteOp::cancelWrites(const WriteErrorDetail* why) { invariant(_state == WriteOpState_Pending || _state == WriteOpState_Ready); for (auto& childOp : _childOps) { if (childOp.state == WriteOpState_Pending) { childOp.endpoint.reset(new ShardEndpoint(childOp.pendingWrite->endpoint)); - if (why) - childOp.error = *why; + if (why) { + childOp.error.reset(new WriteErrorDetail); + why->cloneTo(childOp.error.get()); + } + childOp.state = WriteOpState_Cancelled; } } @@ -220,23 +228,24 @@ void WriteOp::noteWriteComplete(const TargetedWrite& targetedWrite) { _updateOpState(); } -void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, - const write_ops::WriteError& error) { +void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, const WriteErrorDetail& error) { const WriteOpRef& ref = targetedWrite.writeOpRef; auto& childOp = _childOps[ref.second]; childOp.pendingWrite = nullptr; childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint)); - childOp.error = error; + childOp.error.reset(new WriteErrorDetail); + error.cloneTo(childOp.error.get()); dassert(ref.first == _itemRef.getItemIndex()); childOp.error->setIndex(_itemRef.getItemIndex()); childOp.state = WriteOpState_Error; _updateOpState(); } -void WriteOp::setOpError(const write_ops::WriteError& error) { +void WriteOp::setOpError(const WriteErrorDetail& error) { dassert(_state == WriteOpState_Ready); - _error = error; + _error.reset(new WriteErrorDetail); + error.cloneTo(_error.get()); _error->setIndex(_itemRef.getItemIndex()); _state = WriteOpState_Error; // No need to updateOpState, set directly |