summaryrefslogtreecommitdiff
path: root/src/mongo/s/write_ops/write_op.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/write_ops/write_op.cpp')
-rw-r--r--src/mongo/s/write_ops/write_op.cpp119
1 files changed, 64 insertions, 55 deletions
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index f403714a020..c0288b7080a 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -27,57 +27,17 @@
* it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include "mongo/s/write_ops/write_op.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/assert_util.h"
namespace mongo {
-namespace {
-
-bool isRetryErrCode(int errCode) {
- return errCode == ErrorCodes::StaleShardVersion || errCode == ErrorCodes::StaleConfig ||
- errCode == ErrorCodes::StaleDbVersion ||
- errCode == ErrorCodes::ShardCannotRefreshDueToLocksHeld ||
- errCode == ErrorCodes::TenantMigrationAborted;
-}
-
-bool errorsAllSame(const std::vector<ChildWriteOp const*>& errOps) {
- auto errCode = errOps.front()->error->getStatus().code();
- if (std::all_of(++errOps.begin(), errOps.end(), [errCode](const ChildWriteOp* errOp) {
- return errOp->error->getStatus().code() == errCode;
- })) {
- return true;
- }
-
- return false;
-}
-
-// Aggregate a bunch of errors for a single op together
-write_ops::WriteError combineOpErrors(const std::vector<ChildWriteOp const*>& errOps) {
- // Special case single response or all errors are the same
- if (errOps.size() == 1 || errorsAllSame(errOps)) {
- return *errOps.front()->error;
- }
- // Generate the multi-error message below
- std::stringstream msg("multiple errors for op : ");
-
- BSONArrayBuilder errB;
- for (std::vector<ChildWriteOp const*>::const_iterator it = errOps.begin(); it != errOps.end();
- ++it) {
- const ChildWriteOp* errOp = *it;
- if (it != errOps.begin())
- msg << " :: and :: ";
- msg << errOp->error->getStatus().reason();
- errB.append(errOp->error->serialize());
- }
-
- return write_ops::WriteError(errOps.front()->error->getIndex(),
- Status(MultipleErrorsOccurredInfo(errB.arr()), msg.str()));
-}
-
-} // namespace
+using std::stringstream;
+using std::vector;
const BatchItemRef& WriteOp::getWriteItem() const {
return _itemRef;
@@ -87,7 +47,7 @@ WriteOpState WriteOp::getWriteState() const {
return _state;
}
-const write_ops::WriteError& WriteOp::getOpError() const {
+const WriteErrorDetail& WriteOp::getOpError() const {
dassert(_state == WriteOpState_Error);
return *_error;
}
@@ -146,6 +106,50 @@ size_t WriteOp::getNumTargeted() {
return _childOps.size();
}
+static bool isRetryErrCode(int errCode) {
+ return errCode == ErrorCodes::StaleShardVersion || errCode == ErrorCodes::StaleDbVersion ||
+ errCode == ErrorCodes::ShardCannotRefreshDueToLocksHeld ||
+ errCode == ErrorCodes::TenantMigrationAborted;
+}
+
+static bool errorsAllSame(const vector<ChildWriteOp const*>& errOps) {
+ auto errCode = errOps.front()->error->toStatus().code();
+ if (std::all_of(++errOps.begin(), errOps.end(), [errCode](const ChildWriteOp* errOp) {
+ return errOp->error->toStatus().code() == errCode;
+ })) {
+ return true;
+ }
+
+ return false;
+}
+
+// Aggregate a bunch of errors for a single op together
+static void combineOpErrors(const vector<ChildWriteOp const*>& errOps, WriteErrorDetail* error) {
+ // Special case single response or all errors are the same
+ if (errOps.size() == 1 || errorsAllSame(errOps)) {
+ errOps.front()->error->cloneTo(error);
+ return;
+ }
+
+ // Generate the multi-error message below
+ stringstream msg;
+ msg << "multiple errors for op : ";
+
+ BSONArrayBuilder errB;
+ for (vector<ChildWriteOp const*>::const_iterator it = errOps.begin(); it != errOps.end();
+ ++it) {
+ const ChildWriteOp* errOp = *it;
+ if (it != errOps.begin())
+ msg << " :: and :: ";
+ msg << errOp->error->toStatus().reason();
+ errB.append(errOp->error->toBSON());
+ }
+
+ error->setErrInfo(BSON("causedBy" << errB.arr()));
+ error->setIndex(errOps.front()->error->getIndex());
+ error->setStatus({ErrorCodes::MultipleErrorsOccurred, msg.str()});
+}
+
/**
* This is the core function which aggregates all the results of a write operation on multiple
* shards and updates the write operation's state.
@@ -170,7 +174,7 @@ void WriteOp::_updateOpState() {
childErrors.push_back(&childOp);
// Any non-retry error aborts all
- if (_inTxn || !isRetryErrCode(childOp.error->getStatus().code())) {
+ if (_inTxn || !isRetryErrCode(childOp.error->toStatus().code())) {
isRetryError = false;
}
}
@@ -179,7 +183,8 @@ void WriteOp::_updateOpState() {
if (!childErrors.empty() && isRetryError) {
_state = WriteOpState_Ready;
} else if (!childErrors.empty()) {
- _error = combineOpErrors(childErrors);
+ _error.reset(new WriteErrorDetail);
+ combineOpErrors(childErrors, _error.get());
_state = WriteOpState_Error;
} else if (hasPendingChild && _inTxn) {
// Return early here since this means that there were no errors while in txn
@@ -193,14 +198,17 @@ void WriteOp::_updateOpState() {
_childOps.clear();
}
-void WriteOp::cancelWrites(const write_ops::WriteError* why) {
+void WriteOp::cancelWrites(const WriteErrorDetail* why) {
invariant(_state == WriteOpState_Pending || _state == WriteOpState_Ready);
for (auto& childOp : _childOps) {
if (childOp.state == WriteOpState_Pending) {
childOp.endpoint.reset(new ShardEndpoint(childOp.pendingWrite->endpoint));
- if (why)
- childOp.error = *why;
+ if (why) {
+ childOp.error.reset(new WriteErrorDetail);
+ why->cloneTo(childOp.error.get());
+ }
+
childOp.state = WriteOpState_Cancelled;
}
}
@@ -220,23 +228,24 @@ void WriteOp::noteWriteComplete(const TargetedWrite& targetedWrite) {
_updateOpState();
}
-void WriteOp::noteWriteError(const TargetedWrite& targetedWrite,
- const write_ops::WriteError& error) {
+void WriteOp::noteWriteError(const TargetedWrite& targetedWrite, const WriteErrorDetail& error) {
const WriteOpRef& ref = targetedWrite.writeOpRef;
auto& childOp = _childOps[ref.second];
childOp.pendingWrite = nullptr;
childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint));
- childOp.error = error;
+ childOp.error.reset(new WriteErrorDetail);
+ error.cloneTo(childOp.error.get());
dassert(ref.first == _itemRef.getItemIndex());
childOp.error->setIndex(_itemRef.getItemIndex());
childOp.state = WriteOpState_Error;
_updateOpState();
}
-void WriteOp::setOpError(const write_ops::WriteError& error) {
+void WriteOp::setOpError(const WriteErrorDetail& error) {
dassert(_state == WriteOpState_Ready);
- _error = error;
+ _error.reset(new WriteErrorDetail);
+ error.cloneTo(_error.get());
_error->setIndex(_itemRef.getItemIndex());
_state = WriteOpState_Error;
// No need to updateOpState, set directly