diff options
Diffstat (limited to 'cloudinit/reporting/handlers.py')
-rw-r--r-- | cloudinit/reporting/handlers.py | 87 |
1 files changed, 71 insertions, 16 deletions
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index e163e168..d43b80b0 100644 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -10,6 +10,8 @@ import threading import time import uuid from datetime import datetime +from threading import Event +from typing import Union from cloudinit import log as logging from cloudinit import url_helper, util @@ -81,34 +83,79 @@ class WebHookHandler(ReportingHandler): super(WebHookHandler, self).__init__() if any([consumer_key, token_key, token_secret, consumer_secret]): - self.oauth_helper = url_helper.OauthUrlHelper( + oauth_helper = url_helper.OauthUrlHelper( consumer_key=consumer_key, token_key=token_key, token_secret=token_secret, consumer_secret=consumer_secret, ) + self.readurl = oauth_helper.readurl else: - self.oauth_helper = None + self.readurl = url_helper.readurl self.endpoint = endpoint self.timeout = timeout self.retries = retries self.ssl_details = util.fetch_ssl_details() + self.flush_requested = Event() + self.queue = queue.Queue() + self.event_processor = threading.Thread(target=self.process_requests) + self.event_processor.daemon = True + self.event_processor.start() + + def process_requests(self): + consecutive_failed = 0 + while True: + if self.flush_requested.is_set() and consecutive_failed > 2: + # At this point the main thread is waiting for the queue to + # drain. If we have a queue of events piled up and recent + # events have failed, lets not waste time trying to post + # the rest, especially since a long timeout could block + # cloud-init for quite a long time. + LOG.warning( + "Multiple consecutive failures in WebHookHandler. " + "Cancelling all queued events." + ) + while not self.queue.empty(): + self.queue.get_nowait() + self.queue.task_done() + consecutive_failed = 0 + args = self.queue.get(block=True) + try: + self.readurl( + args[0], + data=args[1], + timeout=args[2], + retries=args[3], + ssl_details=args[4], + ) + consecutive_failed = 0 + except Exception as e: + LOG.warning( + "Failed posting event: %s. This was caused by: %s", + args[1], + e, + ) + consecutive_failed += 1 + finally: + self.queue.task_done() + def publish_event(self, event): - if self.oauth_helper: - readurl = self.oauth_helper.readurl - else: - readurl = url_helper.readurl - try: - return readurl( + self.queue.put( + ( self.endpoint, - data=json.dumps(event.as_dict()), - timeout=self.timeout, - retries=self.retries, - ssl_details=self.ssl_details, + json.dumps(event.as_dict()), + self.timeout, + self.retries, + self.ssl_details, ) - except Exception: - LOG.warning("failed posting event: %s", event.as_string()) + ) + + def flush(self): + self.flush_requested.set() + LOG.debug("WebHookHandler flushing remaining events") + self.queue.join() + self.flush_requested.clear() class HyperVKvpReportingHandler(ReportingHandler): @@ -359,10 +406,18 @@ class HyperVKvpReportingHandler(ReportingHandler): self.q.join() +# Type[ReportingHandler] doesn't work here because each class has different +# call args. Protocols in python 3.8 can probably make this simpler. +HandlerType = Union[ + ReportingHandler, + LogHandler, + PrintHandler, + WebHookHandler, + HyperVKvpReportingHandler, +] + available_handlers = DictRegistry() available_handlers.register_item("log", LogHandler) available_handlers.register_item("print", PrintHandler) available_handlers.register_item("webhook", WebHookHandler) available_handlers.register_item("hyperv", HyperVKvpReportingHandler) - -# vi: ts=4 expandtab |