From 9feeb79140ed10e3a7f2036491fc07573740c231 Mon Sep 17 00:00:00 2001 From: Tincu Gabriel Date: Wed, 2 Dec 2020 15:45:13 +0100 Subject: Core Protocol: Add support for flexible versions (#2151) - Add support for new request and response headers, supporting flexible versions / tagged fields - Add List / Alter partition reassignments APIs - Add support for varints - Add support for compact collections (byte array, string, array) --- kafka/protocol/admin.py | 91 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) (limited to 'kafka/protocol/admin.py') diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 63a3327..f9d61e5 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64 +from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields class ApiVersionResponse_v0(Response): @@ -963,3 +963,92 @@ DescribeClientQuotasRequest = [ DescribeClientQuotasResponse = [ DescribeClientQuotasResponse_v0, ] + + +class AlterPartitionReassignmentsResponse_v0(Response): + API_KEY = 45 + API_VERSION = 0 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ("error_code", Int16), + ("error_message", CompactString("utf-8")), + ("responses", CompactArray( + ("name", CompactString("utf-8")), + ("partitions", CompactArray( + ("partition_index", Int32), + ("error_code", Int16), + ("error_message", CompactString("utf-8")), + ("tags", TaggedFields) + )), + ("tags", TaggedFields) + )), + ("tags", TaggedFields) + ) + + +class AlterPartitionReassignmentsRequest_v0(Request): + FLEXIBLE_VERSION = True + API_KEY = 45 + API_VERSION = 0 + RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0 + SCHEMA = Schema( + ("timeout_ms", Int32), + ("topics", CompactArray( + ("name", CompactString("utf-8")), + ("partitions", CompactArray( + ("partition_index", Int32), + ("replicas", CompactArray(Int32)), + ("tags", TaggedFields) + )), + ("tags", TaggedFields) + )), + ("tags", TaggedFields) + ) + + +AlterPartitionReassignmentsRequest = [AlterPartitionReassignmentsRequest_v0] + +AlterPartitionReassignmentsResponse = [AlterPartitionReassignmentsResponse_v0] + + +class ListPartitionReassignmentsResponse_v0(Response): + API_KEY = 46 + API_VERSION = 0 + SCHEMA = Schema( + ("throttle_time_ms", Int32), + ("error_code", Int16), + ("error_message", CompactString("utf-8")), + ("topics", CompactArray( + ("name", CompactString("utf-8")), + ("partitions", CompactArray( + ("partition_index", Int32), + ("replicas", CompactArray(Int32)), + ("adding_replicas", CompactArray(Int32)), + ("removing_replicas", CompactArray(Int32)), + ("tags", TaggedFields) + )), + ("tags", TaggedFields) + )), + ("tags", TaggedFields) + ) + + +class ListPartitionReassignmentsRequest_v0(Request): + FLEXIBLE_VERSION = True + API_KEY = 46 + API_VERSION = 0 + RESPONSE_TYPE = ListPartitionReassignmentsResponse_v0 + SCHEMA = Schema( + ("timeout_ms", Int32), + ("topics", CompactArray( + ("name", CompactString("utf-8")), + ("partition_index", CompactArray(Int32)), + ("tags", TaggedFields) + )), + ("tags", TaggedFields) + ) + + +ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0] + +ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0] -- cgit v1.2.1