summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2016-12-07 19:30:40 +0100
committerMehdi Abaakouk <sileht@redhat.com>2017-01-02 11:46:48 +0100
commit799cd6fa8f5435e3b4473e4e224df099380919c1 (patch)
tree2f9d4d0722043f492f68ad5d38ecfdf8448e40d2
parenta76a51a78c9a5d07dfc2f57878738febe5fff4d9 (diff)
downloadoslo-messaging-799cd6fa8f5435e3b4473e4e224df099380919c1.tar.gz
kafka: disable batch for functional tests
Change-Id: I09a3049ca5f4647d0f6b002b3732a4c0edd43986
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py8
-rw-r--r--oslo_messaging/_drivers/kafka_options.py7
-rw-r--r--oslo_messaging/tests/functional/utils.py21
3 files changed, 21 insertions, 15 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 6c90d30..4c8dc29 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -26,7 +26,6 @@ import threading
import kafka
from kafka.client_async import selectors
import kafka.errors
-from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import eventletutils
import tenacity
@@ -319,15 +318,10 @@ class KafkaDriver(base.BaseDriver):
def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None):
-
- opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
- title='Kafka driver options')
- conf.register_group(opt_group)
- conf.register_opts(kafka_options.KAFKA_OPTS, group=opt_group)
-
super(KafkaDriver, self).__init__(
conf, url, default_exchange, allowed_remote_exmods)
+ kafka_options.register_opts(conf)
# the pool configuration properties
max_size = self.conf.oslo_messaging_kafka.pool_size
min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
diff --git a/oslo_messaging/_drivers/kafka_options.py b/oslo_messaging/_drivers/kafka_options.py
index c733a0a..7989288 100644
--- a/oslo_messaging/_drivers/kafka_options.py
+++ b/oslo_messaging/_drivers/kafka_options.py
@@ -50,3 +50,10 @@ KAFKA_OPTS = [
cfg.IntOpt('producer_batch_size', default=16384,
help='Size of batch for the producer async send')
]
+
+
+def register_opts(conf):
+ opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
+ title='Kafka driver options')
+ conf.register_group(opt_group)
+ conf.register_opts(KAFKA_OPTS, group=opt_group)
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index ebdc725..b47ac5f 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -20,6 +20,7 @@ from oslo_config import cfg
from six import moves
import oslo_messaging
+from oslo_messaging._drivers import kafka_options
from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging.notify import notifier
from oslo_messaging.tests import utils as test_utils
@@ -305,23 +306,27 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
rpc_zmq_ipc_dir=zmq_ipc_dir)
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
if zmq_redis_port:
- self.config(port=zmq_redis_port, group="matchmaker_redis")
- self.config(check_timeout=10000, group="matchmaker_redis")
- self.config(wait_timeout=1000, group="matchmaker_redis")
+ self.config(port=zmq_redis_port,
+ check_timeout=10000,
+ wait_timeout=1000,
+ group="matchmaker_redis")
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
- self.config(use_pub_sub=zmq_use_pub_sub,
- group='oslo_messaging_zmq')
zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
- self.config(use_router_proxy=zmq_use_router_proxy,
- group='oslo_messaging_zmq')
zmq_use_acks = os.environ.get('ZMQ_USE_ACKS')
- self.config(rpc_use_acks=zmq_use_acks,
+ self.config(use_pub_sub=zmq_use_pub_sub,
+ use_router_proxy=zmq_use_router_proxy,
+ rpc_use_acks=zmq_use_acks,
group='oslo_messaging_zmq')
zmq_use_dynamic_connections = \
os.environ.get('ZMQ_USE_DYNAMIC_CONNECTIONS')
self.config(use_dynamic_connections=zmq_use_dynamic_connections,
group='oslo_messaging_zmq')
+ kafka_options.register_opts(conf)
+
+ self.config(producer_batch_size=0,
+ group='oslo_messaging_kafka')
+
class NotificationFixture(fixtures.Fixture):
def __init__(self, conf, url, topics, batch=None):