summaryrefslogtreecommitdiff
path: root/ceilometer/publisher
diff options
context:
space:
mode:
authorzhang-shaoman <zhang.shaoman@zte.com.cn>2019-05-05 16:25:47 +0800
committerzhang-shaoman <zhang.shaoman@zte.com.cn>2019-05-05 16:25:47 +0800
commitfd000b8c131b3b5d23d2c45233353107536565e1 (patch)
treed166474ffe1e99eda5158a463f7f45713b1c5131 /ceilometer/publisher
parent5ca16ccee7fa0f5221ba3a4b6d9a3942bea72b87 (diff)
downloadceilometer-fd000b8c131b3b5d23d2c45233353107536565e1.tar.gz
metering data lost randomly
Thread lock has been added for flush(), while queue may be modified in other places, which has not been protected, so need to add lock in such places too. Change-Id: I6b46f07690f2fc165fe27bf509ee8926656fd6b9 Closes-Bug: #1827768
Diffstat (limited to 'ceilometer/publisher')
-rw-r--r--ceilometer/publisher/messaging.py11
1 files changed, 8 insertions, 3 deletions
diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py
index d6a3ed2a..a4e9f8a3 100644
--- a/ceilometer/publisher/messaging.py
+++ b/ceilometer/publisher/messaging.py
@@ -112,9 +112,11 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
for sample in samples
]
topic = self.conf.publisher_notifier.metering_topic
- self.local_queue.append((topic, meters))
+ with self.queue_lock:
+ self.local_queue.append((topic, meters))
if self.per_meter_topic:
+ queue_per_meter_topic = []
for meter_name, meter_list in itertools.groupby(
sorted(meters, key=operator.itemgetter('counter_name')),
operator.itemgetter('counter_name')):
@@ -122,7 +124,9 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
topic_name = topic + '.' + meter_name
LOG.debug('Publishing %(m)d samples on %(n)s',
{'m': len(meter_list), 'n': topic_name})
- self.local_queue.append((topic_name, meter_list))
+ queue_per_meter_topic.append((topic_name, meter_list))
+ with self.queue_lock:
+ self.local_queue.extend(queue_per_meter_topic)
self.flush()
@@ -180,7 +184,8 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
event, self.conf.publisher.telemetry_secret) for event in events]
topic = self.conf.publisher_notifier.event_topic
- self.local_queue.append((topic, ev_list))
+ with self.queue_lock:
+ self.local_queue.append((topic, ev_list))
self.flush()
@abc.abstractmethod