Diffstat (limited to 'cloudinit/reporting/handlers.py')
-rw-r--r--	cloudinit/reporting/handlers.py	87
1 file changed, 71 insertions(+), 16 deletions(-)
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index e163e168..d43b80b0 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -10,6 +10,8 @@ import threading
 import time
 import uuid
 from datetime import datetime
+from threading import Event
+from typing import Union
 
 from cloudinit import log as logging
 from cloudinit import url_helper, util
@@ -81,34 +83,79 @@ class WebHookHandler(ReportingHandler):
         super(WebHookHandler, self).__init__()
 
         if any([consumer_key, token_key, token_secret, consumer_secret]):
-            self.oauth_helper = url_helper.OauthUrlHelper(
+            oauth_helper = url_helper.OauthUrlHelper(
                 consumer_key=consumer_key,
                 token_key=token_key,
                 token_secret=token_secret,
                 consumer_secret=consumer_secret,
             )
+            self.readurl = oauth_helper.readurl
         else:
-            self.oauth_helper = None
+            self.readurl = url_helper.readurl
         self.endpoint = endpoint
         self.timeout = timeout
         self.retries = retries
         self.ssl_details = util.fetch_ssl_details()
 
+        self.flush_requested = Event()
+        self.queue = queue.Queue()
+        self.event_processor = threading.Thread(target=self.process_requests)
+        self.event_processor.daemon = True
+        self.event_processor.start()
+
+    def process_requests(self):
+        consecutive_failed = 0
+        while True:
+            if self.flush_requested.is_set() and consecutive_failed > 2:
+                # At this point the main thread is waiting for the queue to
+                # drain. If we have a queue of events piled up and recent
+                # events have failed, lets not waste time trying to post
+                # the rest, especially since a long timeout could block
+                # cloud-init for quite a long time.
+                LOG.warning(
+                    "Multiple consecutive failures in WebHookHandler. "
+                    "Cancelling all queued events."
+                )
+                while not self.queue.empty():
+                    self.queue.get_nowait()
+                    self.queue.task_done()
+                consecutive_failed = 0
+            args = self.queue.get(block=True)
+            try:
+                self.readurl(
+                    args[0],
+                    data=args[1],
+                    timeout=args[2],
+                    retries=args[3],
+                    ssl_details=args[4],
+                )
+                consecutive_failed = 0
+            except Exception as e:
+                LOG.warning(
+                    "Failed posting event: %s. This was caused by: %s",
+                    args[1],
+                    e,
+                )
+                consecutive_failed += 1
+            finally:
+                self.queue.task_done()
+
     def publish_event(self, event):
-        if self.oauth_helper:
-            readurl = self.oauth_helper.readurl
-        else:
-            readurl = url_helper.readurl
-        try:
-            return readurl(
+        self.queue.put(
+            (
                 self.endpoint,
-                data=json.dumps(event.as_dict()),
-                timeout=self.timeout,
-                retries=self.retries,
-                ssl_details=self.ssl_details,
+                json.dumps(event.as_dict()),
+                self.timeout,
+                self.retries,
+                self.ssl_details,
             )
-        except Exception:
-            LOG.warning("failed posting event: %s", event.as_string())
+        )
+
+    def flush(self):
+        self.flush_requested.set()
+        LOG.debug("WebHookHandler flushing remaining events")
+        self.queue.join()
+        self.flush_requested.clear()
 
 
 class HyperVKvpReportingHandler(ReportingHandler):
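
Stripped of the cloud-init specifics, the hunk above boils down to a small producer/consumer pattern: publish_event() only enqueues, a daemon thread performs the slow HTTP call, and flush() sets an Event and joins the queue so a failing endpoint cannot block cloud-init indefinitely. A minimal standalone sketch of that pattern follows; the AsyncPoster name and the post callable are illustrative, not part of cloud-init.

    import queue
    import threading
    from threading import Event


    class AsyncPoster:
        """Toy version of the queue + daemon-thread pattern in the patch."""

        def __init__(self, post):
            self.post = post                  # callable that may raise
            self.flush_requested = Event()
            self.queue = queue.Queue()
            worker = threading.Thread(target=self._process, daemon=True)
            worker.start()

        def _process(self):
            consecutive_failed = 0
            while True:
                # If a flush is in progress and recent posts keep failing,
                # drop the backlog instead of blocking the caller.
                if self.flush_requested.is_set() and consecutive_failed > 2:
                    while not self.queue.empty():
                        self.queue.get_nowait()
                        self.queue.task_done()
                    consecutive_failed = 0
                item = self.queue.get(block=True)
                try:
                    self.post(item)
                    consecutive_failed = 0
                except Exception:
                    consecutive_failed += 1
                finally:
                    self.queue.task_done()

        def publish(self, item):
            self.queue.put(item)              # returns immediately

        def flush(self):
            self.flush_requested.set()        # let the worker bail out early
            self.queue.join()                 # wait for queued items to finish
            self.flush_requested.clear()


    if __name__ == "__main__":
        poster = AsyncPoster(print)
        poster.publish("event 1")
        poster.publish("event 2")
        poster.flush()

Because the worker is a daemon thread, an unflushed queue does not keep the interpreter alive; it is Queue.join() in flush() that provides the "wait for the backlog, but give up after repeated failures" behaviour.
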
@@ -359,10 +406,18 @@ class HyperVKvpReportingHandler(ReportingHandler):
         self.q.join()
 
 
+# Type[ReportingHandler] doesn't work here because each class has different
+# call args. Protocols in python 3.8 can probably make this simpler.
+HandlerType = Union[
+    ReportingHandler,
+    LogHandler,
+    PrintHandler,
+    WebHookHandler,
+    HyperVKvpReportingHandler,
+]
+
 available_handlers = DictRegistry()
 available_handlers.register_item("log", LogHandler)
 available_handlers.register_item("print", PrintHandler)
 available_handlers.register_item("webhook", WebHookHandler)
 available_handlers.register_item("hyperv", HyperVKvpReportingHandler)
-
-# vi: ts=4 expandtab
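
The comment on HandlerType points at typing.Protocol as a cleaner alternative once Python 3.8 can be assumed: a structural type describing the shared publish_event()/flush() interface, instead of a Union enumerating every concrete handler class. A rough sketch of what that could look like; SupportsReporting and flush_all are illustrative names, not part of cloud-init.

    from typing import Iterable, Protocol


    class SupportsReporting(Protocol):
        """Structural type: anything with publish_event() and flush() conforms."""

        def publish_event(self, event) -> None: ...

        def flush(self) -> None: ...


    def flush_all(handlers: Iterable[SupportsReporting]) -> None:
        # LogHandler, PrintHandler, WebHookHandler and HyperVKvpReportingHandler
        # would all satisfy SupportsReporting structurally, with no Union needed.
        for handler in handlers:
            handler.flush()
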