diff options
author | Graham Dumpleton <Graham.Dumpleton@gmail.com> | 2014-06-08 17:44:42 +1000 |
---|---|---|
committer | Graham Dumpleton <Graham.Dumpleton@gmail.com> | 2014-06-08 17:44:42 +1000 |
commit | 64bfa70c34d0e09e5135fe8e407d636e2f6bc625 (patch) | |
tree | 75b44ecb6dbe6bf9dbae9b8a92568d0188796ba7 | |
parent | a20bb40fce1085c25212c7e0f38af8afcb4a0eea (diff) | |
download | mod_wsgi-metrics-64bfa70c34d0e09e5135fe8e407d636e2f6bc625.tar.gz |
Initial version of code.
-rw-r--r-- | README.md | 4 | ||||
-rw-r--r-- | README.rst | 129 | ||||
-rw-r--r-- | setup.py | 37 | ||||
-rw-r--r-- | src/__init__.py | 2 | ||||
-rw-r--r-- | src/metrics/__init__.py | 0 | ||||
-rw-r--r-- | src/metrics/newrelic/__init__.py | 1 | ||||
-rw-r--r-- | src/metrics/newrelic/agent.py | 99 | ||||
-rw-r--r-- | src/metrics/newrelic/interface.py | 189 | ||||
-rw-r--r-- | src/metrics/newrelic/sampler.py | 604 |
9 files changed, 1061 insertions, 4 deletions
diff --git a/README.md b/README.md deleted file mode 100644 index 68215e1..0000000 --- a/README.md +++ /dev/null @@ -1,4 +0,0 @@ -mod_wsgi-metrics -================ - -Metrics package for Apache/mod_wsgi. diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..490a7e6 --- /dev/null +++ b/README.rst @@ -0,0 +1,129 @@ +================== +MOD_WSGI (METRICS) +================== + +The mod_wsgi-metrics package is an add on package for Apache/mod_wsgi. It +generates metric information about the run time performance of Apache and +mod_wsgi. At least mod_wsgi version 4.2.0 is required. + +In this version, metrics collected cover the performance of the Apache web +server as a whole. In future versions additional metrics will be added +which monitor aspects of mod_wsgi itself. + +At the present time the package provides a plugin for the +`New Relic Platform <http://www.newrelic.com/platform>`_. This plugin is +distinct from New Relic's own Python agent for use in monitoring Python web +applications. The plugin instead focuses on metrics specific to Apache and +mod_wsgi. The information from these metrics can be used to help in tuning +your Apache/mod_wsgi installation for best performance. + +The New Relic Platform is a free feature of New Relic and so in order to +use this plugin for Apache/mod_wsgi, you do not need to have a paid account +for New Relic. + +Even if using the Python agent for New Relic, New Relic provides a free +Lite tier for it, so there is no excuse for not using both the Python agent +and this plugin to give you that extra visibility. Learn about what your +Python web application is really doing. [1]_ + +Using the New Relic plugin with a mod_wsgi express installation +--------------------------------------------------------------- + +When using `mod_wsgi express <https://pypi.python.org/pypi/mod_wsgi>`_, +the plugin will be automatically started and will report data when using +the builtin support of mod_wsgi express for New Relic. 
See the mod_wsgi +express documentation for more information on starting it with New Relic +support enabled. + +Using New Relic plugin with a standard mod_wsgi installation +------------------------------------------------------------ + +If you have installed mod_wsgi as an Apache module directly into your Apache +installation, or have installed an operating system binary package, and are +configuring Apache manually to host your Python web application, additional +setup will be required to enable the plugin. + +The steps for manually enabling the plugin are as follows: + +1. Create a Python script file called ``server-metrics.py``. In that file +place:: + + import logging + + logging.basicConfig(level=logging.INFO, + format='%(name)s (pid=%(process)d, level=%(levelname)s): %(message)s') + + from mod_wsgi.metrics.newrelic import Agent + + config_file = '/some/path/newrelic.ini' + + agent = Agent(config_file=config_file) + agent.start() + +This would normally be placed alongside your Python web application code. + +The ``config_file`` variable should be set to the location of the +``newrelic.ini`` agent configuration file you created for use with the New +Relic Python agent. + +Alternatively, you can set the New Relic license key and application name +to report to within the Python script file:: + + license_key = 'YOUR-NEW-RELIC-LICENSE-KEY' + app_name = 'THE-APPLICATION-NAME-TO-REPORT-AGAINST' + + agent = Agent(app_name=app_name, license_key=license_key) + agent.start() + +This Python script file would normally be placed alongside your Python web +application code. + +2. Ensure that the ``mod_status`` module is being loaded into Apache and that +``ExtendedStatus`` is ``On``:: + + LoadModule status_module modules/mod_status.so + ExtendedStatus On + +The exact way in which this needs to be done will differ between Apache +installations, especially with Apache installations provided by a Linux +distribution. 
You should therefore look closely at how this is managed +for your Apache installation. + +Note that it is only necessary to load ``mod_status`` and enable +``ExtendedStatus``. It is not necessary to expose the traditional +``/server-status`` URL generally associated with the use of ``mod_status`` +as the plugin will not use that. Instead the plugin obtains the information +from the ``mod_wsgi`` module. The ``mod_status`` module still has to be +loaded though, otherwise Apache will not collect the information that is +required. + +3. Create a dedicated mod_wsgi daemon process group using the +``WSGIDaemonProcess`` directive. This should have only a single process and +a single thread. It should also enable visibility of internal server +metrics from mod_wsgi using the ``server-metrics`` option:: + + WSGIDaemonProcess newrelic display-name=%{GROUP} \ + processes=1 threads=1 server-metrics=On + +This daemon process group should not be used to host your actual Python +web application. + +4. Specify that the ``server-metrics.py`` Python script file you created +should be loaded when Apache is (re)started using the ``WSGIImportScript`` +directive:: + + WSGIImportScript /some/path/server-metrics.py \ + process-group=newrelic application-group=%{GLOBAL} + +The path should match where you saved the ``server-metrics.py`` script. +The ``process-group`` option should match the name of the daemon process +group created using the ``WSGIDaemonProcess`` directive. + +5. Restart Apache. Within the New Relic UI you should automatically see +a new entry appear in the left hand navigation bar labelled 'mod_wsgi'. The +reported data will then appear under the application name used. + +.. [1] Disclaimer: I work for New Relic and am the primary developer of + the Python agent. So of course it is awesome. The work I do on + this plugin for the New Relic platform is independent of any work + I do for New Relic and is done on my own time though. 
:-) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..8be306f --- /dev/null +++ b/setup.py @@ -0,0 +1,37 @@ +from __future__ import print_function + +from setuptools import setup + +setup_kwargs = dict( + name = 'mod_wsgi-metrics', + version = '1.0.0', + description = 'Metrics package for Apache/mod_wsgi.', + author = 'Graham Dumpleton', + author_email = 'Graham.Dumpleton@gmail.com', + maintainer = 'Graham Dumpleton', + maintainer_email = 'Graham.Dumpleton@gmail.com', + url = 'http://www.modwsgi.org/', + license = 'Apache License, Version 2.0', + platforms = [], + download_url = None, + classifiers= [ + 'Development Status :: 6 - Mature', + 'License :: OSI Approved :: Apache Software License', + 'Operating System :: MacOS :: MacOS X', + 'Operating System :: POSIX', + 'Operating System :: POSIX :: BSD', + 'Operating System :: POSIX :: Linux', + 'Operating System :: POSIX :: SunOS/Solaris', + 'Programming Language :: Python', + 'Programming Language :: Python :: Implementation :: CPython', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', + 'Topic :: Internet :: WWW/HTTP :: WSGI', + ], + packages = ['mod_wsgi', 'mod_wsgi.metrics', 'mod_wsgi.metrics.newrelic'], + package_dir = {'mod_wsgi': 'src'}, +) + +setup(**setup_kwargs) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..bb61062 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,2 @@ +import pkgutil +__path__ = pkgutil.extend_path(__path__, __name__) diff --git a/src/metrics/__init__.py b/src/metrics/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/metrics/__init__.py diff --git a/src/metrics/newrelic/__init__.py b/src/metrics/newrelic/__init__.py new file mode 100644 index 0000000..d2361b7 --- /dev/null +++ b/src/metrics/newrelic/__init__.py @@ -0,0 +1 @@ +from .agent import Agent diff --git 
a/src/metrics/newrelic/agent.py b/src/metrics/newrelic/agent.py new file mode 100644 index 0000000..2b4b74c --- /dev/null +++ b/src/metrics/newrelic/agent.py @@ -0,0 +1,99 @@ +import os +import logging + +try: + from ConfigParser import RawConfigParser, NoOptionError, NoSectionError +except ImportError: + from configparser import RawConfigParser, NoOptionError, NoSectionError + +import mod_wsgi + +from .interface import Interface +from .sampler import Sampler + +_logger = logging.getLogger(__name__) + +class Agent(object): + + def __init__(self, app_name=None, license_key=None, config_file=None, + environment=None): + + self.sampler = None + + if mod_wsgi.version < (4, 2, 0): + _logger.fatal('Version 4.2.0 or newer of mod_wsgi is required ' + 'for running the New Relic platform plugin. The plugin ' + 'has been disabled.') + + return + + if config_file is None: + config_file = os.environ.get('NEW_RELIC_CONFIG_FILE', None) + + if config_file is not None: + config_object = RawConfigParser() + + if config_file: + config_object.read([config_file]) + + if environment is None: + environment = os.environ.get('NEW_RELIC_ENVIRONMENT', None) + + def _option(name, section='newrelic', type=None, **kwargs): + try: + getter = 'get%s' % (type or '') + return getattr(config_object, getter)(section, name) + except NoOptionError: + if 'default' in kwargs: + return kwargs['default'] + else: + raise + + def option(name, type=None, **kwargs): + sections = [] + + if environment is not None: + sections.append('newrelic-platform:%s' % environment) + + sections.append('newrelic-platform') + + if environment is not None: + sections.append('newrelic:%s' % environment) + + sections.append('newrelic') + + for section in sections: + try: + return _option(name, section, type) + except (NoOptionError, NoSectionError): + pass + + if 'default' in kwargs: + return kwargs['default'] + + if app_name is None: + app_name = os.environ.get('NEW_RELIC_APP_NAME', None) + app_name = option('app_name', 
default=app_name) + + if license_key is None: + license_key = os.environ.get('NEW_RELIC_LICENSE_KEY', None) + license_key = option('license_key', default=license_key) + + if app_name is not None: + app_name = app_name.split(';')[0].strip() + + if not license_key or not app_name: + _logger.fatal('Either the license key or application name was ' + 'not specified for the New Relic platform plugin. The ' + 'plugin has been disabled.') + + return + + _logger.info('New Relic platform plugin reporting to %r.', app_name) + + self.interface = Interface(license_key) + self.sampler = Sampler(self.interface, app_name) + + def start(self): + if self.sampler is not None: + self.sampler.start() diff --git a/src/metrics/newrelic/interface.py b/src/metrics/newrelic/interface.py new file mode 100644 index 0000000..a9db15f --- /dev/null +++ b/src/metrics/newrelic/interface.py @@ -0,0 +1,189 @@ +import zlib +import sys +import socket +import os +import types +import json +import logging + +try: + import http.client as httplib +except ImportError: + import httplib + +_logger = logging.getLogger(__name__) + +# Python 3 compatibility helpers. + +PY2 = sys.version_info[0] == 2 +PY3 = sys.version_info[0] == 3 + +if PY3: + def b(s): + return s.encode('latin-1') +else: + def b(s): + return s + +# Helpers for json encoding and decoding. + +def json_encode(obj, **kwargs): + _kwargs = {} + + if type(b'') is type(''): + _kwargs['encoding'] = 'latin-1' + + def _encode(o): + if isinstance(o, bytes): + return o.decode('latin-1') + elif isinstance(o, types.GeneratorType): + return list(o) + elif hasattr(o, '__iter__'): + return list(iter(o)) + raise TypeError(repr(o) + ' is not JSON serializable') + + _kwargs['default'] = _encode + _kwargs['separators'] = (',', ':') + + _kwargs.update(kwargs) + + return json.dumps(obj, **_kwargs) + +def json_decode(s, **kwargs): + return json.loads(s, **kwargs) + +# Platform plugin interface. 
+ +class Interface(object): + + class NetworkInterfaceException(Exception): pass + class DiscardDataForRequest(NetworkInterfaceException): pass + class RetryDataForRequest(NetworkInterfaceException): pass + class ServerIsUnavailable(RetryDataForRequest): pass + + USER_AGENT = 'ModWsgi-PythonPlugin/%s (Python %s %s)' % ( + '1.0.0', sys.version.split()[0], sys.platform) + + HOST = 'platform-api.newrelic.com' + URL = '/platform/v1/metrics' + + def __init__(self, license_key): + self.license_key = license_key + + def send_request(self, payload=()): + headers = {} + config = {} + + license_key = self.license_key + + if not self.license_key: + license_key = 'INVALID LICENSE KEY' + + headers['User-Agent'] = self.USER_AGENT + headers['Content-Encoding'] = 'identity' + headers['X-License-Key'] = license_key + + try: + data = json_encode(payload) + + except Exception as exc: + _logger.exception('Error encoding data for JSON payload ' + 'with payload of %r.', payload) + + raise Interface.DiscardDataForRequest(str(exc)) + + if len(data) > 64*1024: + headers['Content-Encoding'] = 'deflate' + level = (len(data) < 2000000) and 1 or 9 + data = zlib.compress(b(data), level) + + try: + connection = httplib.HTTPSConnection(self.HOST, timeout=30.0) + connection.request('POST', self.URL, data, headers) + response = connection.getresponse() + content = response.read() + + except httplib.HTTPException as exc: + raise Interface.RetryDataForRequest(str(exc)) + + finally: + connection.close() + + if response.status != 200: + _logger.debug('Received a non 200 HTTP response from the data ' + 'collector where headers=%r, status=%r and content=%r.', + headers, response.status, content) + + if response.status == 400: + if headers['Content-Encoding'] == 'deflate': + data = zlib.decompress(data) + + _logger.error('Data collector is indicating that a bad ' + 'request has been submitted for headers of %r and ' + 'payload of %r with response of %r.', headers, data, + content) + + raise 
Interface.DiscardDataForRequest() + + elif response.status == 403: + _logger.error('Data collector is indicating that the license ' + 'key %r is not valid.', license_key) + + raise Interface.DiscardDataForRequest() + + elif response.status == 413: + _logger.warning('Data collector is indicating that a request ' + 'was received where the request content size was over ' + 'the maximum allowed size limit. The length of the ' + 'request content was %d.', len(data)) + + raise Interface.DiscardDataForRequest() + + elif response.status in (503, 504): + _logger.warning('Data collector is unavailable.') + + raise Interface.ServerIsUnavailable() + + elif response.status != 200: + _logger.warning('An unexpected HTTP response was received ' + 'from the data collector of %r. The payload for ' + 'the request was %r.', response.status, payload) + + raise Interface.DiscardDataForRequest() + + try: + if PY3: + content = content.decode('UTF-8') + + result = json_decode(content) + + except Exception as exc: + _logger.exception('Error decoding data for JSON payload ' + 'with payload of %r.', content) + + raise Interface.DiscardDataForRequest(str(exc)) + + if 'status' in result: + return result['status'] + + error_message = result['error'] + + raise Interface.DiscardDataForRequest(error_message) + + def send_metrics(self, name, guid, version, duration, metrics): + agent = {} + agent['host'] = socket.gethostname() + agent['pid'] = os.getpid() + agent['version'] = version or '0.0.0' 
+ + component = {} + component['name'] = name + component['guid'] = guid + component['duration'] = duration + component['metrics'] = metrics + + payload = {} + payload['agent'] = agent + payload['components'] = [component] + + return self.send_request(payload) diff --git a/src/metrics/newrelic/sampler.py b/src/metrics/newrelic/sampler.py new file mode 100644 index 0000000..ab2e767 --- /dev/null +++ b/src/metrics/newrelic/sampler.py @@ -0,0 +1,604 @@ +import threading +import atexit +import os +import sys +import json +import socket +import time +import math + +try: + import Queue as queue +except ImportError: + import queue + +import mod_wsgi + +SERVER_READY = '_' +SERVER_STARTING = 'S' +SERVER_BUSY_READ = 'R' +SERVER_BUSY_WRITE = 'W' +SERVER_BUST_KEEPALIVE = 'K' +SERVER_BUSY_LOG = 'L' +SERVER_BUSY_DNS = 'D' +SERVER_CLOSING = 'C' +SERVER_GRACEFUL = 'G' +SERVER_IDLE_KILL = 'I' +SERVER_DEAD = '.' + +STATUS_FLAGS = { + SERVER_READY: 'Ready', + SERVER_STARTING: 'Starting', + SERVER_BUSY_READ: 'Read', + SERVER_BUSY_WRITE: 'Write', + SERVER_BUST_KEEPALIVE: 'Keepalive', + SERVER_BUSY_LOG: 'Logging', + SERVER_BUSY_DNS: 'DNS lookup', + SERVER_CLOSING: 'Closing', + SERVER_GRACEFUL: 'Graceful', + SERVER_IDLE_KILL: 'Dying', + SERVER_DEAD: 'Dead' +} + +class Sample(dict): + + def __init__(self, count=0, total=0.0, min=0.0, max=0.0, + sum_of_squares=0.0): + self.count = count + self.total = total + self.min = min + self.max = max + self.sum_of_squares = sum_of_squares + + def __setattr__(self, name, value): + self[name] = value + + def __getattr__(self, name): + return self[name] + + def merge_stats(self, other): + self.total += other.total + self.min = self.count and min(self.min, other.min) or other.min + self.max = max(self.max, other.max) + self.sum_of_squares += other.sum_of_squares + self.count += other.count + + def merge_value(self, value): + self.total += value + self.min = self.count and min(self.min, value) or value + self.max = max(self.max, value) + 
self.sum_of_squares += value ** 2 + self.count += 1 + +class Samples(object): + + def __init__(self): + self.samples = {} + + def __iter__(self): + return iter(self.samples.items()) + + def sample_name(self, name): + return 'Component/' + name + + def _assign_value(self, name, value): + if isinstance(value, Sample): + sample = value + self.samples[name] = sample + else: + sample = Sample() + self.samples[name] = sample + sample.merge_value(value) + + return sample + + def assign_value(self, name, value): + name = self.sample_name(name) + + return self._assign_value(name, value) + + def _merge_value(self, name, value): + sample = self.samples.get(name) + + if sample is None: + sample = Sample() + self.samples[name] = sample + + if isinstance(value, Sample): + sample.merge_stats(value) + else: + sample.merge_value(value) + + return sample + + def merge_value(self, name, value): + name = self.sample_name(name) + + return self._merge_value(name, value) + + def fetch_sample(self, name): + name = self.sample_name(name) + + sample = self.samples.get(name) + + if sample is None: + sample = Sample() + self.samples[name] = sample + + return sample + + def merge_samples(self, samples): + for name, sample in samples: + self._merge_value(name, sample) + + def assign_samples(self, samples): + for name, sample in samples: + self._assign_value(name, sample) + + def clear_samples(self): + self.samples.clear() + +class Sampler(object): + + guid = 'au.com.dscpl.wsgi.mod_wsgi' + version = '1.0.0' + + def __init__(self, interface, name): + self.interface = interface + self.name = name + + self.running = False + self.lock = threading.Lock() + + self.period_start = 0 + self.access_count = 0 + self.bytes_served = 0 + + self.request_samples = [] + + self.metric_data = Samples() + + self.report_queue = queue.Queue() + + self.report_thread = threading.Thread(target=self.report_main_loop) + self.report_thread.setDaemon(True) + + self.report_start = 0 + self.report_metrics = Samples() + + self.monitor_queue = 
queue.Queue() + + self.monitor_thread = threading.Thread(target=self.monitor_main_loop) + self.monitor_thread.setDaemon(True) + + self.monitor_count = 0 + + def upload_report(self, start, end, metrics): + try: + self.interface.send_metrics(self.name, self.guid, self.version, + end-start, metrics.samples) + + except self.interface.RetryDataForRequest: + return True + + except Exception: + pass + + return False + + def generate_request_metrics(self, harvest_data): + metrics = Samples() + + # Chart as 'Throughput'. + + metrics.merge_value('Requests/Throughput[|requests]', + Sample(count=harvest_data['access_count'], + total=harvest_data['access_count'])) + + # Calculate from the set of sampled requests the average + # and percentile metrics. + + requests = harvest_data['request_samples'] + + if requests: + for request in requests: + # Chart as 'Average'. + + metrics.merge_value('Requests/Response Time[seconds|request]', + request['duration']) + + requests.sort(key=lambda e: e['duration']) + + total = sum([x['duration'] for x in requests]) + + # Chart as 'Average'. + + metrics.merge_value('Requests/Percentiles/Average[seconds]', + total/len(requests)) + + idx50 = int(0.50 * len(requests)) + metrics.merge_value('Requests/Percentiles/Median[seconds]', + requests[idx50]['duration']) + + idx95 = int(0.95 * len(requests)) + metrics.merge_value('Requests/Percentiles/95%[seconds]', + requests[idx95]['duration']) + + idx99 = int(0.99 * len(requests)) + metrics.merge_value('Requests/Percentiles/99%[seconds]', + requests[idx99]['duration']) + + # Chart as 'Rate'. + + metrics.merge_value('Requests/Bytes Served[bytes]', + harvest_data['bytes_served']) + + return metrics + + def generate_process_metrics(self, harvest_data): + metrics = Samples() + + # Chart as 'Count'. Round to Integer. 
+ + metrics.merge_value('Processes/Instances[|processes]', + Sample(count=math.ceil(float( + harvest_data['processes_running']) / + harvest_data['sample_count']))) + + metrics.merge_value('Processes/Lifecycle/Starting[|processes]', + Sample(count=harvest_data['processes_started'])) + + metrics.merge_value('Processes/Lifecycle/Stopping[|processes]', + Sample(count=harvest_data['processes_stopped'])) + + metrics.merge_value('Workers/Availability/Idle[|workers]', + Sample(count=math.ceil(float( + harvest_data['idle_workers']) / + harvest_data['sample_count']))) + metrics.merge_value('Workers/Availability/Busy[|workers]', + Sample(count=math.ceil(float( + harvest_data['busy_workers']) / + harvest_data['sample_count']))) + + # Chart as 'Percentage'. + + metrics.merge_value('Workers/Utilization[server]', + (float(harvest_data['busy_workers']) / + harvest_data['sample_count']) / ( + harvest_data['server_limit']*harvest_data['thread_limit'])) + + total = 0 + for value in harvest_data['worker_status'].values(): + value = float(value)/harvest_data['sample_count'] + total += value + + if total: + for key, value in harvest_data['worker_status'].items(): + if key != SERVER_DEAD and value != 0: + label = STATUS_FLAGS.get(key, 'Unknown') + + # Chart as 'Average'. Round to Integer. + + value = float(value)/harvest_data['sample_count'] + + metrics.merge_value('Workers/Status/%s[workers]' % + label, (value/total)*total) + + return metrics + + def report_main_loop(self): + # We need a set of cached metrics for the case where + # we fail in uploading the metric data and need to + # retain it for the next attempt to upload data. + + retries = 0 + retained_start = 0 + retained = Samples() + + # We simply wait to be passed the metric data to be + # reported for the current sample period. + + while True: + harvest_data = self.report_queue.get() + + # If samples is None then we are being told to + # exit as the process is being shutdown. 
Otherwise + # we should be passed the cumulative metric data + # and the set of sampled requests. + + if harvest_data is None: + return + + start = harvest_data['period_start'] + end = harvest_data['period_end'] + + metrics = harvest_data['metrics'] + + # Add metric to track how many Apache server instances + # are reporting for each sample period. + + # Chart as 'Count'. Round to Integer. + + metrics.merge_value('Server/Instances[|servers]', 0) + + # Generate percentiles metrics for request samples. + + metrics.merge_samples(self.generate_request_metrics(harvest_data)) + metrics.merge_samples(self.generate_process_metrics(harvest_data)) + + # If we had metrics from a previous reporting period + # because we couldn't upload the metric data, we need + # to merge the data from the current reporting period + # with that for the previous period. + + if retained.samples: + start = retained_start + retained.merge_samples(metrics) + metrics = retained + + # Now attempt to upload the metric data. + + retry = self.upload_report(start, end, metrics) + + # If a failure occurred but failure type was such that we + # could try again to upload the data, then retain them. If + # have two many failed attempts though we give up. 
+ + if retry: + retries += 1 + + if retries == 5: + retries = 0 + + else: + retained = metrics + + else: + retries = 0 + + if retries == 0: + retained_start = 0 + retained.clear_samples() + + else: + retained_start = start + retained = metrics + + def generate_scoreboard(self, sample_start=None): + busy_workers = 0 + idle_workers = 0 + access_count = 0 + bytes_served = 0 + + active_processes = 0 + + scoreboard = mod_wsgi.server_metrics() + + if sample_start is None: + sample_start = scoreboard['current_time'] + + scoreboard['request_samples'] = request_samples = [] + + for process in scoreboard['processes']: + process['active_workers'] = 0 + + for worker in process['workers']: + status = worker['status'] + + if not process['quiescing'] and process['pid']: + if (status == SERVER_READY and process['generation'] == + scoreboard['running_generation']): + + process['active_workers'] += 1 + idle_workers += 1 + + elif status not in (SERVER_DEAD, SERVER_STARTING, + SERVER_IDLE_KILL): + + process['active_workers'] += 1 + busy_workers += 1 + + count = worker['access_count'] + + if count or status not in (SERVER_READY, SERVER_DEAD): + access_count += count + bytes_served += worker['bytes_served'] + + current_time = scoreboard['current_time'] + + start_time = worker['start_time'] + stop_time = worker['stop_time'] + + if (stop_time > start_time and sample_start < stop_time + and stop_time <= current_time): + + duration = stop_time - start_time + thread_num = worker['thread_num'] + + request_samples.append(dict(start_time=start_time, + duration=duration, thread_num=thread_num)) + + if process['active_workers']: + active_processes += 1 + + scoreboard['busy_workers'] = busy_workers + scoreboard['idle_workers'] = idle_workers + scoreboard['access_count'] = access_count + scoreboard['bytes_served'] = bytes_served + + scoreboard['active_processes'] = active_processes + + return scoreboard + + def record_process_statistics(self, scoreboard, harvest_data): + current_active_processes = 
scoreboard['active_processes'] + previous_active_processes = harvest_data['active_processes'] + + harvest_data['active_processes'] = current_active_processes + harvest_data['processes_running'] += current_active_processes + + if current_active_processes > previous_active_processes: + harvest_data['processes_started'] += (current_active_processes - + previous_active_processes) + + elif current_active_processes < previous_active_processes: + harvest_data['processes_stopped'] += (previous_active_processes - + current_active_processes) + + harvest_data['idle_workers'] += scoreboard['idle_workers'] + harvest_data['busy_workers'] += scoreboard['busy_workers'] + + for process in scoreboard['processes']: + for worker in process['workers']: + harvest_data['worker_status'][worker['status']] += 1 + + def monitor_main_loop(self): + scoreboard = self.generate_scoreboard() + + harvest_start = scoreboard['current_time'] + sample_start = harvest_start + sample_duration = 0.0 + + access_count = scoreboard['access_count'] + bytes_served = scoreboard['bytes_served'] + + harvest_data = {} + + harvest_data['sample_count'] = 0 + harvest_data['period_start'] = harvest_start + + harvest_data['metrics'] = Samples() + + harvest_data['request_samples'] = [] + + harvest_data['active_processes'] = 0 + + harvest_data['processes_running'] = 0 + harvest_data['processes_started'] = 0 + harvest_data['processes_stopped'] = 0 + + harvest_data['idle_workers'] = 0 + harvest_data['busy_workers'] = 0 + + harvest_data['server_limit'] = scoreboard['server_limit'] + harvest_data['thread_limit'] = scoreboard['thread_limit'] + + harvest_data['worker_status'] = {} + + for status in STATUS_FLAGS.keys(): + harvest_data['worker_status'][status] = 0 + + harvest_data['access_count'] = 0 + harvest_data['bytes_served'] = 0 + + # Chart as 'Count'. Round to Integer. 
+ + harvest_data['metrics'].merge_value('Server/Restarts[|servers]', 0) + + start = time.time() + end = start + 60.0 + + while True: + try: + # We want to collect metrics on a regular second + # interval so we need to align the timeout value. + + now = time.time() + start += 1.0 + timeout = start - now + + return self.monitor_queue.get(timeout=timeout) + + except queue.Empty: + pass + + harvest_data['sample_count'] += 1 + + scoreboard = self.generate_scoreboard(sample_start) + + harvest_end = scoreboard['current_time'] + sample_end = harvest_end + + sample_duration = sample_end - sample_start + + self.record_process_statistics(scoreboard, harvest_data) + + harvest_data['request_samples'].extend( + scoreboard['request_samples']) + + access_count_delta = scoreboard['access_count'] + access_count_delta -= access_count + access_count = scoreboard['access_count'] + + harvest_data['access_count'] += access_count_delta + + bytes_served_delta = scoreboard['bytes_served'] + bytes_served_delta -= bytes_served + bytes_served = scoreboard['bytes_served'] + + harvest_data['bytes_served'] += bytes_served_delta + + now = time.time() + + if now >= end: + harvest_data['period_end'] = harvest_end + + self.report_queue.put(harvest_data) + + harvest_start = harvest_end + end += 60.0 + + _harvest_data = {} + + _harvest_data['sample_count'] = 0 + _harvest_data['period_start'] = harvest_start + + _harvest_data['metrics'] = Samples() + + _harvest_data['request_samples'] = [] + + _harvest_data['active_processes'] = ( + harvest_data['active_processes']) + + _harvest_data['processes_running'] = 0 + _harvest_data['processes_started'] = 0 + _harvest_data['processes_stopped'] = 0 + + _harvest_data['idle_workers'] = 0 + _harvest_data['busy_workers'] = 0 + + _harvest_data['server_limit'] = scoreboard['server_limit'] + _harvest_data['thread_limit'] = scoreboard['thread_limit'] + + _harvest_data['worker_status'] = {} + + for status in STATUS_FLAGS.keys(): + _harvest_data['worker_status'][status] = 
0 + + _harvest_data['access_count'] = 0 + _harvest_data['bytes_served'] = 0 + + harvest_data = _harvest_data + + sample_start = sample_end + + def terminate(self): + try: + self.report_queue.put(None) + self.monitor_queue.put(None) + except Exception: + pass + + self.monitor_thread.join() + self.report_thread.join() + + def start(self): + if mod_wsgi.server_metrics() is None: + return + + with self.lock: + if not self.running: + self.running = True + atexit.register(self.terminate) + self.monitor_thread.start() + self.report_thread.start() |