summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2023-01-20 09:54:10 +0000
committerGerrit Code Review <review@openstack.org>2023-01-20 09:54:10 +0000
commit80909b4b24da77b69c210d5505d101aeff74cb3f (patch)
treef127f9b42a921c947d9021508a081c008d94734d
parent8434d14e9fa7107993cf8e603221ab05447baaff (diff)
parented55b5f4a4933e0e5d986b0da23239ebcc7d9f77 (diff)
downloadceilometer-80909b4b24da77b69c210d5505d101aeff74cb3f.tar.gz
Merge "Add TCP publisher"
-rw-r--r--ceilometer/publisher/tcp.py94
-rw-r--r--ceilometer/publisher/utils.py4
-rw-r--r--ceilometer/tests/unit/publisher/test_tcp.py223
-rw-r--r--setup.cfg1
4 files changed, 321 insertions, 1 deletions
diff --git a/ceilometer/publisher/tcp.py b/ceilometer/publisher/tcp.py
new file mode 100644
index 00000000..ef5e802d
--- /dev/null
+++ b/ceilometer/publisher/tcp.py
@@ -0,0 +1,94 @@
+#
+# Copyright 2022 Red Hat, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Publish a sample using a TCP mechanism
+"""
+
+import socket
+
+import msgpack
+from oslo_log import log
+from oslo_utils import netutils
+
+import ceilometer
+from ceilometer.i18n import _
+from ceilometer import publisher
+from ceilometer.publisher import utils
+
+LOG = log.getLogger(__name__)
+
+
+class TCPPublisher(publisher.ConfigPublisherBase):
+ def __init__(self, conf, parsed_url):
+ super(TCPPublisher, self).__init__(conf, parsed_url)
+ self.host, self.port = netutils.parse_host_port(
+ parsed_url.netloc, default_port=4952)
+ addrinfo = None
+ try:
+ addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET6,
+ socket.SOCK_STREAM)[0]
+ except socket.gaierror:
+ try:
+ addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET,
+ socket.SOCK_STREAM)[0]
+ except socket.gaierror:
+ pass
+ if addrinfo:
+ self.addr_family = addrinfo[0]
+ else:
+ LOG.warning(
+ "Cannot resolve host %s, creating AF_INET socket...",
+ self.host)
+ self.addr_family = socket.AF_INET
+ self.create_and_connect()
+
+ def create_and_connect(self):
+ self.socket = socket.socket(self.addr_family,
+ socket.SOCK_STREAM)
+ self.socket.connect((self.host, self.port))
+
+ def publish_samples(self, samples):
+ """Send a metering message for publishing
+
+ :param samples: Samples from pipeline after transformation
+ """
+
+ for sample in samples:
+ msg = utils.meter_message_from_counter(
+ sample, self.conf.publisher.telemetry_secret, self.conf.host)
+ host = self.host
+ port = self.port
+ LOG.debug("Publishing sample %(msg)s over TCP to "
+ "%(host)s:%(port)d", {'msg': msg, 'host': host,
+ 'port': port})
+ encoded_msg = msgpack.dumps(msg, use_bin_type=True)
+ msg_len = len(encoded_msg).to_bytes(8, 'little')
+ try:
+ self.socket.send(msg_len + encoded_msg)
+ except Exception:
+ LOG.warning(_("Unable to send sample over TCP,"
+ "trying to reconnect and resend the message"))
+ self.create_and_connect()
+ try:
+ self.socket.send(msg_len + encoded_msg)
+ except Exception:
+ LOG.exception(_("Unable to reconnect and resend"
+ "sample over TCP"))
+
+ def publish_events(self, events):
+ """Send an event message for publishing
+
+ :param events: events from pipeline after transformation
+ """
+ raise ceilometer.NotImplementedError
diff --git a/ceilometer/publisher/utils.py b/ceilometer/publisher/utils.py
index 0d1e7be0..58d799a3 100644
--- a/ceilometer/publisher/utils.py
+++ b/ceilometer/publisher/utils.py
@@ -114,7 +114,7 @@ def verify_signature(message, secret):
return secretutils.constant_time_compare(new_sig, old_sig)
-def meter_message_from_counter(sample, secret):
+def meter_message_from_counter(sample, secret, publisher_id=None):
"""Make a metering message ready to be published or stored.
Returns a dictionary containing a metering message
@@ -135,6 +135,8 @@ def meter_message_from_counter(sample, secret):
'message_id': sample.id,
'monotonic_time': sample.monotonic_time,
}
+ if publisher_id is not None:
+ msg['publisher_id'] = publisher_id
msg['message_signature'] = compute_signature(msg, secret)
return msg
diff --git a/ceilometer/tests/unit/publisher/test_tcp.py b/ceilometer/tests/unit/publisher/test_tcp.py
new file mode 100644
index 00000000..737d6fb3
--- /dev/null
+++ b/ceilometer/tests/unit/publisher/test_tcp.py
@@ -0,0 +1,223 @@
+#
+# Copyright 2022 Red Hat, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Tests for ceilometer/publisher/tcp.py"""
+
+import datetime
+from unittest import mock
+
+import msgpack
+from oslo_utils import netutils
+from oslotest import base
+
+from ceilometer.publisher import tcp
+from ceilometer.publisher import utils
+from ceilometer import sample
+from ceilometer import service
+
+
+COUNTER_SOURCE = 'testsource'
+
+
+class TestTCPPublisher(base.BaseTestCase):
+ test_data = [
+ sample.Sample(
+ name='test',
+ type=sample.TYPE_CUMULATIVE,
+ unit='',
+ volume=1,
+ user_id='test',
+ project_id='test',
+ resource_id='test_run_tasks',
+ timestamp=datetime.datetime.utcnow().isoformat(),
+ resource_metadata={'name': 'TestPublish'},
+ source=COUNTER_SOURCE,
+ ),
+ sample.Sample(
+ name='test',
+ type=sample.TYPE_CUMULATIVE,
+ unit='',
+ volume=1,
+ user_id='test',
+ project_id='test',
+ resource_id='test_run_tasks',
+ timestamp=datetime.datetime.utcnow().isoformat(),
+ resource_metadata={'name': 'TestPublish'},
+ source=COUNTER_SOURCE,
+ ),
+ sample.Sample(
+ name='test2',
+ type=sample.TYPE_CUMULATIVE,
+ unit='',
+ volume=1,
+ user_id='test',
+ project_id='test',
+ resource_id='test_run_tasks',
+ timestamp=datetime.datetime.utcnow().isoformat(),
+ resource_metadata={'name': 'TestPublish'},
+ source=COUNTER_SOURCE,
+ ),
+ sample.Sample(
+ name='test2',
+ type=sample.TYPE_CUMULATIVE,
+ unit='',
+ volume=1,
+ user_id='test',
+ project_id='test',
+ resource_id='test_run_tasks',
+ timestamp=datetime.datetime.utcnow().isoformat(),
+ resource_metadata={'name': 'TestPublish'},
+ source=COUNTER_SOURCE,
+ ),
+ sample.Sample(
+ name='test3',
+ type=sample.TYPE_CUMULATIVE,
+ unit='',
+ volume=1,
+ user_id='test',
+ project_id='test',
+ resource_id='test_run_tasks',
+ timestamp=datetime.datetime.utcnow().isoformat(),
+ resource_metadata={'name': 'TestPublish'},
+ source=COUNTER_SOURCE,
+ ),
+ ]
+
+ @staticmethod
+ def _make_fake_socket(published):
+ def _fake_socket_socket(family, type):
+ def record_data(msg):
+ msg_length = int.from_bytes(msg[0:8], "little")
+ published.append(msg[8:msg_length + 8])
+
+ def connect(dst):
+ pass
+
+ tcp_socket = mock.Mock()
+ tcp_socket.send = record_data
+ tcp_socket.connect = connect
+ return tcp_socket
+
+ return _fake_socket_socket
+
+ def setUp(self):
+ super(TestTCPPublisher, self).setUp()
+ self.CONF = service.prepare_service([], [])
+ self.CONF.publisher.telemetry_secret = 'not-so-secret'
+
+ def test_published(self):
+ self.data_sent = []
+ with mock.patch('socket.socket',
+ self._make_fake_socket(self.data_sent)):
+ publisher = tcp.TCPPublisher(
+ self.CONF,
+ netutils.urlsplit('tcp://somehost'))
+ publisher.publish_samples(self.test_data)
+
+ self.assertEqual(5, len(self.data_sent))
+
+ sent_counters = []
+
+ for data in self.data_sent:
+ counter = msgpack.loads(data, raw=False)
+ sent_counters.append(counter)
+
+ # Check that counters are equal
+ def sort_func(counter):
+ return counter['counter_name']
+
+ counters = [utils.meter_message_from_counter(d,
+ "not-so-secret",
+ publisher.conf.host)
+ for d in self.test_data]
+ counters.sort(key=sort_func)
+ sent_counters.sort(key=sort_func)
+ self.assertEqual(counters, sent_counters)
+
+ @staticmethod
+ def _make_disconnecting_socket(published, connections):
+ def _fake_socket_socket(family, type):
+ def record_data(msg):
+ if len(published) == len(connections) - 1:
+ # Raise for every each first send attempt to
+ # trigger a reconnection attempt and send the data
+ # correctly after reconnecting
+ raise IOError
+ msg_length = int.from_bytes(msg[0:8], "little")
+ published.append(msg[8:msg_length + 8])
+
+ def record_connection(dest):
+ connections.append(dest)
+
+ tcp_socket = mock.Mock()
+ tcp_socket.send = record_data
+ tcp_socket.connect = record_connection
+ return tcp_socket
+
+ return _fake_socket_socket
+
+ def test_reconnect(self):
+ self.data_sent = []
+ self.connections = []
+ with mock.patch('socket.socket',
+ self._make_disconnecting_socket(self.data_sent,
+ self.connections)):
+ publisher = tcp.TCPPublisher(
+ self.CONF,
+ netutils.urlsplit('tcp://somehost'))
+ publisher.publish_samples(self.test_data)
+
+ sent_counters = []
+
+ for data in self.data_sent:
+ counter = msgpack.loads(data, raw=False)
+ sent_counters.append(counter)
+
+ for connection in self.connections:
+ # Check destination
+ self.assertEqual(('somehost', 4952), connection)
+ self.assertEqual(len(self.connections) - 1, len(self.data_sent))
+
+ # Check that counters are equal
+ def sort_func(counter):
+ return counter['counter_name']
+
+ counters = [utils.meter_message_from_counter(d,
+ "not-so-secret",
+ publisher.conf.host)
+ for d in self.test_data]
+ counters.sort(key=sort_func)
+ sent_counters.sort(key=sort_func)
+ self.assertEqual(counters, sent_counters)
+
+ @staticmethod
+ def _raise_ioerror(*args):
+ raise IOError
+
+ def _make_broken_socket(self, family, type):
+ def connect(dst):
+ pass
+
+ tcp_socket = mock.Mock()
+ tcp_socket.send = self._raise_ioerror
+ tcp_socket.connect = connect
+ return tcp_socket
+
+ def test_publish_error(self):
+ with mock.patch('socket.socket',
+ self._make_broken_socket):
+ publisher = tcp.TCPPublisher(
+ self.CONF,
+ netutils.urlsplit('tcp://localhost'))
+ publisher.publish_samples(self.test_data)
diff --git a/setup.cfg b/setup.cfg
index 7d829aca..bf934dc0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -180,6 +180,7 @@ ceilometer.sample.publisher =
test = ceilometer.publisher.test:TestPublisher
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
udp = ceilometer.publisher.udp:UDPPublisher
+ tcp = ceilometer.publisher.tcp:TCPPublisher
file = ceilometer.publisher.file:FilePublisher
http = ceilometer.publisher.http:HttpPublisher
prometheus = ceilometer.publisher.prometheus:PrometheusPublisher