diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-06-05 21:05:13 +0300 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-06-08 08:17:26 +0300 |
commit | 6f8b26a49dd6fb150689ecd970f47a48e15bd28b (patch) | |
tree | bcb49fc3e81e39e7ca3d8135505c97e421e76794 /src | |
parent | b2d6f747eb85d303d309f5990ba395eb9a23fa86 (diff) | |
download | mongo-6f8b26a49dd6fb150689ecd970f47a48e15bd28b.tar.gz |
SERVER-28753 Add support for TxnNumber/StmtId in write commands
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/logical_session_id.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/logical_session_id.h | 6 | ||||
-rw-r--r-- | src/mongo/db/logical_session_id.idl | 8 | ||||
-rw-r--r-- | src/mongo/s/write_ops/SConscript | 197 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batched_command_request.cpp | 63 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batched_command_request.h | 19 | ||||
-rw-r--r-- | src/mongo/s/write_ops/write_ops.idl | 44 |
7 files changed, 227 insertions, 115 deletions
diff --git a/src/mongo/db/logical_session_id.cpp b/src/mongo/db/logical_session_id.cpp index c26b8753b90..497036a9d6c 100644 --- a/src/mongo/db/logical_session_id.cpp +++ b/src/mongo/db/logical_session_id.cpp @@ -48,11 +48,6 @@ LogicalSessionId LogicalSessionId::gen() { return {UUID::gen()}; } -StatusWith<LogicalSessionId> LogicalSessionId::parse(const TxnId& txnId) { - // TODO: the TxnId class is not yet implemented. - MONGO_UNREACHABLE; -} - StatusWith<LogicalSessionId> LogicalSessionId::parse(const std::string& s) { auto res = UUID::parse(s); if (!res.isOK()) { diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h index 85c3fb45383..eca1d8a33f5 100644 --- a/src/mongo/db/logical_session_id.h +++ b/src/mongo/db/logical_session_id.h @@ -37,7 +37,6 @@ namespace mongo { class BSONObjBuilder; -class TxnId; /** * A 128-bit identifier for a logical session. @@ -53,11 +52,6 @@ public: static LogicalSessionId gen(); /** - * Construct a new LogicalSessionId out of a txnId received with an operation. - */ - static StatusWith<LogicalSessionId> parse(const TxnId& txnId); - - /** * If the given string represents a valid LogicalSessionId, constructs and returns, * the id, otherwise returns an error. */ diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl index 6e04d948ca7..a6963dd6e8d 100644 --- a/src/mongo/db/logical_session_id.idl +++ b/src/mongo/db/logical_session_id.idl @@ -34,6 +34,14 @@ types: deserializer: "mongo::UUID::parse" serializer: "mongo::UUID::toBSON" + TxnNumber: + description: "A strictly-increasing per-session counter, which indicates to which transaction + of a given session does the specified command belong. The combination of + LogicalSessionId:TxnNumber is referred to as the transaction identifier." + bson_serialization_type: long + cpp_type: "std::int64_t" + deserializer: "mongo::BSONElement::_numberLong" + structs: logical_session_id: diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript index b9f701f7b77..69df543a930 100644 --- a/src/mongo/s/write_ops/SConscript +++ b/src/mongo/s/write_ops/SConscript @@ -1,98 +1,99 @@ -# -*- mode: python -*-
-
-Import("env")
-
-env = env.Clone()
-
-env.Library(
- target='batch_write_types',
- source=[
- 'batched_command_request.cpp',
- 'batched_command_response.cpp',
- 'batched_delete_request.cpp',
- 'batched_delete_document.cpp',
- 'batched_insert_request.cpp',
- 'batched_update_request.cpp',
- 'batched_update_document.cpp',
- 'batched_upsert_detail.cpp',
- 'write_error_detail.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/common',
- '$BUILD_DIR/mongo/db/repl/optime',
- '$BUILD_DIR/mongo/s/common',
- ],
-)
-
-env.Library(
- target='cluster_write_op',
- source=[
- 'write_op.cpp',
- 'batch_write_op.cpp',
- 'batch_write_exec.cpp',
- ],
- LIBDEPS=[
- 'batch_write_types',
- '$BUILD_DIR/mongo/client/connection_string',
- '$BUILD_DIR/mongo/s/async_requests_sender',
- '$BUILD_DIR/mongo/s/client/sharding_client',
- '$BUILD_DIR/mongo/s/coreshard',
- ],
-)
-
-env.Library(
- target='cluster_write_op_conversion',
- source=[
- 'batch_upconvert.cpp',
- 'batch_downconvert.cpp',
- ],
- LIBDEPS=[
- 'cluster_write_op',
- '$BUILD_DIR/mongo/db/dbmessage',
- '$BUILD_DIR/mongo/db/lasterror',
- ],
-)
-
-env.CppUnitTest(
- target='batch_write_types_test',
- source=[
- 'batched_command_request_test.cpp',
- 'batched_command_response_test.cpp',
- 'batched_delete_request_test.cpp',
- 'batched_insert_request_test.cpp',
- 'batched_update_request_test.cpp',
- ],
- LIBDEPS=[
- 'batch_write_types',
- ]
-)
-
-env.CppUnitTest(
- target='cluster_write_op_test',
- source=[
- 'write_op_test.cpp',
- 'batch_write_op_test.cpp',
- 'batch_write_exec_test.cpp',
- ],
- LIBDEPS=[
- 'cluster_write_op',
- '$BUILD_DIR/mongo/db/range_arithmetic',
- '$BUILD_DIR/mongo/db/service_context',
- '$BUILD_DIR/mongo/s/sharding_test_fixture',
- ]
-)
-
-env.CppUnitTest(
- target='cluster_write_op_conversion_test',
- source=[
- 'batch_upconvert_test.cpp',
- 'batch_downconvert_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
- 'cluster_write_op',
- 'cluster_write_op_conversion',
- ]
-)
+# -*- mode: python -*- + +Import("env") + +env = env.Clone() + +env.Library( + target='batch_write_types', + source=[ + 'batched_command_request.cpp', + 'batched_command_response.cpp', + 'batched_delete_document.cpp', + 'batched_delete_request.cpp', + 'batched_insert_request.cpp', + 'batched_update_document.cpp', + 'batched_update_request.cpp', + 'batched_upsert_detail.cpp', + 'write_error_detail.cpp', + env.Idlc('write_ops.idl')[0], + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/db/repl/optime', + '$BUILD_DIR/mongo/s/common', + ], +) + +env.Library( + target='cluster_write_op', + source=[ + 'write_op.cpp', + 'batch_write_op.cpp', + 'batch_write_exec.cpp', + ], + LIBDEPS=[ + 'batch_write_types', + '$BUILD_DIR/mongo/client/connection_string', + '$BUILD_DIR/mongo/s/async_requests_sender', + '$BUILD_DIR/mongo/s/client/sharding_client', + '$BUILD_DIR/mongo/s/coreshard', + ], +) + +env.Library( + target='cluster_write_op_conversion', + source=[ + 'batch_upconvert.cpp', + 'batch_downconvert.cpp', + ], + LIBDEPS=[ + 'cluster_write_op', + '$BUILD_DIR/mongo/db/dbmessage', + '$BUILD_DIR/mongo/db/lasterror', + ], +) + +env.CppUnitTest( + target='batch_write_types_test', + source=[ + 'batched_command_request_test.cpp', + 'batched_command_response_test.cpp', + 'batched_delete_request_test.cpp', + 'batched_insert_request_test.cpp', + 'batched_update_request_test.cpp', + ], + LIBDEPS=[ + 'batch_write_types', + ] +) + +env.CppUnitTest( + target='cluster_write_op_test', + source=[ + 'write_op_test.cpp', + 'batch_write_op_test.cpp', + 'batch_write_exec_test.cpp', + ], + LIBDEPS=[ + 'cluster_write_op', + '$BUILD_DIR/mongo/db/range_arithmetic', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/s/sharding_test_fixture', + ] +) + +env.CppUnitTest( + target='cluster_write_op_conversion_test', + source=[ + 'batch_upconvert_test.cpp', + 'batch_downconvert_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/service_context_noop_init', + 'cluster_write_op', + 'cluster_write_op_conversion', + ] +) diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp index 62bfa5a40b1..e4811ffe763 100644 --- a/src/mongo/s/write_ops/batched_command_request.cpp +++ b/src/mongo/s/write_ops/batched_command_request.cpp @@ -159,6 +159,9 @@ BSONObj BatchedCommandRequest::toBSON() const { _shardVersion.get().appendForCommands(&builder); } + // Append the transaction info + _txnInfo.serialize(&builder); + return builder.obj(); } @@ -184,17 +187,33 @@ bool BatchedCommandRequest::parseBSON(StringData dbName, if (!succeeded) return false; - // Now parse out the chunk version and optime. + // Parse the command's shard version auto chunkVersion = ChunkVersion::parseFromBSONForCommands(source); if (chunkVersion.isOK()) { _shardVersion = chunkVersion.getValue(); - return true; - } else if (chunkVersion == ErrorCodes::NoSuchKey) { - return true; + } else if (chunkVersion != ErrorCodes::NoSuchKey) { + *errMsg = chunkVersion.getStatus().toString(); + return false; } - *errMsg = causedBy(chunkVersion.getStatus()); - return false; + // Parse the command's transaction info and do extra validation not done by the parser + try { + _txnInfo = WriteOpTxnInfo::parse(IDLParserErrorContext("WriteOpTxnInfo"), source); + + const auto& stmtIds = _txnInfo.getStmtIds(); + uassert(ErrorCodes::BadValue, + str::stream() << "The size of the statement ids array (" << stmtIds->size() + << ") does not match the number of operations (" + << sizeWriteOps() + << ")", + !stmtIds || stmtIds->size() == sizeWriteOps()); + } catch (const DBException& ex) { + *errMsg = str::stream() << "Failed to parse the write op retriability information due to " + << ex.toString(); + return false; + } + + return true; } std::string BatchedCommandRequest::toString() const { @@ -369,4 +388,36 @@ bool BatchedCommandRequest::getIndexedNS(const BSONObj& writeCmdObj, return true; } +const boost::optional<std::int64_t> BatchedCommandRequest::getTxnNum() const& { + return _txnInfo.getTxnNum(); +} + +void BatchedCommandRequest::setTxnNum(boost::optional<std::int64_t> value) { + _txnInfo.setTxnNum(std::move(value)); +} + +const boost::optional<std::vector<std::int32_t>> BatchedCommandRequest::getStmtIds() const& { + return _txnInfo.getStmtIds(); +} + +void BatchedCommandRequest::setStmtIds(boost::optional<std::vector<std::int32_t>> value) { + invariant(_txnInfo.getTxnNum()); + invariant(!value || value->size() == sizeWriteOps()); + + _txnInfo.setStmtIds(std::move(value)); +} + +int32_t BatchedCommandRequest::getStmtIdForWriteAt(size_t writePos) const { + invariant(getTxnNum()); + + const auto& stmtIds = _txnInfo.getStmtIds(); + + if (stmtIds) { + return stmtIds->at(writePos); + } + + const int32_t kFirstStmtId = 0; + return kFirstStmtId + writePos; +} + } // namespace mongo diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h index c69a3516276..036f9362494 100644 --- a/src/mongo/s/write_ops/batched_command_request.h +++ b/src/mongo/s/write_ops/batched_command_request.h @@ -35,6 +35,7 @@ #include "mongo/s/write_ops/batched_delete_request.h" #include "mongo/s/write_ops/batched_insert_request.h" #include "mongo/s/write_ops/batched_update_request.h" +#include "mongo/s/write_ops/write_ops_gen.h" namespace mongo { @@ -135,6 +136,21 @@ public: return _shardVersion.get(); } + const boost::optional<std::int64_t> getTxnNum() const&; + void setTxnNum(boost::optional<std::int64_t> value); + + const boost::optional<std::vector<std::int32_t>> getStmtIds() const&; + void setStmtIds(boost::optional<std::vector<std::int32_t>> value); + + /** + * Retrieves the statement id for the write at the specified position in the write batch entries + * array. + * + * This method may only be called if a TxnNumber has been given for the operation, otherwise it + * will fassert. + */ + int32_t getStmtIdForWriteAt(size_t writePos) const; + void setShouldBypassValidation(bool newVal); bool shouldBypassValidation() const; @@ -181,6 +197,9 @@ private: std::unique_ptr<BatchedInsertRequest> _insertReq; std::unique_ptr<BatchedUpdateRequest> _updateReq; std::unique_ptr<BatchedDeleteRequest> _deleteReq; + + // If this write is retriable, contains information about the retriability of the write + WriteOpTxnInfo _txnInfo; }; /** diff --git a/src/mongo/s/write_ops/write_ops.idl b/src/mongo/s/write_ops/write_ops.idl new file mode 100644 index 00000000000..2b1ac434d60 --- /dev/null +++ b/src/mongo/s/write_ops/write_ops.idl @@ -0,0 +1,44 @@ +# Copyright (C) 2017 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/db/logical_session_id.idl" + +structs: + + WriteOpTxnInfo: + description: "Contains a unique identifier information for a particular write operation, + so it can be retried idempotently." + fields: + txnNum: + description: "The transaction number relative to the session in which a particular + write operation executes" + type: TxnNumber + optional: true + + stmtIds: + description: "An array of statement numbers relative to the transaction. If this + field is set, its size must be exactly the same as the number of + entries in the corresponding insert/update/delete request. If it is + not set, the statement ids of the contained operation will be + implicitly generated based on their offset, starting from 0." + type: array<int> + optional: true + # Ignore any extra fields, because this is a parser only for a subset of the write + # commands' content + strict: false |