summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhangxuanyuan <zhangxuanyuan@unionpay.com>2017-09-14 15:52:05 +0800
committerzhangxuanyuan <zhangxuanyuan@unionpay.com>2017-09-27 10:52:21 +0800
commit20023730174bda005b3fe6978ad2e98efe060b75 (patch)
tree0219218678974905416d70e865b075eaeb0603b5
parent1cb713f3e238fa1d534e8ca9f5baae4c54af0609 (diff)
downloadceilometer-20023730174bda005b3fe6978ad2e98efe060b75.tar.gz
Remove class KafkaBrokerPublisher
Remove class KafkaBrokerPublisher and use NotifierPublisher instead Change-Id: I12fb8666c9af485c9bf9aace8eee08f2e4683e09
-rw-r--r--ceilometer/publisher/kafka_broker.py101
-rw-r--r--ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py213
-rw-r--r--releasenotes/notes/remove-kafka-broker-publisher-7026b370cfc831db.yaml4
-rw-r--r--setup.cfg2
4 files changed, 4 insertions, 316 deletions
diff --git a/ceilometer/publisher/kafka_broker.py b/ceilometer/publisher/kafka_broker.py
deleted file mode 100644
index d3b35942..00000000
--- a/ceilometer/publisher/kafka_broker.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Copyright 2015 Cisco Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from debtcollector import removals
-import kafka
-from oslo_log import log
-from oslo_serialization import jsonutils
-from oslo_utils import netutils
-from six.moves.urllib import parse as urlparse
-
-from ceilometer.publisher import messaging
-
-LOG = log.getLogger(__name__)
-
-
-@removals.removed_class("KafkaBrokerPublisher",
- message="use NotifierPublisher instead",
- removal_version='10.0')
-class KafkaBrokerPublisher(messaging.MessagingPublisher):
- """Publish metering data to kafka broker.
-
- The ip address and port number of kafka broker should be configured in
- ceilometer pipeline configuration file. If an ip address is not specified,
- this kafka publisher will not publish any meters.
-
- To enable this publisher, add the following section to the
- /etc/ceilometer/pipeline.yaml file or simply add it to an existing
- pipeline::
-
- meter:
- - name: meter_kafka
- meters:
- - "*"
- sinks:
- - kafka_sink
- sinks:
- - name: kafka_sink
- transformers:
- publishers:
- - kafka://[kafka_broker_ip]:[kafka_broker_port]?topic=[topic]
-
- Kafka topic name and broker's port are required for this publisher to work
- properly. If topic parameter is missing, this kafka publisher publish
- metering data under a topic name, 'ceilometer'. If the port number is not
- specified, this Kafka Publisher will use 9092 as the broker's port.
- This publisher has transmit options such as queue, drop, and retry. These
- options are specified using policy field of URL parameter. When queue
- option could be selected, local queue length can be determined using
- max_queue_length field as well. When the transfer fails with retry
- option, try to resend the data as many times as specified in max_retry
- field. If max_retry is not specified, default the number of retry is 100.
- """
-
- def __init__(self, conf, parsed_url):
- super(KafkaBrokerPublisher, self).__init__(conf, parsed_url)
- options = urlparse.parse_qs(parsed_url.query)
-
- self._producer = None
- self._host, self._port = netutils.parse_host_port(
- parsed_url.netloc, default_port=9092)
- self._topic = options.get('topic', ['ceilometer'])[-1]
- self.max_retry = int(options.get('max_retry', [100])[-1])
-
- def _ensure_connection(self):
- if self._producer:
- return
-
- try:
- self._producer = kafka.KafkaProducer(
- bootstrap_servers=["%s:%s" % (self._host, self._port)])
- except kafka.errors.KafkaError as e:
- LOG.exception("Failed to connect to Kafka service: %s", e)
- raise messaging.DeliveryFailure('Kafka Client is not available, '
- 'please restart Kafka client')
- except Exception as e:
- LOG.exception("Failed to connect to Kafka service: %s", e)
- raise messaging.DeliveryFailure('Kafka Client is not available, '
- 'please restart Kafka client')
-
- def _send(self, event_type, data):
- self._ensure_connection()
- # TODO(sileht): don't split the payload into multiple network
- # message ... but how to do that without breaking consuming
- # application...
- try:
- for d in data:
- self._producer.send(self._topic, jsonutils.dumps(d))
- except Exception as e:
- messaging.raise_delivery_failure(e)
diff --git a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py
deleted file mode 100644
index f2d749d8..00000000
--- a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py
+++ /dev/null
@@ -1,213 +0,0 @@
-#
-# Copyright 2015 Cisco Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""Tests for ceilometer/publisher/kafka_broker.py
-"""
-import datetime
-import uuid
-
-import mock
-from oslo_utils import netutils
-
-from ceilometer.event.storage import models as event
-from ceilometer.publisher import kafka_broker as kafka
-from ceilometer.publisher import messaging as msg_publisher
-from ceilometer import sample
-from ceilometer import service
-from ceilometer.tests import base as tests_base
-
-
-@mock.patch('ceilometer.publisher.kafka_broker.LOG', mock.Mock())
-class TestKafkaPublisher(tests_base.BaseTestCase):
- test_event_data = [
- event.Event(message_id=uuid.uuid4(),
- event_type='event_%d' % i,
- generated=datetime.datetime.utcnow(),
- traits=[], raw={})
- for i in range(0, 5)
- ]
-
- test_data = [
- sample.Sample(
- name='test',
- type=sample.TYPE_CUMULATIVE,
- unit='',
- volume=1,
- user_id='test',
- project_id='test',
- resource_id='test_run_tasks',
- timestamp=datetime.datetime.utcnow().isoformat(),
- resource_metadata={'name': 'TestPublish'},
- ),
- sample.Sample(
- name='test',
- type=sample.TYPE_CUMULATIVE,
- unit='',
- volume=1,
- user_id='test',
- project_id='test',
- resource_id='test_run_tasks',
- timestamp=datetime.datetime.utcnow().isoformat(),
- resource_metadata={'name': 'TestPublish'},
- ),
- sample.Sample(
- name='test2',
- type=sample.TYPE_CUMULATIVE,
- unit='',
- volume=1,
- user_id='test',
- project_id='test',
- resource_id='test_run_tasks',
- timestamp=datetime.datetime.utcnow().isoformat(),
- resource_metadata={'name': 'TestPublish'},
- ),
- sample.Sample(
- name='test2',
- type=sample.TYPE_CUMULATIVE,
- unit='',
- volume=1,
- user_id='test',
- project_id='test',
- resource_id='test_run_tasks',
- timestamp=datetime.datetime.utcnow().isoformat(),
- resource_metadata={'name': 'TestPublish'},
- ),
- sample.Sample(
- name='test3',
- type=sample.TYPE_CUMULATIVE,
- unit='',
- volume=1,
- user_id='test',
- project_id='test',
- resource_id='test_run_tasks',
- timestamp=datetime.datetime.utcnow().isoformat(),
- resource_metadata={'name': 'TestPublish'},
- ),
- ]
-
- def setUp(self):
- super(TestKafkaPublisher, self).setUp()
- self.CONF = service.prepare_service([], [])
-
- def test_publish(self):
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer'))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- publisher.publish_samples(self.test_data)
- self.assertEqual(5, len(fake_producer.send.mock_calls))
- self.assertEqual(0, len(publisher.local_queue))
-
- def test_publish_without_options(self):
- publisher = kafka.KafkaBrokerPublisher(
- self.CONF, netutils.urlsplit('kafka://127.0.0.1:9092'))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- publisher.publish_samples(self.test_data)
- self.assertEqual(5, len(fake_producer.send.mock_calls))
- self.assertEqual(0, len(publisher.local_queue))
-
- def test_publish_to_host_without_policy(self):
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer'))
- self.assertEqual('default', publisher.policy)
-
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer&policy=test'))
- self.assertEqual('default', publisher.policy)
-
- def test_publish_to_host_with_default_policy(self):
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer&policy=default'))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- fake_producer.send.side_effect = TypeError
- self.assertRaises(msg_publisher.DeliveryFailure,
- publisher.publish_samples,
- self.test_data)
- self.assertEqual(100, len(fake_producer.send.mock_calls))
- self.assertEqual(0, len(publisher.local_queue))
-
- def test_publish_to_host_with_drop_policy(self):
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop'))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- fake_producer.send.side_effect = Exception("test")
- publisher.publish_samples(self.test_data)
- self.assertEqual(1, len(fake_producer.send.mock_calls))
- self.assertEqual(0, len(publisher.local_queue))
-
- def test_publish_to_host_with_queue_policy(self):
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- fake_producer.send.side_effect = Exception("test")
- publisher.publish_samples(self.test_data)
- self.assertEqual(1, len(fake_producer.send.mock_calls))
- self.assertEqual(1, len(publisher.local_queue))
-
- def test_publish_to_down_host_with_default_queue_size(self):
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- fake_producer.send.side_effect = Exception("test")
-
- for i in range(0, 2000):
- for s in self.test_data:
- s.name = 'test-%d' % i
- publisher.publish_samples(self.test_data)
-
- self.assertEqual(1024, len(publisher.local_queue))
- self.assertEqual('test-976',
- publisher.local_queue[0][1][0]['counter_name'])
- self.assertEqual('test-1999',
- publisher.local_queue[1023][1][0]['counter_name'])
-
- def test_publish_to_host_from_down_to_up_with_queue(self):
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- fake_producer.send.side_effect = Exception("test")
- for i in range(0, 16):
- for s in self.test_data:
- s.name = 'test-%d' % i
- publisher.publish_samples(self.test_data)
-
- self.assertEqual(16, len(publisher.local_queue))
-
- fake_producer.send.side_effect = None
- for s in self.test_data:
- s.name = 'test-%d' % 16
- publisher.publish_samples(self.test_data)
- self.assertEqual(0, len(publisher.local_queue))
-
- def test_publish_event_with_default_policy(self):
- publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
- 'kafka://127.0.0.1:9092?topic=ceilometer'))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- publisher.publish_events(self.test_event_data)
- self.assertEqual(5, len(fake_producer.send.mock_calls))
-
- with mock.patch.object(publisher, '_producer') as fake_producer:
- fake_producer.send.side_effect = Exception("test")
- self.assertRaises(msg_publisher.DeliveryFailure,
- publisher.publish_events,
- self.test_event_data)
- self.assertEqual(100, len(fake_producer.send.mock_calls))
- self.assertEqual(0, len(publisher.local_queue))
diff --git a/releasenotes/notes/remove-kafka-broker-publisher-7026b370cfc831db.yaml b/releasenotes/notes/remove-kafka-broker-publisher-7026b370cfc831db.yaml
new file mode 100644
index 00000000..13fcb95b
--- /dev/null
+++ b/releasenotes/notes/remove-kafka-broker-publisher-7026b370cfc831db.yaml
@@ -0,0 +1,4 @@
+---
+upgrade:
+ - |
+ The deprecated kafka publisher has been removed, use NotifierPublisher instead.
diff --git a/setup.cfg b/setup.cfg
index 09d8a63a..c7481368 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -245,7 +245,6 @@ ceilometer.sample.publisher =
udp = ceilometer.publisher.udp:UDPPublisher
file = ceilometer.publisher.file:FilePublisher
direct = ceilometer.publisher.direct:DirectPublisher
- kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher
http = ceilometer.publisher.http:HttpPublisher
https = ceilometer.publisher.http:HttpPublisher
gnocchi = ceilometer.publisher.direct:DirectPublisher
@@ -258,7 +257,6 @@ ceilometer.event.publisher =
test = ceilometer.publisher.test:TestPublisher
direct = ceilometer.publisher.direct:DirectPublisher
notifier = ceilometer.publisher.messaging:EventNotifierPublisher
- kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher
http = ceilometer.publisher.http:HttpPublisher
https = ceilometer.publisher.http:HttpPublisher
gnocchi = ceilometer.publisher.direct:DirectPublisher