summaryrefslogtreecommitdiff
path: root/kafka/producer/future.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 00:31:16 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 09:51:37 -0700
commit795cb9b29fa05d4425f807f54dfa639c125fc0dd (patch)
tree7fba03e95f26185c126aa95d1acdd2af5d2ad925 /kafka/producer/future.py
parent7f4a9361ea168a0e1073801d0b86868de47d1de2 (diff)
downloadkafka-python-795cb9b29fa05d4425f807f54dfa639c125fc0dd.tar.gz
KAFKA-3025: Message v1 -- add timetamp and use relative offset in compressed messagesets
Diffstat (limited to 'kafka/producer/future.py')
-rw-r--r--kafka/producer/future.py18
1 files changed, 12 insertions, 6 deletions
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 35520d8..acf4255 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -29,16 +29,21 @@ class FutureProduceResult(Future):
class FutureRecordMetadata(Future):
- def __init__(self, produce_future, relative_offset):
+ def __init__(self, produce_future, relative_offset, timestamp_ms):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
self.relative_offset = relative_offset
+ self.timestamp_ms = timestamp_ms
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)
- def _produce_success(self, base_offset):
+ def _produce_success(self, offset_and_timestamp):
+ base_offset, timestamp_ms = offset_and_timestamp
+ if timestamp_ms is None:
+ timestamp_ms = self.timestamp_ms
self.success(RecordMetadata(self._produce_future.topic_partition,
- base_offset, self.relative_offset))
+ base_offset, timestamp_ms,
+ self.relative_offset))
def get(self, timeout=None):
if not self.is_done and not self._produce_future.await(timeout):
@@ -51,12 +56,13 @@ class FutureRecordMetadata(Future):
class RecordMetadata(collections.namedtuple(
- 'RecordMetadata', 'topic partition topic_partition offset')):
- def __new__(cls, tp, base_offset, relative_offset=None):
+ 'RecordMetadata', 'topic partition topic_partition offset timestamp')):
+ def __new__(cls, tp, base_offset, timestamp, relative_offset=None):
offset = base_offset
if relative_offset is not None and base_offset != -1:
offset += relative_offset
- return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, tp, offset)
+ return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition,
+ tp, offset, timestamp)
def __str__(self):
return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % (