diff options
author | zhangxuanyuan <zhangxuanyuan@unionpay.com> | 2017-09-14 15:52:05 +0800 |
---|---|---|
committer | zhangxuanyuan <zhangxuanyuan@unionpay.com> | 2017-09-27 10:52:21 +0800 |
commit | 20023730174bda005b3fe6978ad2e98efe060b75 (patch) | |
tree | 0219218678974905416d70e865b075eaeb0603b5 | |
parent | 1cb713f3e238fa1d534e8ca9f5baae4c54af0609 (diff) | |
download | ceilometer-20023730174bda005b3fe6978ad2e98efe060b75.tar.gz |
Remove class KafkaBrokerPublisher
Remove class KafkaBrokerPublisher and use NotifierPublisher instead
Change-Id: I12fb8666c9af485c9bf9aace8eee08f2e4683e09
-rw-r--r-- | ceilometer/publisher/kafka_broker.py | 101 | ||||
-rw-r--r-- | ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py | 213 | ||||
-rw-r--r-- | releasenotes/notes/remove-kafka-broker-publisher-7026b370cfc831db.yaml | 4 | ||||
-rw-r--r-- | setup.cfg | 2 |
4 files changed, 4 insertions, 316 deletions
diff --git a/ceilometer/publisher/kafka_broker.py b/ceilometer/publisher/kafka_broker.py deleted file mode 100644 index d3b35942..00000000 --- a/ceilometer/publisher/kafka_broker.py +++ /dev/null @@ -1,101 +0,0 @@ -# -# Copyright 2015 Cisco Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from debtcollector import removals -import kafka -from oslo_log import log -from oslo_serialization import jsonutils -from oslo_utils import netutils -from six.moves.urllib import parse as urlparse - -from ceilometer.publisher import messaging - -LOG = log.getLogger(__name__) - - -@removals.removed_class("KafkaBrokerPublisher", - message="use NotifierPublisher instead", - removal_version='10.0') -class KafkaBrokerPublisher(messaging.MessagingPublisher): - """Publish metering data to kafka broker. - - The ip address and port number of kafka broker should be configured in - ceilometer pipeline configuration file. If an ip address is not specified, - this kafka publisher will not publish any meters. - - To enable this publisher, add the following section to the - /etc/ceilometer/pipeline.yaml file or simply add it to an existing - pipeline:: - - meter: - - name: meter_kafka - meters: - - "*" - sinks: - - kafka_sink - sinks: - - name: kafka_sink - transformers: - publishers: - - kafka://[kafka_broker_ip]:[kafka_broker_port]?topic=[topic] - - Kafka topic name and broker's port are required for this publisher to work - properly. If topic parameter is missing, this kafka publisher publish - metering data under a topic name, 'ceilometer'. If the port number is not - specified, this Kafka Publisher will use 9092 as the broker's port. - This publisher has transmit options such as queue, drop, and retry. These - options are specified using policy field of URL parameter. When queue - option could be selected, local queue length can be determined using - max_queue_length field as well. When the transfer fails with retry - option, try to resend the data as many times as specified in max_retry - field. If max_retry is not specified, default the number of retry is 100. - """ - - def __init__(self, conf, parsed_url): - super(KafkaBrokerPublisher, self).__init__(conf, parsed_url) - options = urlparse.parse_qs(parsed_url.query) - - self._producer = None - self._host, self._port = netutils.parse_host_port( - parsed_url.netloc, default_port=9092) - self._topic = options.get('topic', ['ceilometer'])[-1] - self.max_retry = int(options.get('max_retry', [100])[-1]) - - def _ensure_connection(self): - if self._producer: - return - - try: - self._producer = kafka.KafkaProducer( - bootstrap_servers=["%s:%s" % (self._host, self._port)]) - except kafka.errors.KafkaError as e: - LOG.exception("Failed to connect to Kafka service: %s", e) - raise messaging.DeliveryFailure('Kafka Client is not available, ' - 'please restart Kafka client') - except Exception as e: - LOG.exception("Failed to connect to Kafka service: %s", e) - raise messaging.DeliveryFailure('Kafka Client is not available, ' - 'please restart Kafka client') - - def _send(self, event_type, data): - self._ensure_connection() - # TODO(sileht): don't split the payload into multiple network - # message ... but how to do that without breaking consuming - # application... - try: - for d in data: - self._producer.send(self._topic, jsonutils.dumps(d)) - except Exception as e: - messaging.raise_delivery_failure(e) diff --git a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py deleted file mode 100644 index f2d749d8..00000000 --- a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py +++ /dev/null @@ -1,213 +0,0 @@ -# -# Copyright 2015 Cisco Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Tests for ceilometer/publisher/kafka_broker.py -""" -import datetime -import uuid - -import mock -from oslo_utils import netutils - -from ceilometer.event.storage import models as event -from ceilometer.publisher import kafka_broker as kafka -from ceilometer.publisher import messaging as msg_publisher -from ceilometer import sample -from ceilometer import service -from ceilometer.tests import base as tests_base - - -@mock.patch('ceilometer.publisher.kafka_broker.LOG', mock.Mock()) -class TestKafkaPublisher(tests_base.BaseTestCase): - test_event_data = [ - event.Event(message_id=uuid.uuid4(), - event_type='event_%d' % i, - generated=datetime.datetime.utcnow(), - traits=[], raw={}) - for i in range(0, 5) - ] - - test_data = [ - sample.Sample( - name='test', - type=sample.TYPE_CUMULATIVE, - unit='', - volume=1, - user_id='test', - project_id='test', - resource_id='test_run_tasks', - timestamp=datetime.datetime.utcnow().isoformat(), - resource_metadata={'name': 'TestPublish'}, - ), - sample.Sample( - name='test', - type=sample.TYPE_CUMULATIVE, - unit='', - volume=1, - user_id='test', - project_id='test', - resource_id='test_run_tasks', - timestamp=datetime.datetime.utcnow().isoformat(), - resource_metadata={'name': 'TestPublish'}, - ), - sample.Sample( - name='test2', - type=sample.TYPE_CUMULATIVE, - unit='', - volume=1, - user_id='test', - project_id='test', - resource_id='test_run_tasks', - timestamp=datetime.datetime.utcnow().isoformat(), - resource_metadata={'name': 'TestPublish'}, - ), - sample.Sample( - name='test2', - type=sample.TYPE_CUMULATIVE, - unit='', - volume=1, - user_id='test', - project_id='test', - resource_id='test_run_tasks', - timestamp=datetime.datetime.utcnow().isoformat(), - resource_metadata={'name': 'TestPublish'}, - ), - sample.Sample( - name='test3', - type=sample.TYPE_CUMULATIVE, - unit='', - volume=1, - user_id='test', - project_id='test', - resource_id='test_run_tasks', - timestamp=datetime.datetime.utcnow().isoformat(), - resource_metadata={'name': 'TestPublish'}, - ), - ] - - def setUp(self): - super(TestKafkaPublisher, self).setUp() - self.CONF = service.prepare_service([], []) - - def test_publish(self): - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer')) - - with mock.patch.object(publisher, '_producer') as fake_producer: - publisher.publish_samples(self.test_data) - self.assertEqual(5, len(fake_producer.send.mock_calls)) - self.assertEqual(0, len(publisher.local_queue)) - - def test_publish_without_options(self): - publisher = kafka.KafkaBrokerPublisher( - self.CONF, netutils.urlsplit('kafka://127.0.0.1:9092')) - - with mock.patch.object(publisher, '_producer') as fake_producer: - publisher.publish_samples(self.test_data) - self.assertEqual(5, len(fake_producer.send.mock_calls)) - self.assertEqual(0, len(publisher.local_queue)) - - def test_publish_to_host_without_policy(self): - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer')) - self.assertEqual('default', publisher.policy) - - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer&policy=test')) - self.assertEqual('default', publisher.policy) - - def test_publish_to_host_with_default_policy(self): - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer&policy=default')) - - with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send.side_effect = TypeError - self.assertRaises(msg_publisher.DeliveryFailure, - publisher.publish_samples, - self.test_data) - self.assertEqual(100, len(fake_producer.send.mock_calls)) - self.assertEqual(0, len(publisher.local_queue)) - - def test_publish_to_host_with_drop_policy(self): - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop')) - - with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send.side_effect = Exception("test") - publisher.publish_samples(self.test_data) - self.assertEqual(1, len(fake_producer.send.mock_calls)) - self.assertEqual(0, len(publisher.local_queue)) - - def test_publish_to_host_with_queue_policy(self): - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - - with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send.side_effect = Exception("test") - publisher.publish_samples(self.test_data) - self.assertEqual(1, len(fake_producer.send.mock_calls)) - self.assertEqual(1, len(publisher.local_queue)) - - def test_publish_to_down_host_with_default_queue_size(self): - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - - with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send.side_effect = Exception("test") - - for i in range(0, 2000): - for s in self.test_data: - s.name = 'test-%d' % i - publisher.publish_samples(self.test_data) - - self.assertEqual(1024, len(publisher.local_queue)) - self.assertEqual('test-976', - publisher.local_queue[0][1][0]['counter_name']) - self.assertEqual('test-1999', - publisher.local_queue[1023][1][0]['counter_name']) - - def test_publish_to_host_from_down_to_up_with_queue(self): - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - - with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send.side_effect = Exception("test") - for i in range(0, 16): - for s in self.test_data: - s.name = 'test-%d' % i - publisher.publish_samples(self.test_data) - - self.assertEqual(16, len(publisher.local_queue)) - - fake_producer.send.side_effect = None - for s in self.test_data: - s.name = 'test-%d' % 16 - publisher.publish_samples(self.test_data) - self.assertEqual(0, len(publisher.local_queue)) - - def test_publish_event_with_default_policy(self): - publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit( - 'kafka://127.0.0.1:9092?topic=ceilometer')) - - with mock.patch.object(publisher, '_producer') as fake_producer: - publisher.publish_events(self.test_event_data) - self.assertEqual(5, len(fake_producer.send.mock_calls)) - - with mock.patch.object(publisher, '_producer') as fake_producer: - fake_producer.send.side_effect = Exception("test") - self.assertRaises(msg_publisher.DeliveryFailure, - publisher.publish_events, - self.test_event_data) - self.assertEqual(100, len(fake_producer.send.mock_calls)) - self.assertEqual(0, len(publisher.local_queue)) diff --git a/releasenotes/notes/remove-kafka-broker-publisher-7026b370cfc831db.yaml b/releasenotes/notes/remove-kafka-broker-publisher-7026b370cfc831db.yaml new file mode 100644 index 00000000..13fcb95b --- /dev/null +++ b/releasenotes/notes/remove-kafka-broker-publisher-7026b370cfc831db.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + The deprecated kafka publisher has been removed, use NotifierPublisher instead. @@ -245,7 +245,6 @@ ceilometer.sample.publisher = udp = ceilometer.publisher.udp:UDPPublisher file = ceilometer.publisher.file:FilePublisher direct = ceilometer.publisher.direct:DirectPublisher - kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher http = ceilometer.publisher.http:HttpPublisher https = ceilometer.publisher.http:HttpPublisher gnocchi = ceilometer.publisher.direct:DirectPublisher @@ -258,7 +257,6 @@ ceilometer.event.publisher = test = ceilometer.publisher.test:TestPublisher direct = ceilometer.publisher.direct:DirectPublisher notifier = ceilometer.publisher.messaging:EventNotifierPublisher - kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher http = ceilometer.publisher.http:HttpPublisher https = ceilometer.publisher.http:HttpPublisher gnocchi = ceilometer.publisher.direct:DirectPublisher |