diff options
author | Enrico Canzonieri <ecanzonieri@gmail.com> | 2015-01-26 15:01:51 -0800 |
---|---|---|
committer | Enrico Canzonieri <enrico@yelp.com> | 2015-01-26 15:01:51 -0800 |
commit | 9ab8415ed75b08c5de9f823708027bb4f10a0643 (patch) | |
tree | f2011cb5cbdc4d5cd3d9bff9c52b35c2a6aca2ad | |
parent | f517ddf283a86947a15f95e5ec562e81f4c477e5 (diff) | |
parent | 587206ff6ad59ae01248d24ff9c9fadbdfc1c1fc (diff) | |
download | kafka-python-9ab8415ed75b08c5de9f823708027bb4f10a0643.tar.gz |
Merge branch 'master' of github.com:mumrah/kafka-python into validate_consumer_offset
Conflicts:
kafka/consumer/simple.py
35 files changed, 1652 insertions, 596 deletions
@@ -8,3 +8,4 @@ env servers/*/kafka-bin .coverage .noseids +docs/_build @@ -2,6 +2,8 @@ [](https://travis-ci.org/mumrah/kafka-python) +[Full documentation available on ReadTheDocs](http://kafka-python.readthedocs.org/en/latest/) + This module provides low-level protocol support for Apache Kafka as well as high-level consumer and producer classes. Request batching is supported by the protocol as well as broker-aware request routing. Gzip and Snappy compression @@ -32,233 +34,3 @@ Python versions - 2.7 (tested on 2.7.8) - pypy (tested on pypy 2.3.1 / python 2.7.6) - (Python 3.3 and 3.4 support has been added to trunk and will be available the next release) - -# Usage - -## High level - -```python -from kafka import KafkaClient, SimpleProducer, SimpleConsumer - -# To send messages synchronously -kafka = KafkaClient("localhost:9092") -producer = SimpleProducer(kafka) - -# Note that the application is responsible for encoding messages to type str -producer.send_messages("my-topic", "some message") -producer.send_messages("my-topic", "this method", "is variadic") - -# Send unicode message -producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8')) - -# To send messages asynchronously -# WARNING: current implementation does not guarantee message delivery on failure! -# messages can get dropped! Use at your own risk! Or help us improve with a PR! -producer = SimpleProducer(kafka, async=True) -producer.send_messages("my-topic", "async message") - -# To wait for acknowledgements -# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to -# a local log before sending response -# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed -# by all in sync replicas before sending a response -producer = SimpleProducer(kafka, async=False, - req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=2000) - -response = producer.send_messages("my-topic", "another message") - -if response: - print(response[0].error) - print(response[0].offset) - -# To send messages in batch. You can use any of the available -# producers for doing this. The following producer will collect -# messages in batch and send them to Kafka after 20 messages are -# collected or every 60 seconds -# Notes: -# * If the producer dies before the messages are sent, there will be losses -# * Call producer.stop() to send the messages and cleanup -producer = SimpleProducer(kafka, batch_send=True, - batch_send_every_n=20, - batch_send_every_t=60) - -# To consume messages -consumer = SimpleConsumer(kafka, "my-group", "my-topic") -for message in consumer: - # message is raw byte string -- decode if necessary! - # e.g., for unicode: `message.decode('utf-8')` - print(message) - -kafka.close() -``` - -## Keyed messages -```python -from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner - -kafka = KafkaClient("localhost:9092") - -# HashedPartitioner is default -producer = KeyedProducer(kafka) -producer.send("my-topic", "key1", "some message") -producer.send("my-topic", "key2", "this methode") - -producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) -``` - -## Multiprocess consumer -```python -from kafka import KafkaClient, MultiProcessConsumer - -kafka = KafkaClient("localhost:9092") - -# This will split the number of partitions among two processes -consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) - -# This will spawn processes such that each handles 2 partitions max -consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", - partitions_per_proc=2) - -for message in consumer: - print(message) - -for message in consumer.get_messages(count=5, block=True, timeout=4): - print(message) -``` - -## Low level - -```python -from kafka import KafkaClient, create_message -from kafka.protocol import KafkaProtocol -from kafka.common import ProduceRequest - -kafka = KafkaClient("localhost:9092") - -req = ProduceRequest(topic="my-topic", partition=1, - messages=[create_message("some message")]) -resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) -kafka.close() - -resps[0].topic # "my-topic" -resps[0].partition # 1 -resps[0].error # 0 (hopefully) -resps[0].offset # offset of the first message sent in this request -``` - -# Install - -Install with your favorite package manager - -## Latest Release -Pip: - -```shell -pip install kafka-python -``` - -Releases are also listed at https://github.com/mumrah/kafka-python/releases - - -## Bleeding-Edge -```shell -git clone https://github.com/mumrah/kafka-python -pip install ./kafka-python -``` - -Setuptools: -```shell -git clone https://github.com/mumrah/kafka-python -easy_install ./kafka-python -``` - -Using `setup.py` directly: -```shell -git clone https://github.com/mumrah/kafka-python -cd kafka-python -python setup.py install -``` - -## Optional Snappy install - -### Install Development Libraries -Download and build Snappy from http://code.google.com/p/snappy/downloads/list - -Ubuntu: -```shell -apt-get install libsnappy-dev -``` - -OSX: -```shell -brew install snappy -``` - -From Source: -```shell -wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz -tar xzvf snappy-1.0.5.tar.gz -cd snappy-1.0.5 -./configure -make -sudo make install -``` - -### Install Python Module -Install the `python-snappy` module -```shell -pip install python-snappy -``` - -# Tests - -## Run the unit tests - -```shell -tox -``` - -## Run a subset of unit tests -```shell -# run protocol tests only -tox -- -v test.test_protocol -``` - -```shell -# test with pypy only -tox -e pypy -``` - -```shell -# Run only 1 test, and use python 2.7 -tox -e py27 -- -v --with-id --collect-only -# pick a test number from the list like #102 -tox -e py27 -- -v --with-id 102 -``` - -## Run the integration tests - -The integration tests will actually start up real local Zookeeper -instance and Kafka brokers, and send messages in using the client. - -First, get the kafka binaries for integration testing: -```shell -./build_integration.sh -``` -By default, the build_integration.sh script will download binary -distributions for all supported kafka versions. -To test against the latest source build, set KAFKA_VERSION=trunk -and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended) -```shell -SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh -``` - -Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION` -env variable to the server build you want to use for testing: -```shell -KAFKA_VERSION=0.8.0 tox -KAFKA_VERSION=0.8.1 tox -KAFKA_VERSION=0.8.1.1 tox -KAFKA_VERSION=trunk tox -``` diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..5751f68 --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,177 @@ +# Makefile for Sphinx documentation +# + +# You can set these variables from the command line. +SPHINXOPTS = +SPHINXBUILD = sphinx-build +PAPER = +BUILDDIR = _build + +# User-friendly check for sphinx-build +ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1) +$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/) +endif + +# Internal variables. +PAPEROPT_a4 = -D latex_paper_size=a4 +PAPEROPT_letter = -D latex_paper_size=letter +ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . +# the i18n builder cannot share the environment and doctrees with the others +I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . + +.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext + +help: + @echo "Please use \`make <target>' where <target> is one of" + @echo " html to make standalone HTML files" + @echo " dirhtml to make HTML files named index.html in directories" + @echo " singlehtml to make a single large HTML file" + @echo " pickle to make pickle files" + @echo " json to make JSON files" + @echo " htmlhelp to make HTML files and a HTML help project" + @echo " qthelp to make HTML files and a qthelp project" + @echo " devhelp to make HTML files and a Devhelp project" + @echo " epub to make an epub" + @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" + @echo " latexpdf to make LaTeX files and run them through pdflatex" + @echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx" + @echo " text to make text files" + @echo " man to make manual pages" + @echo " texinfo to make Texinfo files" + @echo " info to make Texinfo files and run them through makeinfo" + @echo " gettext to make PO message catalogs" + @echo " changes to make an overview of all changed/added/deprecated items" + @echo " xml to make Docutils-native XML files" + @echo " pseudoxml to make pseudoxml-XML files for display purposes" + @echo " linkcheck to check all external links for integrity" + @echo " doctest to run all doctests embedded in the documentation (if enabled)" + +clean: + rm -rf $(BUILDDIR)/* + +html: + $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." + +dirhtml: + $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml." + +singlehtml: + $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml + @echo + @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml." + +pickle: + $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle + @echo + @echo "Build finished; now you can process the pickle files." + +json: + $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json + @echo + @echo "Build finished; now you can process the JSON files." + +htmlhelp: + $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp + @echo + @echo "Build finished; now you can run HTML Help Workshop with the" \ + ".hhp project file in $(BUILDDIR)/htmlhelp." + +qthelp: + $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp + @echo + @echo "Build finished; now you can run "qcollectiongenerator" with the" \ + ".qhcp project file in $(BUILDDIR)/qthelp, like this:" + @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python.qhcp" + @echo "To view the help file:" + @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python.qhc" + +devhelp: + $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp + @echo + @echo "Build finished." + @echo "To view the help file:" + @echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python" + @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python" + @echo "# devhelp" + +epub: + $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub + @echo + @echo "Build finished. The epub file is in $(BUILDDIR)/epub." + +latex: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo + @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex." + @echo "Run \`make' in that directory to run these through (pdf)latex" \ + "(use \`make latexpdf' here to do that automatically)." + +latexpdf: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through pdflatex..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +latexpdfja: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through platex and dvipdfmx..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf-ja + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +text: + $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text + @echo + @echo "Build finished. The text files are in $(BUILDDIR)/text." + +man: + $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man + @echo + @echo "Build finished. The manual pages are in $(BUILDDIR)/man." + +texinfo: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo + @echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo." + @echo "Run \`make' in that directory to run these through makeinfo" \ + "(use \`make info' here to do that automatically)." + +info: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo "Running Texinfo files through makeinfo..." + make -C $(BUILDDIR)/texinfo info + @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo." + +gettext: + $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale + @echo + @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale." + +changes: + $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes + @echo + @echo "The overview file is in $(BUILDDIR)/changes." + +linkcheck: + $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck + @echo + @echo "Link check complete; look for any errors in the above output " \ + "or in $(BUILDDIR)/linkcheck/output.txt." + +doctest: + $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest + @echo "Testing of doctests in the sources finished, look at the " \ + "results in $(BUILDDIR)/doctest/output.txt." + +xml: + $(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml + @echo + @echo "Build finished. The XML files are in $(BUILDDIR)/xml." + +pseudoxml: + $(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml + @echo + @echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml." diff --git a/docs/api_reference.rst b/docs/api_reference.rst new file mode 100644 index 0000000..5e178f1 --- /dev/null +++ b/docs/api_reference.rst @@ -0,0 +1,67 @@ +API Reference +============= + +kafka +----- +.. automodule:: kafka.client + :members: + +.. automodule:: kafka.codec + :members: + +.. automodule:: kafka.common + :members: + +.. automodule:: kafka.conn + :members: + +.. automodule:: kafka.context + :members: + +.. automodule:: kafka.protocol + :members: + +.. automodule:: kafka.queue + :members: + +.. automodule:: kafka.util + :members: + + +kafka.consumer +-------------- +.. automodule:: kafka.consumer.base + :members: + +.. automodule:: kafka.consumer.kafka + :members: + +.. automodule:: kafka.consumer.multiprocess + :members: + +.. automodule:: kafka.consumer.simple + :members: + + +kafka.partitioner +----------------- +.. automodule:: kafka.partitioner.base + :members: + +.. automodule:: kafka.partitioner.hashed + :members: + +.. automodule:: kafka.partitioner.roundrobin + :members: + + +kafka.producer +-------------- +.. automodule:: kafka.producer.base + :members: + +.. automodule:: kafka.producer.keyed + :members: + +.. automodule:: kafka.producer.simple + :members: diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..9e95f79 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,264 @@ +# -*- coding: utf-8 -*- +# +# kafka-python documentation build configuration file, created by +# sphinx-quickstart on Sun Jan 4 12:21:50 2015. +# +# This file is execfile()d with the current directory set to its +# containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +import sys +import os + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +#sys.path.insert(0, os.path.abspath('.')) + +# -- General configuration ------------------------------------------------ + +# If your documentation needs a minimal Sphinx version, state it here. +#needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.viewcode', + 'sphinxcontrib.napoleon', +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix of source filenames. +source_suffix = '.rst' + +# The encoding of source files. +#source_encoding = 'utf-8-sig' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'kafka-python' +copyright = u'2015, David Arthur' + +# The version info for the project you're documenting, acts as replacement for +# |version| and |release|, also used in various other places throughout the +# built documents. +# +# The short X.Y version. +with open('../VERSION') as version_file: + version = version_file.read() + +# The full version, including alpha/beta/rc tags. +release = version + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +#language = None + +# There are two options for replacing |today|: either, you set today to some +# non-false value, then it is used: +#today = '' +# Else, today_fmt is used as the format for a strftime call. +#today_fmt = '%B %d, %Y' + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +exclude_patterns = ['_build'] + +# The reST default role (used for this markup: `text`) to use for all +# documents. +#default_role = None + +# If true, '()' will be appended to :func: etc. cross-reference text. +#add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +#add_module_names = True + +# If true, sectionauthor and moduleauthor directives will be shown in the +# output. They are ignored by default. +#show_authors = False + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# A list of ignored prefixes for module index sorting. +#modindex_common_prefix = [] + +# If true, keep warnings as "system message" paragraphs in the built documents. +#keep_warnings = False + + +# -- Options for HTML output ---------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +html_theme = 'default' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +#html_theme_options = {} + +# Add any paths that contain custom themes here, relative to this directory. +#html_theme_path = [] + +# The name for this set of Sphinx documents. If None, it defaults to +# "<project> v<release> documentation". +#html_title = None + +# A shorter title for the navigation bar. Default is the same as html_title. +#html_short_title = None + +# The name of an image file (relative to this directory) to place at the top +# of the sidebar. +#html_logo = None + +# The name of an image file (within the static path) to use as favicon of the +# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 +# pixels large. +#html_favicon = None + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Add any extra paths that contain custom files (such as robots.txt or +# .htaccess) here, relative to this directory. These files are copied +# directly to the root of the documentation. +#html_extra_path = [] + +# If not '', a 'Last updated on:' timestamp is inserted at every page bottom, +# using the given strftime format. +#html_last_updated_fmt = '%b %d, %Y' + +# If true, SmartyPants will be used to convert quotes and dashes to +# typographically correct entities. +#html_use_smartypants = True + +# Custom sidebar templates, maps document names to template names. +#html_sidebars = {} + +# Additional templates that should be rendered to pages, maps page names to +# template names. +#html_additional_pages = {} + +# If false, no module index is generated. +#html_domain_indices = True + +# If false, no index is generated. +#html_use_index = True + +# If true, the index is split into individual pages for each letter. +#html_split_index = False + +# If true, links to the reST sources are added to the pages. +#html_show_sourcelink = True + +# If true, "Created using Sphinx" is shown in the HTML footer. Default is True. +#html_show_sphinx = True + +# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. +#html_show_copyright = True + +# If true, an OpenSearch description file will be output, and all pages will +# contain a <link> tag referring to it. The value of this option must be the +# base URL from which the finished HTML is served. +#html_use_opensearch = '' + +# This is the file name suffix for HTML files (e.g. ".xhtml"). +#html_file_suffix = None + +# Output file base name for HTML help builder. +htmlhelp_basename = 'kafka-pythondoc' + + +# -- Options for LaTeX output --------------------------------------------- + +latex_elements = { +# The paper size ('letterpaper' or 'a4paper'). +#'papersize': 'letterpaper', + +# The font size ('10pt', '11pt' or '12pt'). +#'pointsize': '10pt', + +# Additional stuff for the LaTeX preamble. +#'preamble': '', +} + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, +# author, documentclass [howto, manual, or own class]). +latex_documents = [ + ('index', 'kafka-python.tex', u'kafka-python Documentation', + u'David Arthur', 'manual'), +] + +# The name of an image file (relative to this directory) to place at the top of +# the title page. +#latex_logo = None + +# For "manual" documents, if this is true, then toplevel headings are parts, +# not chapters. +#latex_use_parts = False + +# If true, show page references after internal links. +#latex_show_pagerefs = False + +# If true, show URL addresses after external links. +#latex_show_urls = False + +# Documents to append as an appendix to all manuals. +#latex_appendices = [] + +# If false, no module index is generated. +#latex_domain_indices = True + + +# -- Options for manual page output --------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + ('index', 'kafka-python', u'kafka-python Documentation', + [u'David Arthur'], 1) +] + +# If true, show URL addresses after external links. +#man_show_urls = False + + +# -- Options for Texinfo output ------------------------------------------- + +# Grouping the document tree into Texinfo files. List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + ('index', 'kafka-python', u'kafka-python Documentation', + u'David Arthur', 'kafka-python', 'One line description of project.', + 'Miscellaneous'), +] + +# Documents to append as an appendix to all manuals. +#texinfo_appendices = [] + +# If false, no module index is generated. +#texinfo_domain_indices = True + +# How to display URL addresses: 'footnote', 'no', or 'inline'. +#texinfo_show_urls = 'footnote' + +# If true, do not generate a @detailmenu in the "Top" node's menu. +#texinfo_no_detailmenu = False diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..82c3f57 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,59 @@ + +kafka-python +============ + +This module provides low-level protocol support for Apache Kafka as well as +high-level consumer and producer classes. Request batching is supported by the +protocol as well as broker-aware request routing. Gzip and Snappy compression +is also supported for message sets. + +http://kafka.apache.org/ + +On Freenode IRC at #kafka-python, as well as #apache-kafka + +For general discussion of kafka-client design and implementation (not python specific), +see https://groups.google.com/forum/m/#!forum/kafka-clients + +Status +------ + +The current stable version of this package is `0.9.2 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.2>`_ and is compatible with: + +Kafka broker versions + +* 0.8.0 +* 0.8.1 +* 0.8.1.1 + +Python versions + +* 2.6 (tested on 2.6.9) +* 2.7 (tested on 2.7.8) +* pypy (tested on pypy 2.3.1 / python 2.7.6) +* (Python 3.3 and 3.4 support has been added to trunk and will be available the next release) + +License +------- + +Copyright 2014, David Arthur under Apache License, v2.0. See `LICENSE <https://github.com/mumrah/kafka-python/blob/master/LICENSE>`_. + + +Contents +-------- + +.. toctree:: + :maxdepth: 2 + + install + tests + usage + api_reference + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` + diff --git a/docs/install.rst b/docs/install.rst new file mode 100644 index 0000000..1dd6d4e --- /dev/null +++ b/docs/install.rst @@ -0,0 +1,79 @@ +Install +======= + +Install with your favorite package manager + +Latest Release +-------------- +Pip: + +.. code:: bash + + pip install kafka-python + +Releases are also listed at https://github.com/mumrah/kafka-python/releases + + +Bleeding-Edge +------------- + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + pip install ./kafka-python + +Setuptools: + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + easy_install ./kafka-python + +Using `setup.py` directly: + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + cd kafka-python + python setup.py install + + +Optional Snappy install +----------------------- + +Install Development Libraries +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Download and build Snappy from http://code.google.com/p/snappy/downloads/list + +Ubuntu: + +.. code:: bash + + apt-get install libsnappy-dev + +OSX: + +.. code:: bash + + brew install snappy + +From Source: + +.. code:: bash + + wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz + tar xzvf snappy-1.0.5.tar.gz + cd snappy-1.0.5 + ./configure + make + sudo make install + +Install Python Module +^^^^^^^^^^^^^^^^^^^^^ + +Install the `python-snappy` module + +.. code:: bash + + pip install python-snappy diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..2e9d7dc --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,242 @@ +@ECHO OFF
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+ set SPHINXBUILD=sphinx-build
+)
+set BUILDDIR=_build
+set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% .
+set I18NSPHINXOPTS=%SPHINXOPTS% .
+if NOT "%PAPER%" == "" (
+ set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS%
+ set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS%
+)
+
+if "%1" == "" goto help
+
+if "%1" == "help" (
+ :help
+ echo.Please use `make ^<target^>` where ^<target^> is one of
+ echo. html to make standalone HTML files
+ echo. dirhtml to make HTML files named index.html in directories
+ echo. singlehtml to make a single large HTML file
+ echo. pickle to make pickle files
+ echo. json to make JSON files
+ echo. htmlhelp to make HTML files and a HTML help project
+ echo. qthelp to make HTML files and a qthelp project
+ echo. devhelp to make HTML files and a Devhelp project
+ echo. epub to make an epub
+ echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter
+ echo. text to make text files
+ echo. man to make manual pages
+ echo. texinfo to make Texinfo files
+ echo. gettext to make PO message catalogs
+ echo. changes to make an overview over all changed/added/deprecated items
+ echo. xml to make Docutils-native XML files
+ echo. pseudoxml to make pseudoxml-XML files for display purposes
+ echo. linkcheck to check all external links for integrity
+ echo. doctest to run all doctests embedded in the documentation if enabled
+ goto end
+)
+
+if "%1" == "clean" (
+ for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i
+ del /q /s %BUILDDIR%\*
+ goto end
+)
+
+
+%SPHINXBUILD% 2> nul
+if errorlevel 9009 (
+ echo.
+ echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+ echo.installed, then set the SPHINXBUILD environment variable to point
+ echo.to the full path of the 'sphinx-build' executable. Alternatively you
+ echo.may add the Sphinx directory to PATH.
+ echo.
+ echo.If you don't have Sphinx installed, grab it from
+ echo.http://sphinx-doc.org/
+ exit /b 1
+)
+
+if "%1" == "html" (
+ %SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/html.
+ goto end
+)
+
+if "%1" == "dirhtml" (
+ %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml.
+ goto end
+)
+
+if "%1" == "singlehtml" (
+ %SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml.
+ goto end
+)
+
+if "%1" == "pickle" (
+ %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can process the pickle files.
+ goto end
+)
+
+if "%1" == "json" (
+ %SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can process the JSON files.
+ goto end
+)
+
+if "%1" == "htmlhelp" (
+ %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can run HTML Help Workshop with the ^
+.hhp project file in %BUILDDIR%/htmlhelp.
+ goto end
+)
+
+if "%1" == "qthelp" (
+ %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can run "qcollectiongenerator" with the ^
+.qhcp project file in %BUILDDIR%/qthelp, like this:
+ echo.^> qcollectiongenerator %BUILDDIR%\qthelp\kafka-python.qhcp
+ echo.To view the help file:
+ echo.^> assistant -collectionFile %BUILDDIR%\qthelp\kafka-python.ghc
+ goto end
+)
+
+if "%1" == "devhelp" (
+ %SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished.
+ goto end
+)
+
+if "%1" == "epub" (
+ %SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The epub file is in %BUILDDIR%/epub.
+ goto end
+)
+
+if "%1" == "latex" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; the LaTeX files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "latexpdf" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ cd %BUILDDIR%/latex
+ make all-pdf
+ cd %BUILDDIR%/..
+ echo.
+ echo.Build finished; the PDF files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "latexpdfja" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ cd %BUILDDIR%/latex
+ make all-pdf-ja
+ cd %BUILDDIR%/..
+ echo.
+ echo.Build finished; the PDF files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "text" (
+ %SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The text files are in %BUILDDIR%/text.
+ goto end
+)
+
+if "%1" == "man" (
+ %SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The manual pages are in %BUILDDIR%/man.
+ goto end
+)
+
+if "%1" == "texinfo" (
+ %SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo.
+ goto end
+)
+
+if "%1" == "gettext" (
+ %SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The message catalogs are in %BUILDDIR%/locale.
+ goto end
+)
+
+if "%1" == "changes" (
+ %SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.The overview file is in %BUILDDIR%/changes.
+ goto end
+)
+
+if "%1" == "linkcheck" (
+ %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Link check complete; look for any errors in the above output ^
+or in %BUILDDIR%/linkcheck/output.txt.
+ goto end
+)
+
+if "%1" == "doctest" (
+ %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Testing of doctests in the sources finished, look at the ^
+results in %BUILDDIR%/doctest/output.txt.
+ goto end
+)
+
+if "%1" == "xml" (
+ %SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The XML files are in %BUILDDIR%/xml.
+ goto end
+)
+
+if "%1" == "pseudoxml" (
+ %SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml.
+ goto end
+)
+
+:end
diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..94c50d8 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,7 @@ +sphinx +sphinxcontrib-napoleon + +# Install kafka-python in editable mode +# This allows the sphinx autodoc module +# to load the Python modules and extract docstrings. +-e .. diff --git a/docs/tests.rst b/docs/tests.rst new file mode 100644 index 0000000..df9a3ef --- /dev/null +++ b/docs/tests.rst @@ -0,0 +1,59 @@ +Tests +===== + +Run the unit tests +------------------ + +.. code:: bash + + tox + + +Run a subset of unit tests +-------------------------- + +.. code:: bash + + # run protocol tests only + tox -- -v test.test_protocol + + # test with pypy only + tox -e pypy + + # Run only 1 test, and use python 2.7 + tox -e py27 -- -v --with-id --collect-only + + # pick a test number from the list like #102 + tox -e py27 -- -v --with-id 102 + + +Run the integration tests +------------------------- + +The integration tests will actually start up real local Zookeeper +instance and Kafka brokers, and send messages in using the client. + +First, get the kafka binaries for integration testing: + +.. code:: bash + + ./build_integration.sh + +By default, the build_integration.sh script will download binary +distributions for all supported kafka versions. +To test against the latest source build, set KAFKA_VERSION=trunk +and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended) + +.. code:: bash + + SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh + +Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION` +env variable to the server build you want to use for testing: + +.. code:: bash + + KAFKA_VERSION=0.8.0 tox + KAFKA_VERSION=0.8.1 tox + KAFKA_VERSION=0.8.1.1 tox + KAFKA_VERSION=trunk tox diff --git a/docs/usage.rst b/docs/usage.rst new file mode 100644 index 0000000..5f3fcea --- /dev/null +++ b/docs/usage.rst @@ -0,0 +1,122 @@ +Usage +===== + +High level +---------- + +.. code:: python + + from kafka import KafkaClient, SimpleProducer, SimpleConsumer + + # To send messages synchronously + kafka = KafkaClient("localhost:9092") + producer = SimpleProducer(kafka) + + # Note that the application is responsible for encoding messages to type str + producer.send_messages("my-topic", "some message") + producer.send_messages("my-topic", "this method", "is variadic") + + # Send unicode message + producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8')) + + # To send messages asynchronously + # WARNING: current implementation does not guarantee message delivery on failure! + # messages can get dropped! Use at your own risk! Or help us improve with a PR! + producer = SimpleProducer(kafka, async=True) + producer.send_messages("my-topic", "async message") + + # To wait for acknowledgements + # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to + # a local log before sending response + # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed + # by all in sync replicas before sending a response + producer = SimpleProducer(kafka, async=False, + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, + ack_timeout=2000) + + response = producer.send_messages("my-topic", "another message") + + if response: + print(response[0].error) + print(response[0].offset) + + # To send messages in batch. You can use any of the available + # producers for doing this. The following producer will collect + # messages in batch and send them to Kafka after 20 messages are + # collected or every 60 seconds + # Notes: + # * If the producer dies before the messages are sent, there will be losses + # * Call producer.stop() to send the messages and cleanup + producer = SimpleProducer(kafka, batch_send=True, + batch_send_every_n=20, + batch_send_every_t=60) + + # To consume messages + consumer = SimpleConsumer(kafka, "my-group", "my-topic") + for message in consumer: + # message is raw byte string -- decode if necessary! + # e.g., for unicode: `message.decode('utf-8')` + print(message) + + kafka.close() + + +Keyed messages +-------------- + +.. code:: python + + from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner + + kafka = KafkaClient("localhost:9092") + + # HashedPartitioner is default + producer = KeyedProducer(kafka) + producer.send("my-topic", "key1", "some message") + producer.send("my-topic", "key2", "this methode") + + producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) + + +Multiprocess consumer +--------------------- + +.. code:: python + + from kafka import KafkaClient, MultiProcessConsumer + + kafka = KafkaClient("localhost:9092") + + # This will split the number of partitions among two processes + consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) + + # This will spawn processes such that each handles 2 partitions max + consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", + partitions_per_proc=2) + + for message in consumer: + print(message) + + for message in consumer.get_messages(count=5, block=True, timeout=4): + print(message) + +Low level +--------- + +.. code:: python + + from kafka import KafkaClient, create_message + from kafka.protocol import KafkaProtocol + from kafka.common import ProduceRequest + + kafka = KafkaClient("localhost:9092") + + req = ProduceRequest(topic="my-topic", partition=1, + messages=[create_message("some message")]) + resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) + kafka.close() + + resps[0].topic # "my-topic" + resps[0].partition # 1 + resps[0].error # 0 (hopefully) + resps[0].offset # offset of the first message sent in this request diff --git a/kafka/__init__.py b/kafka/__init__.py index 16b9094..8ccdb4c 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -19,5 +19,5 @@ __all__ = [ 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer', 'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer', 'MultiProcessConsumer', 'create_message', 'create_gzip_message', - 'create_snappy_message' + 'create_snappy_message', 'KafkaConsumer', ] diff --git a/kafka/client.py b/kafka/client.py index bc3d853..7b04e71 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -11,7 +11,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata, ConnectionError, FailedPayloadsError, KafkaTimeoutError, KafkaUnavailableError, LeaderNotAvailableError, UnknownTopicOrPartitionError, - NotLeaderForPartitionError) + NotLeaderForPartitionError, ReplicaNotAvailableError) from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -131,19 +131,21 @@ class KafkaClient(object): the leader broker for that partition using the supplied encode/decode functions - Params - ====== + Arguments: + payloads: list of object-like entities with a topic (str) and - partition (int) attribute + partition (int) attribute + encode_fn: a method to encode the list of payloads to a request body, - must accept client_id, correlation_id, and payloads as - keyword arguments + must accept client_id, correlation_id, and payloads as + keyword arguments + decode_fn: a method to decode a response body into response objects. - The response objects must be object-like and have topic - and partition attributes + The response objects must be object-like and have topic + and partition attributes + + Returns: - Return - ====== List of response objects in the same order as the supplied payloads """ @@ -285,9 +287,9 @@ class KafkaClient(object): This method should be called after receiving any error - @param: *topics (optional) - If a list of topics is provided, the metadata refresh will be limited - to the specified topics only. + Arguments: + *topics (optional): If a list of topics is provided, + the metadata refresh will be limited to the specified topics only. Exceptions: ---------- @@ -350,6 +352,11 @@ class KafkaClient(object): log.error('No leader for topic %s partition %d', topic, partition) self.topics_to_brokers[topic_part] = None continue + # If one of the replicas is unavailable -- ignore + # this error code is provided for admin purposes only + # we never talk to replicas, only the leader + except ReplicaNotAvailableError: + log.warning('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) # If Known Broker, topic_partition -> BrokerMetadata if leader in self.brokers: @@ -379,18 +386,16 @@ class KafkaClient(object): sent to a specific broker. Output is a list of responses in the same order as the list of payloads specified - Params - ====== - payloads: list of ProduceRequest - fail_on_error: boolean, should we raise an Exception if we - encounter an API error? - callback: function, instead of returning the ProduceResponse, - first pass it through this function - - Return - ====== - list of ProduceResponse or callback(ProduceResponse), in the - order of input payloads + Arguments: + payloads: list of ProduceRequest + fail_on_error: boolean, should we raise an Exception if we + encounter an API error? + callback: function, instead of returning the ProduceResponse, + first pass it through this function + + Returns: + list of ProduceResponse or callback(ProduceResponse), in the + order of input payloads """ encoder = functools.partial( diff --git a/kafka/common.py b/kafka/common.py index e4b3b1b..b7bb06c 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -1,3 +1,5 @@ +import inspect +import sys from collections import namedtuple ############### @@ -79,9 +81,6 @@ class KafkaError(RuntimeError): class BrokerResponseError(KafkaError): pass -class NoError(BrokerResponseError): - errno = 0 - message = 'SUCCESS' class UnknownError(BrokerResponseError): errno = -1 @@ -201,27 +200,16 @@ class KafkaConfigurationError(KafkaError): pass -kafka_errors = { - -1 : UnknownError, - 0 : NoError, - 1 : OffsetOutOfRangeError, - 2 : InvalidMessageError, - 3 : UnknownTopicOrPartitionError, - 4 : InvalidFetchRequestError, - 5 : LeaderNotAvailableError, - 6 : NotLeaderForPartitionError, - 7 : RequestTimedOutError, - 8 : BrokerNotAvailableError, - 9 : ReplicaNotAvailableError, - 10 : MessageSizeTooLargeError, - 11 : StaleControllerEpochError, - 12 : OffsetMetadataTooLargeError, - 13 : StaleLeaderEpochCodeError, -} +def _iter_broker_errors(): + for name, obj in inspect.getmembers(sys.modules[__name__]): + if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: + yield obj -def check_error(response): - error = kafka_errors.get(response.error, UnknownError) - if error is not NoError: - raise error(response) +kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()]) + +def check_error(response): + if response.error: + error_class = kafka_errors.get(response.error, UnknownError) + raise error_class(response) diff --git a/kafka/conn.py b/kafka/conn.py index ddfee8b..30debec 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -47,10 +47,11 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. - host: the host name or IP address of a kafka broker - port: the port number the kafka broker is listening on - timeout: default 120. The socket timeout for sending and receiving data - in seconds. None means no timeout, so a request can block forever. + Arguments: + host: the host name or IP address of a kafka broker + port: the port number the kafka broker is listening on + timeout: default 120. The socket timeout for sending and receiving data + in seconds. None means no timeout, so a request can block forever. """ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): super(KafkaConnection, self).__init__() @@ -116,8 +117,10 @@ class KafkaConnection(local): def send(self, request_id, payload): """ Send a request to Kafka - param: request_id -- can be any int (used only for debug logging...) - param: payload -- an encoded kafka packet (see KafkaProtocol) + + Arguments:: + request_id (int): can be any int (used only for debug logging...) + payload: an encoded kafka packet (see KafkaProtocol) """ log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) @@ -135,8 +138,12 @@ class KafkaConnection(local): def recv(self, request_id): """ Get a response packet from Kafka - param: request_id -- can be any int (only used for debug logging...) - returns encoded kafka packet response from server as type str + + Arguments: + request_id: can be any int (only used for debug logging...) + + Returns: + str: Encoded kafka packet response from server """ log.debug("Reading response %d from Kafka" % request_id) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 2464aaf..9cdcf89 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -32,9 +32,11 @@ class Consumer(object): Base class to be used by other consumers. Not to be used directly This base class provides logic for + * initialization and fetching metadata of partitions * Auto-commit logic * APIs for fetching pending message count + """ def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, @@ -93,8 +95,9 @@ class Consumer(object): """ Commit offsets for this consumer - partitions: list of partitions to commit, default is to commit - all of them + Keyword Arguments: + partitions (list): list of partitions to commit, default is to commit + all of them """ # short circuit if nothing happened. This check is kept outside @@ -148,7 +151,8 @@ class Consumer(object): """ Gets the pending message count - partitions: list of partitions to check for, default is to check all + Keyword Arguments: + partitions (list): list of partitions to check for, default is to check all """ if not partitions: partitions = self.offsets.keys() diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index f16b526..ae0f0b9 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -54,72 +54,78 @@ class KafkaConsumer(object): """ A simpler kafka consumer - ``` - # A very basic 'tail' consumer, with no stored offset management - kafka = KafkaConsumer('topic1') - for m in kafka: - print m - - # Alternate interface: next() - print kafka.next() - - # Alternate interface: batch iteration - while True: - for m in kafka.fetch_messages(): - print m - print "Done with batch - let's do another!" - ``` - - ``` - # more advanced consumer -- multiple topics w/ auto commit offset management - kafka = KafkaConsumer('topic1', 'topic2', - group_id='my_consumer_group', - auto_commit_enable=True, - auto_commit_interval_ms=30 * 1000, - auto_offset_reset='smallest') - - # Infinite iteration - for m in kafka: - process_message(m) - kafka.task_done(m) - - # Alternate interface: next() - m = kafka.next() - process_message(m) - kafka.task_done(m) - - # If auto_commit_enable is False, remember to commit() periodically - kafka.commit() - - # Batch process interface - while True: - for m in kafka.fetch_messages(): + .. code:: python + + # A very basic 'tail' consumer, with no stored offset management + kafka = KafkaConsumer('topic1') + for m in kafka: + print m + + # Alternate interface: next() + print kafka.next() + + # Alternate interface: batch iteration + while True: + for m in kafka.fetch_messages(): + print m + print "Done with batch - let's do another!" + + + .. code:: python + + # more advanced consumer -- multiple topics w/ auto commit offset management + kafka = KafkaConsumer('topic1', 'topic2', + group_id='my_consumer_group', + auto_commit_enable=True, + auto_commit_interval_ms=30 * 1000, + auto_offset_reset='smallest') + + # Infinite iteration + for m in kafka: + process_message(m) + kafka.task_done(m) + + # Alternate interface: next() + m = kafka.next() process_message(m) kafka.task_done(m) - ``` + + # If auto_commit_enable is False, remember to commit() periodically + kafka.commit() + + # Batch process interface + while True: + for m in kafka.fetch_messages(): + process_message(m) + kafka.task_done(m) + messages (m) are namedtuples with attributes: - m.topic: topic name (str) - m.partition: partition number (int) - m.offset: message offset on topic-partition log (int) - m.key: key (bytes - can be None) - m.value: message (output of deserializer_class - default is raw bytes) + + * `m.topic`: topic name (str) + * `m.partition`: partition number (int) + * `m.offset`: message offset on topic-partition log (int) + * `m.key`: key (bytes - can be None) + * `m.value`: message (output of deserializer_class - default is raw bytes) Configuration settings can be passed to constructor, otherwise defaults will be used: - client_id='kafka.consumer.kafka', - group_id=None, - fetch_message_max_bytes=1024*1024, - fetch_min_bytes=1, - fetch_wait_max_ms=100, - refresh_leader_backoff_ms=200, - metadata_broker_list=None, - socket_timeout_ms=30*1000, - auto_offset_reset='largest', - deserializer_class=lambda msg: msg, - auto_commit_enable=False, - auto_commit_interval_ms=60 * 1000, - consumer_timeout_ms=-1 + + .. code:: python + + client_id='kafka.consumer.kafka', + group_id=None, + fetch_message_max_bytes=1024*1024, + fetch_min_bytes=1, + fetch_wait_max_ms=100, + refresh_leader_backoff_ms=200, + metadata_broker_list=None, + socket_timeout_ms=30*1000, + auto_offset_reset='largest', + deserializer_class=lambda msg: msg, + auto_commit_enable=False, + auto_commit_interval_ms=60 * 1000, + consumer_timeout_ms=-1 Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi @@ -133,6 +139,9 @@ class KafkaConsumer(object): """ Configuration settings can be passed to constructor, otherwise defaults will be used: + + .. code:: python + client_id='kafka.consumer.kafka', group_id=None, fetch_message_max_bytes=1024*1024, @@ -189,28 +198,35 @@ class KafkaConsumer(object): Optionally specify offsets to start from Accepts types: - str (utf-8): topic name (will consume all available partitions) - tuple: (topic, partition) - dict: { topic: partition } - { topic: [partition list] } - { topic: (partition tuple,) } + + * str (utf-8): topic name (will consume all available partitions) + * tuple: (topic, partition) + * dict: + - { topic: partition } + - { topic: [partition list] } + - { topic: (partition tuple,) } Optionally, offsets can be specified directly: - tuple: (topic, partition, offset) - dict: { (topic, partition): offset, ... } - Ex: - kafka = KafkaConsumer() + * tuple: (topic, partition, offset) + * dict: { (topic, partition): offset, ... } + + Example: + + .. code:: python + + kafka = KafkaConsumer() - # Consume topic1-all; topic2-partition2; topic3-partition0 - kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) + # Consume topic1-all; topic2-partition2; topic3-partition0 + kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) - # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 - # using tuples -- - kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 + # using tuples -- + kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + + # using dict -- + kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) - # using dict -- - kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) """ self._topics = [] self._client.load_metadata_for_topics() @@ -309,10 +325,12 @@ class KafkaConsumer(object): Otherwise blocks indefinitely Note that this is also the method called internally during iteration: - ``` - for m in consumer: - pass - ``` + + .. code:: python + + for m in consumer: + pass + """ self._set_consumer_timeout_start() while True: @@ -336,11 +354,12 @@ class KafkaConsumer(object): OffsetOutOfRange, per the configured `auto_offset_reset` policy Key configuration parameters: - `fetch_message_max_bytes` - `fetch_max_wait_ms` - `fetch_min_bytes` - `deserializer_class` - `auto_offset_reset` + + * `fetch_message_max_bytes` + * `fetch_max_wait_ms` + * `fetch_min_bytes` + * `deserializer_class` + * `auto_offset_reset` """ max_bytes = self._config['fetch_message_max_bytes'] @@ -418,20 +437,18 @@ class KafkaConsumer(object): """ Request available fetch offsets for a single topic/partition - @param topic (str) - @param partition (int) - @param request_time_ms (int) -- Used to ask for all messages before a - certain time (ms). There are two special - values. Specify -1 to receive the latest - offset (i.e. the offset of the next coming - message) and -2 to receive the earliest - available offset. Note that because offsets - are pulled in descending order, asking for - the earliest offset will always return you - a single element. - @param max_num_offsets (int) - - @return offsets (list) + Arguments: + topic (str) + partition (int) + request_time_ms (int): Used to ask for all messages before a + certain time (ms). There are two special values. Specify -1 to receive the latest + offset (i.e. the offset of the next coming message) and -2 to receive the earliest + available offset. Note that because offsets are pulled in descending order, asking for + the earliest offset will always return you a single element. + max_num_offsets (int) + + Returns: + offsets (list) """ reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] @@ -448,9 +465,12 @@ class KafkaConsumer(object): def offsets(self, group=None): """ - Returns a copy of internal offsets struct - optional param: group [fetch|commit|task_done|highwater] - if no group specified, returns all groups + Keyword Arguments: + group: Either "fetch", "commit", "task_done", or "highwater". + If no group specified, returns all groups. + + Returns: + A copy of internal offsets struct """ if not group: return { @@ -498,8 +518,8 @@ class KafkaConsumer(object): Store consumed message offsets (marked via task_done()) to kafka cluster for this consumer_group. - Note -- this functionality requires server version >=0.8.1.1 - see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + **Note**: this functionality requires server version >=0.8.1.1 + See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_. """ if not self._config['group_id']: logger.warning('Cannot commit without a group_id!') diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 912e64b..4dc04dc 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -80,19 +80,21 @@ class MultiProcessConsumer(Consumer): A consumer implementation that consumes partitions for a topic in parallel using multiple processes - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes - partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + Keyword Arguments: + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -198,11 +200,12 @@ class MultiProcessConsumer(Consumer): """ Fetch the specified number of messages - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 0593b5b..2ec99f2 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -70,27 +70,36 @@ class SimpleConsumer(Consumer): A simple consumer implementation that consumes all/specified partitions for a topic - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - partitions: An optional list of partitions to consume the data from - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest - buffer_size: default 4K. Initial number of bytes to tell kafka we - have available. This will double as needed. - max_buffer_size: default 16K. Max number of bytes to tell kafka we have - available. None means no limit. - iter_timeout: default None. How much time (in seconds) to wait for a - message in the iterator before exiting. None means no - timeout, so it will wait forever. - auto_offset_reset: default largest. Reset partition offsets upon - OffsetOutOfRangeError. Valid values are largest and smallest. - If None do not reset the offsets and raise OffsetOutOfRangeError. + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + Keyword Arguments: + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + + auto_commit_every_n: default 100. How many messages to consume + before a commit + + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + fetch_size_bytes: number of bytes to request in a FetchRequest + + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. + + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. + + auto_offset_reset: default largest. Reset partition offsets upon + OffsetOutOfRangeError. Valid values are largest and smallest. + Otherwise, do not reset the offsets and raise OffsetOutOfRangeError. Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -166,11 +175,13 @@ class SimpleConsumer(Consumer): """ Alter the current offset in the consumer, similar to fseek - offset: how much to modify the offset - whence: where to modify it from - 0 is relative to the earliest available offset (head) - 1 is relative to the current offset - 2 is relative to the latest known offset (tail) + Arguments: + offset: how much to modify the offset + whence: where to modify it from + + * 0 is relative to the earliest available offset (head) + * 1 is relative to the current offset + * 2 is relative to the latest known offset (tail) """ if whence == 1: # relative to current position @@ -213,11 +224,12 @@ class SimpleConsumer(Consumer): """ Fetch the specified number of messages - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] if timeout is not None: diff --git a/kafka/context.py b/kafka/context.py index 98ed7b3..ade4db8 100644 --- a/kafka/context.py +++ b/kafka/context.py @@ -18,6 +18,8 @@ class OffsetCommitContext(object): Example: + .. code:: python + consumer = SimpleConsumer(client, group, topic, auto_commit=False) consumer.provide_partition_info() consumer.fetch_last_known_offsets() @@ -57,7 +59,10 @@ class OffsetCommitContext(object): In order to know the current partition, it is helpful to initialize the consumer to provide partition info via: + .. code:: python + consumer.provide_partition_info() + """ max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py index c62b7ed..0b1bb59 100644 --- a/kafka/partitioner/base.py +++ b/kafka/partitioner/base.py @@ -7,7 +7,8 @@ class Partitioner(object): """ Initialize the partitioner - partitions - A list of available partitions (during startup) + Arguments: + partitions: A list of available partitions (during startup) """ self.partitions = partitions @@ -16,8 +17,9 @@ class Partitioner(object): Takes a string key and num_partitions as argument and returns a partition to be used for the message - partitions - The list of partitions is passed in every call. This - may look like an overhead, but it will be useful - (in future) when we handle cases like rebalancing + Arguments: + partitions: The list of partitions is passed in every call. This + may look like an overhead, but it will be useful + (in future) when we handle cases like rebalancing """ raise NotImplementedError('partition function has to be implemented') diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6e19b92..5b41bc9 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -85,20 +85,20 @@ class Producer(object): """ Base class to be used by producers - Params: - client - The Kafka client instance to use - async - If set to true, the messages are sent asynchronously via another + Arguments: + client: The Kafka client instance to use + async: If set to true, the messages are sent asynchronously via another thread (process). We will not wait for a response to these WARNING!!! current implementation of async producer does not guarantee message delivery. Use at your own risk! Or help us improve with a PR! - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout + req_acks: A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout """ ACK_NOT_REQUIRED = 0 # No ack is required diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 68c70d9..fe5b056 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -15,17 +15,19 @@ class KeyedProducer(Producer): """ A producer which distributes messages to partitions based on the key - Args: - client - The kafka client instance - partitioner - A partitioner class that will be used to get the partition - to send the message to. Must be derived from Partitioner - async - If True, the messages are sent asynchronously via another + Arguments: + client: The kafka client instance + + Keyword Arguments: + partitioner: A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + async: If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout """ def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index a10fa8c..afeae06 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -19,21 +19,23 @@ class SimpleProducer(Producer): """ A simple, round-robin producer. Each message goes to exactly one partition - Params: - client - The Kafka client instance to use - async - If True, the messages are sent asynchronously via another + Arguments: + client: The Kafka client instance to use + + Keyword Arguments: + async: If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout - random_start - If true, randomize the initial partition which the - the first message block will be published to, otherwise - if false, the first message block will always publish - to partition 0 before cycling through each partition + req_acks: A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout + random_start: If true, randomize the initial partition which the + the first message block will be published to, otherwise + if false, the first message block will always publish + to partition 0 before cycling through each partition """ def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, @@ -42,7 +44,7 @@ class SimpleProducer(Producer): batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - random_start=False): + random_start=True): self.partition_cycles = {} self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, diff --git a/kafka/protocol.py b/kafka/protocol.py index a85c7eb..2a39de6 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -185,18 +185,18 @@ class KafkaProtocol(object): """ Encode some ProduceRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of ProduceRequest - acks: How "acky" you want the request to be - 0: immediate response - 1: written to disk by the leader - 2+: waits for this many number of replicas to sync - -1: waits for all replicas to be in sync - timeout: Maximum time the server will wait for acks from replicas. - This is _not_ a socket timeout + Arguments: + client_id: string + correlation_id: int + payloads: list of ProduceRequest + acks: How "acky" you want the request to be + 0: immediate response + 1: written to disk by the leader + 2+: waits for this many number of replicas to sync + -1: waits for all replicas to be in sync + timeout: Maximum time the server will wait for acks from replicas. + This is _not_ a socket timeout + """ payloads = [] if payloads is None else payloads grouped_payloads = group_by_topic_and_partition(payloads) @@ -225,9 +225,9 @@ class KafkaProtocol(object): """ Decode bytes to a ProduceResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode + """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -248,14 +248,13 @@ class KafkaProtocol(object): """ Encodes some FetchRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of FetchRequest - max_wait_time: int, how long to block waiting on min_bytes of data - min_bytes: int, the minimum number of bytes to accumulate before - returning the response + Arguments: + client_id: string + correlation_id: int + payloads: list of FetchRequest + max_wait_time: int, how long to block waiting on min_bytes of data + min_bytes: int, the minimum number of bytes to accumulate before + returning the response """ payloads = [] if payloads is None else payloads @@ -284,9 +283,8 @@ class KafkaProtocol(object): """ Decode bytes to a FetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -333,9 +331,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -360,11 +357,10 @@ class KafkaProtocol(object): """ Encode a MetadataRequest - Params - ====== - client_id: string - correlation_id: int - topics: list of strings + Arguments: + client_id: string + correlation_id: int + topics: list of strings """ if payloads is None: topics = [] if topics is None else topics @@ -388,9 +384,8 @@ class KafkaProtocol(object): """ Decode bytes to a MetadataResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) @@ -439,12 +434,11 @@ class KafkaProtocol(object): """ Encode some OffsetCommitRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are committing offsets for + payloads: list of OffsetCommitRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -470,9 +464,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetCommitResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) ((num_topics,), cur) = relative_unpack('>i', data, cur) @@ -491,12 +484,11 @@ class KafkaProtocol(object): """ Encode some OffsetFetchRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are fetching offsets for + payloads: list of OffsetFetchRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -522,9 +514,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetFetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) @@ -547,10 +538,10 @@ def create_message(payload, key=None): """ Construct a Message - Params - ====== - payload: bytes, the payload to send to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + """ return Message(0, 0, key, payload) @@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) @@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) diff --git a/kafka/queue.py b/kafka/queue.py index ada495f..26cafad 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -129,13 +129,12 @@ class KafkaQueue(object): Messages are buffered in the producer thread until producer_flush_timeout or producer_flush_buffer is reached. - Params - ====== - client: KafkaClient object - topic: str, the topic name - partitions: list of ints, the partions to consume from - producer_config: dict, see below - consumer_config: dict, see below + Arguments: + client: KafkaClient object + topic: str, the topic name + partitions: list of ints, the partions to consume from + producer_config: dict, see below + consumer_config: dict, see below Consumer Config =============== @@ -184,14 +183,12 @@ class KafkaQueue(object): """ Consume a message from Kafka - Params - ====== - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None + Arguments: + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None - Returns - ======= - msg: str, the payload from Kafka + Returns: + msg: str, the payload from Kafka """ return self.in_queue.get(block, timeout).payload @@ -199,11 +196,10 @@ class KafkaQueue(object): """ Send a message to Kafka - Params - ====== - msg: std, the message to send - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None + Arguments: + msg: std, the message to send + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None """ self.out_queue.put(msg, block, timeout) diff --git a/kafka/util.py b/kafka/util.py index 72ac521..622b1a7 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -103,10 +103,12 @@ class ReentrantTimer(object): A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer) - t: timer interval in milliseconds - fn: a callable to invoke - args: tuple of args to be passed to function - kwargs: keyword arguments to be passed to function + Arguments: + + t: timer interval in milliseconds + fn: a callable to invoke + args: tuple of args to be passed to function + kwargs: keyword arguments to be passed to function """ def __init__(self, t, fn, *args, **kwargs): diff --git a/servers/0.8.2.0/resources/kafka.properties b/servers/0.8.2.0/resources/kafka.properties new file mode 100644 index 0000000..a638f39 --- /dev/null +++ b/servers/0.8.2.0/resources/kafka.properties @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port={port} + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name={host} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 diff --git a/servers/0.8.2.0/resources/log4j.properties b/servers/0.8.2.0/resources/log4j.properties new file mode 100644 index 0000000..f863b3b --- /dev/null +++ b/servers/0.8.2.0/resources/log4j.properties @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.logger.kafka=DEBUG, stdout +log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout +log4j.logger.org.apache.zookeeper=INFO, stdout diff --git a/servers/0.8.2.0/resources/zookeeper.properties b/servers/0.8.2.0/resources/zookeeper.properties new file mode 100644 index 0000000..e3fd097 --- /dev/null +++ b/servers/0.8.2.0/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/test/fixtures.py b/test/fixtures.py index b286619..3c496fd 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -123,7 +123,7 @@ class ZookeeperFixture(Fixture): # Party! self.out("Starting...") self.child.start() - self.child.wait_for(r"Snapshotting") + self.child.wait_for(r"binding to port") self.out("Done!") def close(self): @@ -212,8 +212,8 @@ class KafkaFixture(Fixture): if proc.wait() != 0: self.out("Failed to create Zookeeper chroot node") - self.out(proc.stdout) - self.out(proc.stderr) + self.out(proc.stdout.read()) + self.out(proc.stderr.read()) raise RuntimeError("Failed to create Zookeeper chroot node") self.out("Done!") diff --git a/test/test_client.py b/test/test_client.py index ba6e3b1..c522d9a 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -10,9 +10,8 @@ from kafka.common import ( ProduceRequest, MetadataResponse, BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - LeaderNotAvailableError, NoError, - UnknownTopicOrPartitionError, KafkaTimeoutError, - ConnectionError + LeaderNotAvailableError, UnknownTopicOrPartitionError, + KafkaTimeoutError, ConnectionError ) from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol, create_message diff --git a/test/test_client_integration.py b/test/test_client_integration.py index cc60778..c0331ea 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -54,7 +54,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): # Offset Tests # #################### - @kafka_versions("0.8.1", "0.8.1.1") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") def test_commit_fetch_offsets(self): req = OffsetCommitRequest(self.topic, 0, 42, b"metadata") (resp,) = self.client.send_offset_commit_request(b"group", [req]) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index ea32318..4723220 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -257,7 +257,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): big_consumer.stop() - @kafka_versions("0.8.1", "0.8.1.1") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") def test_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -357,7 +357,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages), 5) self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) - @kafka_versions("0.8.1", "0.8.1.1") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") def test_kafka_consumer__offset_commit_resume(self): GROUP_ID = random_string(10) diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 4331d23..19d28bd 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -142,7 +142,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def test_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) - producer = SimpleProducer(self.client) + producer = SimpleProducer(self.client, random_start=False) # Goes to first partition, randomly. resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) @@ -165,7 +165,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_produce__new_topic_fails_with_reasonable_error(self): new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8') - producer = SimpleProducer(self.client) + producer = SimpleProducer(self.client, random_start=False) # At first it doesn't exist with self.assertRaises((UnknownTopicOrPartitionError, @@ -174,7 +174,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_producer_random_order(self): - producer = SimpleProducer(self.client, random_start = True) + producer = SimpleProducer(self.client, random_start=True) resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) resp2 = producer.send_messages(self.topic, self.msg("three")) resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) @@ -184,7 +184,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_producer_ordered_start(self): - producer = SimpleProducer(self.client, random_start = False) + producer = SimpleProducer(self.client, random_start=False) resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) resp2 = producer.send_messages(self.topic, self.msg("three")) resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) @@ -249,7 +249,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def test_acks_none(self): start_offset0 = self.current_offset(self.topic, 0) - producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED, + random_start=False) resp = producer.send_messages(self.topic, self.msg("one")) self.assertEqual(len(resp), 0) @@ -260,7 +261,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def test_acks_local_write(self): start_offset0 = self.current_offset(self.topic, 0) - producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) + producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, + random_start=False) resp = producer.send_messages(self.topic, self.msg("one")) self.assert_produce_response(resp, start_offset0) @@ -274,7 +276,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer = SimpleProducer( self.client, - req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT) + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, + random_start=False) resp = producer.send_messages(self.topic, self.msg("one")) self.assert_produce_response(resp, start_offset0) @@ -287,10 +290,12 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) - producer = SimpleProducer(self.client, - batch_send=True, - batch_send_every_n=5, - batch_send_every_t=20) + producer = SimpleProducer( + self.client, + batch_send=True, + batch_send_every_n=5, + batch_send_every_t=20, + random_start=False) # Send 5 messages and do a fetch resp = producer.send_messages(self.topic, @@ -337,9 +342,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer(self.client, - batch_send=True, - batch_send_every_n=100, - batch_send_every_t=5) + batch_send=True, + batch_send_every_n=100, + batch_send_every_t=5, + random_start=False) # Send 5 messages and do a fetch resp = producer.send_messages(self.topic, @@ -387,7 +393,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def test_async_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) - producer = SimpleProducer(self.client, async=True) + producer = SimpleProducer(self.client, async=True, random_start=False) resp = producer.send_messages(self.topic, self.msg("one")) self.assertEqual(len(resp), 0) |