diff options
author | Dana Powers <dana.powers@rd.io> | 2015-11-29 10:00:50 +0800 |
---|---|---|
committer | Zack Dever <zack.dever@rd.io> | 2015-12-04 11:25:40 -0800 |
commit | 058567912e8d82c1da5e5ead9e30be532573a173 (patch) | |
tree | eb5cdd9d7c25729441cfa097772ca0623e4cbfe0 /kafka/protocol | |
parent | a85e09df89a43de5b659a0fa4ed35bec37c60e04 (diff) | |
download | kafka-python-058567912e8d82c1da5e5ead9e30be532573a173.tar.gz |
Add simple BrokerConnection class; add request.RESPONSE_TYPE class vars
Diffstat (limited to 'kafka/protocol')
-rw-r--r-- | kafka/protocol/commit.py | 42 | ||||
-rw-r--r-- | kafka/protocol/fetch.py | 24 | ||||
-rw-r--r-- | kafka/protocol/metadata.py | 17 | ||||
-rw-r--r-- | kafka/protocol/offset.py | 23 | ||||
-rw-r--r-- | kafka/protocol/produce.py | 17 |
5 files changed, 65 insertions, 58 deletions
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 5ba0227..2955de1 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -2,9 +2,20 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String +class OffsetCommitResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + class OffsetCommitRequest_v2(Struct): API_KEY = 8 API_VERSION = 2 # added retention_time, dropped timestamp + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -22,6 +33,7 @@ class OffsetCommitRequest_v2(Struct): class OffsetCommitRequest_v1(Struct): API_KEY = 8 API_VERSION = 1 # Kafka-backed storage + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), @@ -39,6 +51,7 @@ class OffsetCommitRequest_v1(Struct): class OffsetCommitRequest_v0(Struct): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage + RESPONSE_TYPE = OffsetCommitResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -50,12 +63,14 @@ class OffsetCommitRequest_v0(Struct): ) -class OffsetCommitResponse(Struct): +class OffsetFetchResponse(Struct): SCHEMA = Schema( ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), ('error_code', Int16))))) ) @@ -63,6 +78,7 @@ class OffsetCommitResponse(Struct): class OffsetFetchRequest_v1(Struct): API_KEY = 9 API_VERSION = 1 # kafka-backed storage + RESPONSE_TYPE = OffsetFetchResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -74,6 +90,7 @@ class OffsetFetchRequest_v1(Struct): class OffsetFetchRequest_v0(Struct): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage + RESPONSE_TYPE = OffsetFetchResponse SCHEMA = Schema( ('consumer_group', String('utf-8')), ('topics', Array( @@ -82,30 +99,19 @@ class OffsetFetchRequest_v0(Struct): ) -class OffsetFetchResponse(Struct): +class GroupCoordinatorResponse(Struct): SCHEMA = Schema( - ('topics', Array( - ('topic', String('utf-8')), - ('partitions', Array( - ('partition', Int32), - ('offset', Int64), - ('metadata', String('utf-8')), - ('error_code', Int16))))) + ('error_code', Int16), + ('coordinator_id', Int32), + ('host', String('utf-8')), + ('port', Int32) ) class GroupCoordinatorRequest(Struct): API_KEY = 10 API_VERSION = 0 + RESPONSE_TYPE = GroupCoordinatorResponse SCHEMA = Schema( ('consumer_group', String('utf-8')) ) - - -class GroupCoordinatorResponse(Struct): - SCHEMA = Schema( - ('error_code', Int16), - ('coordinator_id', Int32), - ('host', String('utf-8')), - ('port', Int32) - ) diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index c6d60cc..e00c9ab 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -3,9 +3,22 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String +class FetchResponse(Struct): + SCHEMA = Schema( + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('message_set', MessageSet))))) + ) + + class FetchRequest(Struct): API_KEY = 1 API_VERSION = 0 + RESPONSE_TYPE = FetchResponse SCHEMA = Schema( ('replica_id', Int32), ('max_wait_time', Int32), @@ -17,14 +30,3 @@ class FetchRequest(Struct): ('offset', Int64), ('max_bytes', Int32))))) ) - -class FetchResponse(Struct): - SCHEMA = Schema( - ('topics', Array( - ('topics', String('utf-8')), - ('partitions', Array( - ('partition', Int32), - ('error_code', Int16), - ('highwater_offset', Int64), - ('message_set', MessageSet))))) - ) diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index b35e7ef..810f1b8 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -2,14 +2,6 @@ from .struct import Struct from .types import Array, Int16, Int32, Schema, String -class MetadataRequest(Struct): - API_KEY = 3 - API_VERSION = 0 - SCHEMA = Schema( - ('topics', Array(String('utf-8'))) - ) - - class MetadataResponse(Struct): SCHEMA = Schema( ('brokers', Array( @@ -26,3 +18,12 @@ class MetadataResponse(Struct): ('replicas', Array(Int32)), ('isr', Array(Int32)))))) ) + + +class MetadataRequest(Struct): + API_KEY = 3 + API_VERSION = 0 + RESPONSE_TYPE = MetadataResponse + SCHEMA = Schema( + ('topics', Array(String('utf-8'))) + ) diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 942bdbf..776de39 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -2,31 +2,30 @@ from .struct import Struct from .types import Array, Int16, Int32, Int64, Schema, String -class OffsetRequest(Struct): - API_KEY = 2 - API_VERSION = 0 +class OffsetResponse(Struct): SCHEMA = Schema( - ('replica_id', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('time', Int64), - ('max_offsets', Int32))))) + ('error_code', Int16), + ('offsets', Array(Int64)))))) ) - DEFAULTS = { - 'replica_id': -1 - } -class OffsetResponse(Struct): +class OffsetRequest(Struct): API_KEY = 2 API_VERSION = 0 + RESPONSE_TYPE = OffsetResponse SCHEMA = Schema( + ('replica_id', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('error_code', Int16), - ('offsets', Array(Int64)))))) + ('time', Int64), + ('max_offsets', Int32))))) ) + DEFAULTS = { + 'replica_id': -1 + } diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 532a702..ef2f96e 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -3,28 +3,27 @@ from .struct import Struct from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema -class ProduceRequest(Struct): - API_KEY = 0 - API_VERSION = 0 +class ProduceResponse(Struct): SCHEMA = Schema( - ('required_acks', Int16), - ('timeout', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('messages', MessageSet))))) + ('error_code', Int16), + ('offset', Int64))))) ) -class ProduceResponse(Struct): +class ProduceRequest(Struct): API_KEY = 0 API_VERSION = 0 + RESPONSE_TYPE = ProduceResponse SCHEMA = Schema( + ('required_acks', Int16), + ('timeout', Int32), ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('error_code', Int16), - ('offset', Int64))))) + ('messages', MessageSet))))) ) |