summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-06-05 21:05:13 +0300
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-06-08 08:17:26 +0300
commit6f8b26a49dd6fb150689ecd970f47a48e15bd28b (patch)
treebcb49fc3e81e39e7ca3d8135505c97e421e76794 /src
parentb2d6f747eb85d303d309f5990ba395eb9a23fa86 (diff)
downloadmongo-6f8b26a49dd6fb150689ecd970f47a48e15bd28b.tar.gz
SERVER-28753 Add support for TxnNumber/StmtId in write commands
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/logical_session_id.cpp5
-rw-r--r--src/mongo/db/logical_session_id.h6
-rw-r--r--src/mongo/db/logical_session_id.idl8
-rw-r--r--src/mongo/s/write_ops/SConscript197
-rw-r--r--src/mongo/s/write_ops/batched_command_request.cpp63
-rw-r--r--src/mongo/s/write_ops/batched_command_request.h19
-rw-r--r--src/mongo/s/write_ops/write_ops.idl44
7 files changed, 227 insertions, 115 deletions
diff --git a/src/mongo/db/logical_session_id.cpp b/src/mongo/db/logical_session_id.cpp
index c26b8753b90..497036a9d6c 100644
--- a/src/mongo/db/logical_session_id.cpp
+++ b/src/mongo/db/logical_session_id.cpp
@@ -48,11 +48,6 @@ LogicalSessionId LogicalSessionId::gen() {
return {UUID::gen()};
}
-StatusWith<LogicalSessionId> LogicalSessionId::parse(const TxnId& txnId) {
- // TODO: the TxnId class is not yet implemented.
- MONGO_UNREACHABLE;
-}
-
StatusWith<LogicalSessionId> LogicalSessionId::parse(const std::string& s) {
auto res = UUID::parse(s);
if (!res.isOK()) {
diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h
index 85c3fb45383..eca1d8a33f5 100644
--- a/src/mongo/db/logical_session_id.h
+++ b/src/mongo/db/logical_session_id.h
@@ -37,7 +37,6 @@
namespace mongo {
class BSONObjBuilder;
-class TxnId;
/**
* A 128-bit identifier for a logical session.
@@ -53,11 +52,6 @@ public:
static LogicalSessionId gen();
/**
- * Construct a new LogicalSessionId out of a txnId received with an operation.
- */
- static StatusWith<LogicalSessionId> parse(const TxnId& txnId);
-
- /**
* If the given string represents a valid LogicalSessionId, constructs and returns,
* the id, otherwise returns an error.
*/
diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl
index 6e04d948ca7..a6963dd6e8d 100644
--- a/src/mongo/db/logical_session_id.idl
+++ b/src/mongo/db/logical_session_id.idl
@@ -34,6 +34,14 @@ types:
deserializer: "mongo::UUID::parse"
serializer: "mongo::UUID::toBSON"
+ TxnNumber:
+ description: "A strictly-increasing per-session counter, which indicates to which transaction
+ of a given session does the specified command belong. The combination of
+ LogicalSessionId:TxnNumber is referred to as the transaction identifier."
+ bson_serialization_type: long
+ cpp_type: "std::int64_t"
+ deserializer: "mongo::BSONElement::_numberLong"
+
structs:
logical_session_id:
diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript
index b9f701f7b77..69df543a930 100644
--- a/src/mongo/s/write_ops/SConscript
+++ b/src/mongo/s/write_ops/SConscript
@@ -1,98 +1,99 @@
-# -*- mode: python -*-
-
-Import("env")
-
-env = env.Clone()
-
-env.Library(
- target='batch_write_types',
- source=[
- 'batched_command_request.cpp',
- 'batched_command_response.cpp',
- 'batched_delete_request.cpp',
- 'batched_delete_document.cpp',
- 'batched_insert_request.cpp',
- 'batched_update_request.cpp',
- 'batched_update_document.cpp',
- 'batched_upsert_detail.cpp',
- 'write_error_detail.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/common',
- '$BUILD_DIR/mongo/db/repl/optime',
- '$BUILD_DIR/mongo/s/common',
- ],
-)
-
-env.Library(
- target='cluster_write_op',
- source=[
- 'write_op.cpp',
- 'batch_write_op.cpp',
- 'batch_write_exec.cpp',
- ],
- LIBDEPS=[
- 'batch_write_types',
- '$BUILD_DIR/mongo/client/connection_string',
- '$BUILD_DIR/mongo/s/async_requests_sender',
- '$BUILD_DIR/mongo/s/client/sharding_client',
- '$BUILD_DIR/mongo/s/coreshard',
- ],
-)
-
-env.Library(
- target='cluster_write_op_conversion',
- source=[
- 'batch_upconvert.cpp',
- 'batch_downconvert.cpp',
- ],
- LIBDEPS=[
- 'cluster_write_op',
- '$BUILD_DIR/mongo/db/dbmessage',
- '$BUILD_DIR/mongo/db/lasterror',
- ],
-)
-
-env.CppUnitTest(
- target='batch_write_types_test',
- source=[
- 'batched_command_request_test.cpp',
- 'batched_command_response_test.cpp',
- 'batched_delete_request_test.cpp',
- 'batched_insert_request_test.cpp',
- 'batched_update_request_test.cpp',
- ],
- LIBDEPS=[
- 'batch_write_types',
- ]
-)
-
-env.CppUnitTest(
- target='cluster_write_op_test',
- source=[
- 'write_op_test.cpp',
- 'batch_write_op_test.cpp',
- 'batch_write_exec_test.cpp',
- ],
- LIBDEPS=[
- 'cluster_write_op',
- '$BUILD_DIR/mongo/db/range_arithmetic',
- '$BUILD_DIR/mongo/db/service_context',
- '$BUILD_DIR/mongo/s/sharding_test_fixture',
- ]
-)
-
-env.CppUnitTest(
- target='cluster_write_op_conversion_test',
- source=[
- 'batch_upconvert_test.cpp',
- 'batch_downconvert_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
- 'cluster_write_op',
- 'cluster_write_op_conversion',
- ]
-)
+# -*- mode: python -*-
+
+Import("env")
+
+env = env.Clone()
+
+env.Library(
+ target='batch_write_types',
+ source=[
+ 'batched_command_request.cpp',
+ 'batched_command_response.cpp',
+ 'batched_delete_document.cpp',
+ 'batched_delete_request.cpp',
+ 'batched_insert_request.cpp',
+ 'batched_update_document.cpp',
+ 'batched_update_request.cpp',
+ 'batched_upsert_detail.cpp',
+ 'write_error_detail.cpp',
+ env.Idlc('write_ops.idl')[0],
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/common',
+ '$BUILD_DIR/mongo/db/repl/optime',
+ '$BUILD_DIR/mongo/s/common',
+ ],
+)
+
+env.Library(
+ target='cluster_write_op',
+ source=[
+ 'write_op.cpp',
+ 'batch_write_op.cpp',
+ 'batch_write_exec.cpp',
+ ],
+ LIBDEPS=[
+ 'batch_write_types',
+ '$BUILD_DIR/mongo/client/connection_string',
+ '$BUILD_DIR/mongo/s/async_requests_sender',
+ '$BUILD_DIR/mongo/s/client/sharding_client',
+ '$BUILD_DIR/mongo/s/coreshard',
+ ],
+)
+
+env.Library(
+ target='cluster_write_op_conversion',
+ source=[
+ 'batch_upconvert.cpp',
+ 'batch_downconvert.cpp',
+ ],
+ LIBDEPS=[
+ 'cluster_write_op',
+ '$BUILD_DIR/mongo/db/dbmessage',
+ '$BUILD_DIR/mongo/db/lasterror',
+ ],
+)
+
+env.CppUnitTest(
+ target='batch_write_types_test',
+ source=[
+ 'batched_command_request_test.cpp',
+ 'batched_command_response_test.cpp',
+ 'batched_delete_request_test.cpp',
+ 'batched_insert_request_test.cpp',
+ 'batched_update_request_test.cpp',
+ ],
+ LIBDEPS=[
+ 'batch_write_types',
+ ]
+)
+
+env.CppUnitTest(
+ target='cluster_write_op_test',
+ source=[
+ 'write_op_test.cpp',
+ 'batch_write_op_test.cpp',
+ 'batch_write_exec_test.cpp',
+ ],
+ LIBDEPS=[
+ 'cluster_write_op',
+ '$BUILD_DIR/mongo/db/range_arithmetic',
+ '$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/s/sharding_test_fixture',
+ ]
+)
+
+env.CppUnitTest(
+ target='cluster_write_op_conversion_test',
+ source=[
+ 'batch_upconvert_test.cpp',
+ 'batch_downconvert_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ 'cluster_write_op',
+ 'cluster_write_op_conversion',
+ ]
+)
diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp
index 62bfa5a40b1..e4811ffe763 100644
--- a/src/mongo/s/write_ops/batched_command_request.cpp
+++ b/src/mongo/s/write_ops/batched_command_request.cpp
@@ -159,6 +159,9 @@ BSONObj BatchedCommandRequest::toBSON() const {
_shardVersion.get().appendForCommands(&builder);
}
+ // Append the transaction info
+ _txnInfo.serialize(&builder);
+
return builder.obj();
}
@@ -184,17 +187,33 @@ bool BatchedCommandRequest::parseBSON(StringData dbName,
if (!succeeded)
return false;
- // Now parse out the chunk version and optime.
+ // Parse the command's shard version
auto chunkVersion = ChunkVersion::parseFromBSONForCommands(source);
if (chunkVersion.isOK()) {
_shardVersion = chunkVersion.getValue();
- return true;
- } else if (chunkVersion == ErrorCodes::NoSuchKey) {
- return true;
+ } else if (chunkVersion != ErrorCodes::NoSuchKey) {
+ *errMsg = chunkVersion.getStatus().toString();
+ return false;
}
- *errMsg = causedBy(chunkVersion.getStatus());
- return false;
+ // Parse the command's transaction info and do extra validation not done by the parser
+ try {
+ _txnInfo = WriteOpTxnInfo::parse(IDLParserErrorContext("WriteOpTxnInfo"), source);
+
+ const auto& stmtIds = _txnInfo.getStmtIds();
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "The size of the statement ids array (" << stmtIds->size()
+ << ") does not match the number of operations ("
+ << sizeWriteOps()
+ << ")",
+ !stmtIds || stmtIds->size() == sizeWriteOps());
+ } catch (const DBException& ex) {
+ *errMsg = str::stream() << "Failed to parse the write op retriability information due to "
+ << ex.toString();
+ return false;
+ }
+
+ return true;
}
std::string BatchedCommandRequest::toString() const {
@@ -369,4 +388,36 @@ bool BatchedCommandRequest::getIndexedNS(const BSONObj& writeCmdObj,
return true;
}
+const boost::optional<std::int64_t> BatchedCommandRequest::getTxnNum() const& {
+ return _txnInfo.getTxnNum();
+}
+
+void BatchedCommandRequest::setTxnNum(boost::optional<std::int64_t> value) {
+ _txnInfo.setTxnNum(std::move(value));
+}
+
+const boost::optional<std::vector<std::int32_t>> BatchedCommandRequest::getStmtIds() const& {
+ return _txnInfo.getStmtIds();
+}
+
+void BatchedCommandRequest::setStmtIds(boost::optional<std::vector<std::int32_t>> value) {
+ invariant(_txnInfo.getTxnNum());
+ invariant(!value || value->size() == sizeWriteOps());
+
+ _txnInfo.setStmtIds(std::move(value));
+}
+
+int32_t BatchedCommandRequest::getStmtIdForWriteAt(size_t writePos) const {
+ invariant(getTxnNum());
+
+ const auto& stmtIds = _txnInfo.getStmtIds();
+
+ if (stmtIds) {
+ return stmtIds->at(writePos);
+ }
+
+ const int32_t kFirstStmtId = 0;
+ return kFirstStmtId + writePos;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h
index c69a3516276..036f9362494 100644
--- a/src/mongo/s/write_ops/batched_command_request.h
+++ b/src/mongo/s/write_ops/batched_command_request.h
@@ -35,6 +35,7 @@
#include "mongo/s/write_ops/batched_delete_request.h"
#include "mongo/s/write_ops/batched_insert_request.h"
#include "mongo/s/write_ops/batched_update_request.h"
+#include "mongo/s/write_ops/write_ops_gen.h"
namespace mongo {
@@ -135,6 +136,21 @@ public:
return _shardVersion.get();
}
+ const boost::optional<std::int64_t> getTxnNum() const&;
+ void setTxnNum(boost::optional<std::int64_t> value);
+
+ const boost::optional<std::vector<std::int32_t>> getStmtIds() const&;
+ void setStmtIds(boost::optional<std::vector<std::int32_t>> value);
+
+ /**
+ * Retrieves the statement id for the write at the specified position in the write batch entries
+ * array.
+ *
+ * This method may only be called if a TxnNumber has been given for the operation, otherwise it
+ * will fassert.
+ */
+ int32_t getStmtIdForWriteAt(size_t writePos) const;
+
void setShouldBypassValidation(bool newVal);
bool shouldBypassValidation() const;
@@ -181,6 +197,9 @@ private:
std::unique_ptr<BatchedInsertRequest> _insertReq;
std::unique_ptr<BatchedUpdateRequest> _updateReq;
std::unique_ptr<BatchedDeleteRequest> _deleteReq;
+
+ // If this write is retriable, contains information about the retriability of the write
+ WriteOpTxnInfo _txnInfo;
};
/**
diff --git a/src/mongo/s/write_ops/write_ops.idl b/src/mongo/s/write_ops/write_ops.idl
new file mode 100644
index 00000000000..2b1ac434d60
--- /dev/null
+++ b/src/mongo/s/write_ops/write_ops.idl
@@ -0,0 +1,44 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/db/logical_session_id.idl"
+
+structs:
+
+ WriteOpTxnInfo:
+ description: "Contains a unique identifier information for a particular write operation,
+ so it can be retried idempotently."
+ fields:
+ txnNum:
+ description: "The transaction number relative to the session in which a particular
+ write operation executes"
+ type: TxnNumber
+ optional: true
+
+ stmtIds:
+ description: "An array of statement numbers relative to the transaction. If this
+ field is set, its size must be exactly the same as the number of
+ entries in the corresponding insert/update/delete request. If it is
+ not set, the statement ids of the contained operation will be
+ implicitly generated based on their offset, starting from 0."
+ type: array<int>
+ optional: true
+ # Ignore any extra fields, because this is a parser only for a subset of the write
+ # commands' content
+ strict: false