author    Graham Dumpleton <Graham.Dumpleton@gmail.com>    2014-06-08 17:44:42 +1000
committer Graham Dumpleton <Graham.Dumpleton@gmail.com>    2014-06-08 17:44:42 +1000
commit    64bfa70c34d0e09e5135fe8e407d636e2f6bc625 (patch)
tree      75b44ecb6dbe6bf9dbae9b8a92568d0188796ba7
parent    a20bb40fce1085c25212c7e0f38af8afcb4a0eea (diff)
download  mod_wsgi-metrics-64bfa70c34d0e09e5135fe8e407d636e2f6bc625.tar.gz
Initial version of code.
-rw-r--r--  README.md                            4
-rw-r--r--  README.rst                         129
-rw-r--r--  setup.py                            37
-rw-r--r--  src/__init__.py                      2
-rw-r--r--  src/metrics/__init__.py              0
-rw-r--r--  src/metrics/newrelic/__init__.py     1
-rw-r--r--  src/metrics/newrelic/agent.py       99
-rw-r--r--  src/metrics/newrelic/interface.py  189
-rw-r--r--  src/metrics/newrelic/sampler.py    604
9 files changed, 1061 insertions, 4 deletions
diff --git a/README.md b/README.md
deleted file mode 100644
index 68215e1..0000000
--- a/README.md
+++ /dev/null
@@ -1,4 +0,0 @@
-mod_wsgi-metrics
-================
-
-Metrics package for Apache/mod_wsgi.
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..490a7e6
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,129 @@
+==================
+MOD_WSGI (METRICS)
+==================
+
+The mod_wsgi-metrics package is an add-on package for Apache/mod_wsgi. It
+generates metric information about the run-time performance of Apache and
+mod_wsgi. At least mod_wsgi version 4.2.0 is required.
+
+In this version, metrics collected cover the performance of the Apache web
+server as a whole. In future versions additional metrics will be added
+which monitor aspects of mod_wsgi itself.
+
+At the present time the package provides a plugin for the
+`New Relic Platform <http://www.newrelic.com/platform>`_. This plugin is
+distinct from New Relic's own Python agent for use in monitoring Python web
+applications. The plugin instead focuses on metrics specific to Apache and
+mod_wsgi. The information from these metrics can be used to help in tuning
+your Apache/mod_wsgi installation for best performance.
+
+The New Relic Platform is a free feature of New Relic, so you do not
+need a paid New Relic account in order to use this plugin for
+Apache/mod_wsgi.
+
+New Relic also provides a free Lite tier for its Python agent, so there
+is no excuse for not using both the Python agent and this plugin to get
+that extra visibility. Learn what your Python web application is really
+doing. [1]_
+
+Using the New Relic plugin with a mod_wsgi express installation
+---------------------------------------------------------------
+
+When using `mod_wsgi express <https://pypi.python.org/pypi/mod_wsgi>`_,
+the plugin will be started automatically and will report data when the
+built-in support of mod_wsgi express for New Relic is used. See the
+mod_wsgi express documentation for more information on starting it with
+New Relic support enabled.
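+
+As a rough sketch, assuming the ``--with-newrelic`` option of mod_wsgi
+express is available in your version, this might look like::
+
+    NEW_RELIC_CONFIG_FILE=/some/path/newrelic.ini \
+        mod_wsgi-express start-server wsgi.py --with-newrelic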
+
+Using the New Relic plugin with a standard mod_wsgi installation
+----------------------------------------------------------------
+
+If you have installed mod_wsgi as an Apache module directly into your Apache
+installation, or have installed an operating system binary package, and are
+configuring Apache manually to host your Python web application, additional
+setup will be required to enable the plugin.
+
+The steps for manually enabling the plugin are as follows:
+
+1. Create a Python script file called ``server-metrics.py``. In that file
+place::
+
+ import logging
+
+ logging.basicConfig(level=logging.INFO,
+ format='%(name)s (pid=%(process)d, level=%(levelname)s): %(message)s')
+
+ from mod_wsgi.metrics.newrelic import Agent
+
+ config_file = '/some/path/newrelic.ini'
+
+ agent = Agent(config_file=config_file)
+ agent.start()
+
+This would normally be placed alongside your Python web application code.
+
+The ``config_file`` variable should be set to the location of the
+``newrelic.ini`` agent configuration file you created for use with the New
+Relic Python agent.
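+
+If you have not yet created such a file, a minimal sketch of a
+``newrelic.ini`` file sufficient for this plugin is::
+
+    [newrelic]
+    license_key = YOUR-NEW-RELIC-LICENSE-KEY
+    app_name = THE-APPLICATION-NAME-TO-REPORT-AGAINST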
+
+Alternatively, you can set the New Relic license key and application name
+to report to within the Python script file::
+
+ license_key = 'YOUR-NEW-RELIC-LICENSE-KEY'
+ app_name = 'THE-APPLICATION-NAME-TO-REPORT-AGAINST'
+
+ agent = Agent(app_name=app_name, license_key=license_key)
+ agent.start()
+
+This Python script file would normally be placed alongside your Python
+web application code.
+
+2. Ensure that the ``mod_status`` module is being loaded into Apache and that
+``ExtendedStatus`` is ``On``::
+
+ LoadModule status_module modules/mod_status.so
+ ExtendedStatus On
+
+The exact way in which this needs to be done will differ between Apache
+installations, especially with Apache installations provided by a Linux
+distribution. You should therefore look closely at how this is managed
+for your Apache installation.
+
+Note that it is only necessary to load ``mod_status`` and enable
+``ExtendedStatus``. It is not necessary to expose the traditional
+``/server-status`` URL generally associated with the use of ``mod_status``
+as the plugin will not use that. Instead the plugin obtains the information
+from the ``mod_wsgi`` module. The ``mod_status`` module still has to be
+loaded though, otherwise Apache will not collect the information that is
+required.
+
+3. Create a dedicated mod_wsgi daemon process group using the
+``WSGIDaemonProcess`` directive. This should have only a single process and
+a single thread. It should also enable visibility of internal server
+metrics from mod_wsgi using the ``server-metrics`` option::
+
+ WSGIDaemonProcess newrelic display-name=%{GROUP} \
+ processes=1 threads=1 server-metrics=On
+
+This daemon process group should not be used to host your actual Python
+web application.
+
+4. Specify that the ``server-metrics.py`` Python script file you created
+should be loaded when Apache is (re)started using the ``WSGIImportScript``
+directive::
+
+ WSGIImportScript /some/path/server-metrics.py \
+ process-group=newrelic application-group=%{GLOBAL}
+
+The path should match where you saved the ``server-metrics.py`` script.
+The ``process-group`` option should match the name of the daemon process
+group created using the ``WSGIDaemonProcess`` directive.
+
+5. Restart Apache. Within the New Relic UI you should automatically see
+a new entry appear in the left-hand navigation bar labelled 'mod_wsgi'. The
+reported data will then appear under the application name used.
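+
+Putting the configuration from the steps above together, a minimal
+sketch of the relevant parts of the Apache configuration is::
+
+    LoadModule status_module modules/mod_status.so
+    ExtendedStatus On
+
+    WSGIDaemonProcess newrelic display-name=%{GROUP} \
+        processes=1 threads=1 server-metrics=On
+
+    WSGIImportScript /some/path/server-metrics.py \
+        process-group=newrelic application-group=%{GLOBAL}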
+
+.. [1] Disclaimer: I work for New Relic and am the primary developer of
+ the Python agent. So of course it is awesome. The work I do on
+ this plugin for the New Relic platform is independent of any work
+ I do for New Relic and is done on my own time though. :-)
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..8be306f
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,37 @@
+from __future__ import print_function
+
+from setuptools import setup
+
+setup_kwargs = dict(
+ name = 'mod_wsgi-metrics',
+ version = '1.0.0',
+ description = 'Metrics package for Apache/mod_wsgi.',
+ author = 'Graham Dumpleton',
+ author_email = 'Graham.Dumpleton@gmail.com',
+ maintainer = 'Graham Dumpleton',
+ maintainer_email = 'Graham.Dumpleton@gmail.com',
+ url = 'http://www.modwsgi.org/',
+ license = 'Apache License, Version 2.0',
+ platforms = [],
+ download_url = None,
+    classifiers = [
+ 'Development Status :: 6 - Mature',
+ 'License :: OSI Approved :: Apache Software License',
+ 'Operating System :: MacOS :: MacOS X',
+ 'Operating System :: POSIX',
+ 'Operating System :: POSIX :: BSD',
+ 'Operating System :: POSIX :: Linux',
+ 'Operating System :: POSIX :: SunOS/Solaris',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: Implementation :: CPython',
+ 'Programming Language :: Python :: 2.6',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3.3',
+ 'Programming Language :: Python :: 3.4',
+ 'Topic :: Internet :: WWW/HTTP :: WSGI',
+ ],
+ packages = ['mod_wsgi', 'mod_wsgi.metrics', 'mod_wsgi.metrics.newrelic'],
+ package_dir = {'mod_wsgi': 'src'},
+)
+
+setup(**setup_kwargs)
diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..bb61062
--- /dev/null
+++ b/src/__init__.py
@@ -0,0 +1,2 @@
+import pkgutil
+__path__ = pkgutil.extend_path(__path__, __name__)
diff --git a/src/metrics/__init__.py b/src/metrics/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/metrics/__init__.py
diff --git a/src/metrics/newrelic/__init__.py b/src/metrics/newrelic/__init__.py
new file mode 100644
index 0000000..d2361b7
--- /dev/null
+++ b/src/metrics/newrelic/__init__.py
@@ -0,0 +1 @@
+from .agent import Agent
diff --git a/src/metrics/newrelic/agent.py b/src/metrics/newrelic/agent.py
new file mode 100644
index 0000000..2b4b74c
--- /dev/null
+++ b/src/metrics/newrelic/agent.py
@@ -0,0 +1,99 @@
+import os
+import logging
+
+try:
+ from ConfigParser import RawConfigParser, NoOptionError, NoSectionError
+except ImportError:
+ from configparser import RawConfigParser, NoOptionError, NoSectionError
+
+import mod_wsgi
+
+from .interface import Interface
+from .sampler import Sampler
+
+_logger = logging.getLogger(__name__)
+
+class Agent(object):
+
+ def __init__(self, app_name=None, license_key=None, config_file=None,
+ environment=None):
+
+ self.sampler = None
+
+ if mod_wsgi.version < (4, 2, 0):
+ _logger.fatal('Version 4.2.0 or newer of mod_wsgi is required '
+ 'for running the New Relic platform plugin. The plugin '
+ 'has been disabled.')
+
+ return
+
+ if config_file is None:
+ config_file = os.environ.get('NEW_RELIC_CONFIG_FILE', None)
+
+ if config_file is not None:
+ config_object = RawConfigParser()
+
+ if config_file:
+ config_object.read([config_file])
+
+ if environment is None:
+ environment = os.environ.get('NEW_RELIC_ENVIRONMENT', None)
+
+ def _option(name, section='newrelic', type=None, **kwargs):
+ try:
+ getter = 'get%s' % (type or '')
+ return getattr(config_object, getter)(section, name)
+ except NoOptionError:
+ if 'default' in kwargs:
+ return kwargs['default']
+ else:
+ raise
+
+ def option(name, type=None, **kwargs):
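+        # Look the option up in the most specific configuration section
+        # first: an environment specific 'newrelic-platform' section,
+        # then the plain 'newrelic-platform' section, then the
+        # corresponding 'newrelic' sections used by the Python agent.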
+ sections = []
+
+ if environment is not None:
+ sections.append('newrelic-platform:%s' % environment)
+
+ sections.append('newrelic-platform')
+
+ if environment is not None:
+ sections.append('newrelic:%s' % environment)
+
+ sections.append('newrelic')
+
+ for section in sections:
+ try:
+ return _option(name, section, type)
+ except (NoOptionError, NoSectionError):
+ pass
+
+ if 'default' in kwargs:
+ return kwargs['default']
+
+ if app_name is None:
+ app_name = os.environ.get('NEW_RELIC_APP_NAME', None)
+ app_name = option('app_name', default=app_name)
+
+ if license_key is None:
+ license_key = os.environ.get('NEW_RELIC_LICENSE_KEY', None)
+ license_key = option('license_key', default=license_key)
+
+ if app_name is not None:
+ app_name = app_name.split(';')[0].strip()
+
+ if not license_key or not app_name:
+ _logger.fatal('Either the license key or application name was '
+ 'not specified for the New Relic platform plugin. The '
+ 'plugin has been disabled.')
+
+ return
+
+ _logger.info('New Relic platform plugin reporting to %r.', app_name)
+
+ self.interface = Interface(license_key)
+ self.sampler = Sampler(self.interface, app_name)
+
+ def start(self):
+ if self.sampler is not None:
+ self.sampler.start()
diff --git a/src/metrics/newrelic/interface.py b/src/metrics/newrelic/interface.py
new file mode 100644
index 0000000..a9db15f
--- /dev/null
+++ b/src/metrics/newrelic/interface.py
@@ -0,0 +1,189 @@
+import zlib
+import sys
+import socket
+import os
+import types
+import json
+import logging
+
+try:
+ import http.client as httplib
+except ImportError:
+ import httplib
+
+_logger = logging.getLogger(__name__)
+
+# Python 3 compatibility helpers.
+
+PY2 = sys.version_info[0] == 2
+PY3 = sys.version_info[0] == 3
+
+if PY3:
+ def b(s):
+ return s.encode('latin-1')
+else:
+ def b(s):
+ return s
+
+# Helpers for json encoding and decoding.
+
+def json_encode(obj, **kwargs):
+ _kwargs = {}
+
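+    # Under Python 2, bytes is str, so ask json.dumps() to interpret
+    # byte strings as Latin-1 when encoding.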
+ if type(b'') is type(''):
+ _kwargs['encoding'] = 'latin-1'
+
+ def _encode(o):
+ if isinstance(o, bytes):
+ return o.decode('latin-1')
+ elif isinstance(o, types.GeneratorType):
+ return list(o)
+ elif hasattr(o, '__iter__'):
+ return list(iter(o))
+ raise TypeError(repr(o) + ' is not JSON serializable')
+
+ _kwargs['default'] = _encode
+ _kwargs['separators'] = (',', ':')
+
+ _kwargs.update(kwargs)
+
+ return json.dumps(obj, **_kwargs)
+
+def json_decode(s, **kwargs):
+ return json.loads(s, **kwargs)
+
+# Platform plugin interface.
+
+class Interface(object):
+
+ class NetworkInterfaceException(Exception): pass
+ class DiscardDataForRequest(NetworkInterfaceException): pass
+ class RetryDataForRequest(NetworkInterfaceException): pass
+ class ServerIsUnavailable(RetryDataForRequest): pass
+
+ USER_AGENT = 'ModWsgi-PythonPlugin/%s (Python %s %s)' % (
+ '1.0.0', sys.version.split()[0], sys.platform)
+
+ HOST = 'platform-api.newrelic.com'
+ URL = '/platform/v1/metrics'
+
+ def __init__(self, license_key):
+ self.license_key = license_key
+
+ def send_request(self, payload=()):
+ headers = {}
+ config = {}
+
+ license_key = self.license_key
+
+ if not self.license_key:
+ license_key = 'INVALID LICENSE KEY'
+
+ headers['User-Agent'] = self.USER_AGENT
+ headers['Content-Encoding'] = 'identity'
+ headers['X-License-Key'] = license_key
+
+ try:
+ data = json_encode(payload)
+
+ except Exception as exc:
+ _logger.exception('Error encoding data for JSON payload '
+ 'with payload of %r.', payload)
+
+ raise Interface.DiscardDataForRequest(str(exc))
+
+ if len(data) > 64*1024:
+ headers['Content-Encoding'] = 'deflate'
+            level = 1 if len(data) < 2000000 else 9
+ data = zlib.compress(b(data), level)
+
+        connection = httplib.HTTPSConnection(self.HOST, timeout=30.0)
+
+        try:
+            connection.request('POST', self.URL, data, headers)
+            response = connection.getresponse()
+            content = response.read()
+
+        except httplib.HTTPException as exc:
+            raise Interface.RetryDataForRequest(str(exc))
+
+        finally:
+            connection.close()
+
+ if response.status != 200:
+ _logger.debug('Received a non 200 HTTP response from the data '
+ 'collector where headers=%r, status=%r and content=%r.',
+ headers, response.status, content)
+
+ if response.status == 400:
+ if headers['Content-Encoding'] == 'deflate':
+ data = zlib.decompress(data)
+
+ _logger.error('Data collector is indicating that a bad '
+ 'request has been submitted for headers of %r and '
+ 'payload of %r with response of %r.', headers, data,
+ content)
+
+ raise Interface.DiscardDataForRequest()
+
+ elif response.status == 403:
+ _logger.error('Data collector is indicating that the license '
+ 'key %r is not valid.', license_key)
+
+ raise Interface.DiscardDataForRequest()
+
+ elif response.status == 413:
+ _logger.warning('Data collector is indicating that a request '
+ 'was received where the request content size was over '
+ 'the maximum allowed size limit. The length of the '
+ 'request content was %d.', len(data))
+
+ raise Interface.DiscardDataForRequest()
+
+ elif response.status in (503, 504):
+ _logger.warning('Data collector is unavailable.')
+
+ raise Interface.ServerIsUnavailable()
+
+            elif response.status != 200:
+                _logger.warning('An unexpected HTTP response was received '
+                    'from the data collector of %r. The payload for '
+                    'the request was %r.', response.status, payload)
+
+ raise Interface.DiscardDataForRequest()
+
+ try:
+ if PY3:
+ content = content.decode('UTF-8')
+
+ result = json_decode(content)
+
+ except Exception as exc:
+ _logger.exception('Error decoding data for JSON payload '
+ 'with payload of %r.', content)
+
+ raise Interface.DiscardDataForRequest(str(exc))
+
+ if 'status' in result:
+ return result['status']
+
+ error_message = result['error']
+
+ raise Interface.DiscardDataForRequest(error_message)
+
+ def send_metrics(self, name, guid, version, duration, metrics):
+ agent = {}
+ agent['host'] = socket.gethostname()
+ agent['pid'] = os.getpid()
+        agent['version'] = version or '0.0.0'
+
+ component = {}
+ component['name'] = name
+ component['guid'] = guid
+ component['duration'] = duration
+ component['metrics'] = metrics
+
+ payload = {}
+ payload['agent'] = agent
+ payload['components'] = [component]
+
+ return self.send_request(payload)
diff --git a/src/metrics/newrelic/sampler.py b/src/metrics/newrelic/sampler.py
new file mode 100644
index 0000000..ab2e767
--- /dev/null
+++ b/src/metrics/newrelic/sampler.py
@@ -0,0 +1,604 @@
+import threading
+import atexit
+import os
+import sys
+import json
+import socket
+import time
+import math
+
+try:
+ import Queue as queue
+except ImportError:
+ import queue
+
+import mod_wsgi
+
+SERVER_READY = '_'
+SERVER_STARTING = 'S'
+SERVER_BUSY_READ = 'R'
+SERVER_BUSY_WRITE = 'W'
+SERVER_BUSY_KEEPALIVE = 'K'
+SERVER_BUSY_LOG = 'L'
+SERVER_BUSY_DNS = 'D'
+SERVER_CLOSING = 'C'
+SERVER_GRACEFUL = 'G'
+SERVER_IDLE_KILL = 'I'
+SERVER_DEAD = '.'
+
+STATUS_FLAGS = {
+ SERVER_READY: 'Ready',
+ SERVER_STARTING: 'Starting',
+ SERVER_BUSY_READ: 'Read',
+ SERVER_BUSY_WRITE: 'Write',
+    SERVER_BUSY_KEEPALIVE: 'Keepalive',
+ SERVER_BUSY_LOG: 'Logging',
+ SERVER_BUSY_DNS: 'DNS lookup',
+ SERVER_CLOSING: 'Closing',
+ SERVER_GRACEFUL: 'Graceful',
+ SERVER_IDLE_KILL: 'Dying',
+ SERVER_DEAD: 'Dead'
+}
+
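+# A Sample accumulates the count/total/min/max/sum of squares rollup
+# which is reported to the New Relic platform for each metric.
+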
+class Sample(dict):
+
+ def __init__(self, count=0, total=0.0, min=0.0, max=0.0,
+ sum_of_squares=0.0):
+ self.count = count
+ self.total = total
+ self.min = min
+ self.max = max
+ self.sum_of_squares = sum_of_squares
+
+ def __setattr__(self, name, value):
+ self[name] = value
+
+ def __getattr__(self, name):
+ return self[name]
+
+ def merge_stats(self, other):
+ self.total += other.total
+ self.min = self.count and min(self.min, other.min) or other.min
+ self.max = max(self.max, other.max)
+ self.sum_of_squares += other.sum_of_squares
+ self.count += other.count
+
+ def merge_value(self, value):
+ self.total += value
+ self.min = self.count and min(self.min, value) or value
+ self.max = max(self.max, value)
+ self.sum_of_squares += value ** 2
+ self.count += 1
+
+class Samples(object):
+
+ def __init__(self):
+ self.samples = {}
+
+ def __iter__(self):
+ return iter(self.samples.items())
+
+ def sample_name(self, name):
+ return 'Component/' + name
+
+    def _assign_value(self, name, value):
+        if isinstance(value, Sample):
+            sample = value
+            self.samples[name] = sample
+        else:
+            sample = Sample()
+            self.samples[name] = sample
+            sample.merge_value(value)
+
+        return sample
+
+    def assign_value(self, name, value):
+        name = self.sample_name(name)
+
+        return self._assign_value(name, value)
+
+ def _merge_value(self, name, value):
+ sample = self.samples.get(name)
+
+ if sample is None:
+ sample = Sample()
+ self.samples[name] = sample
+
+ if isinstance(value, Sample):
+ sample.merge_stats(value)
+ else:
+ sample.merge_value(value)
+
+ return sample
+
+ def merge_value(self, name, value):
+ name = self.sample_name(name)
+
+ return self._merge_value(name, value)
+
+ def fetch_sample(self, name):
+ name = self.sample_name(name)
+
+ sample = self.samples.get(name)
+
+ if sample is None:
+ sample = Sample()
+ self.samples[name] = sample
+
+ return sample
+
+ def merge_samples(self, samples):
+ for name, sample in samples:
+ self._merge_value(name, sample)
+
+ def assign_samples(self, samples):
+ for name, sample in samples:
+ self._assign_value(name, sample)
+
+ def clear_samples(self):
+ self.samples.clear()
+
+class Sampler(object):
+
+ guid = 'au.com.dscpl.wsgi.mod_wsgi'
+ version = '1.0.0'
+
+ def __init__(self, interface, name):
+ self.interface = interface
+ self.name = name
+
+ self.running = False
+ self.lock = threading.Lock()
+
+ self.period_start = 0
+ self.access_count = 0
+ self.bytes_served = 0
+
+ self.request_samples = []
+
+ self.metric_data = Samples()
+
+ self.report_queue = queue.Queue()
+
+ self.report_thread = threading.Thread(target=self.report_main_loop)
+ self.report_thread.setDaemon(True)
+
+ self.report_start = 0
+ self.report_metrics = Samples()
+
+ self.monitor_queue = queue.Queue()
+
+ self.monitor_thread = threading.Thread(target=self.monitor_main_loop)
+ self.monitor_thread.setDaemon(True)
+
+ self.monitor_count = 0
+
+ def upload_report(self, start, end, metrics):
+ try:
+ self.interface.send_metrics(self.name, self.guid, self.version,
+ end-start, metrics.samples)
+
+ except self.interface.RetryDataForRequest:
+ return True
+
+ except Exception:
+ pass
+
+ return False
+
+ def generate_request_metrics(self, harvest_data):
+ metrics = Samples()
+
+ # Chart as 'Throughput'.
+
+ metrics.merge_value('Requests/Throughput[|requests]',
+ Sample(count=harvest_data['access_count'],
+ total=harvest_data['access_count']))
+
+ # Calculate from the set of sampled requests the average
+ # and percentile metrics.
+
+ requests = harvest_data['request_samples']
+
+ if requests:
+ for request in requests:
+ # Chart as 'Average'.
+
+ metrics.merge_value('Requests/Response Time[seconds|request]',
+ request['duration'])
+
+ requests.sort(key=lambda e: e['duration'])
+
+ total = sum([x['duration'] for x in requests])
+
+ # Chart as 'Average'.
+
+ metrics.merge_value('Requests/Percentiles/Average[seconds]',
+ total/len(requests))
+
+ idx50 = int(0.50 * len(requests))
+ metrics.merge_value('Requests/Percentiles/Median[seconds]',
+ requests[idx50]['duration'])
+
+ idx95 = int(0.95 * len(requests))
+ metrics.merge_value('Requests/Percentiles/95%[seconds]',
+ requests[idx95]['duration'])
+
+ idx99 = int(0.99 * len(requests))
+ metrics.merge_value('Requests/Percentiles/99%[seconds]',
+ requests[idx99]['duration'])
+
+ # Chart as 'Rate'.
+
+ metrics.merge_value('Requests/Bytes Served[bytes]',
+ harvest_data['bytes_served'])
+
+ return metrics
+
+ def generate_process_metrics(self, harvest_data):
+ metrics = Samples()
+
+ # Chart as 'Count'. Round to Integer.
+
+ metrics.merge_value('Processes/Instances[|processes]',
+ Sample(count=math.ceil(float(
+ harvest_data['processes_running']) /
+ harvest_data['sample_count'])))
+
+ metrics.merge_value('Processes/Lifecycle/Starting[|processes]',
+ Sample(count=harvest_data['processes_started']))
+
+ metrics.merge_value('Processes/Lifecycle/Stopping[|processes]',
+ Sample(count=harvest_data['processes_stopped']))
+
+ metrics.merge_value('Workers/Availability/Idle[|workers]',
+ Sample(count=math.ceil(float(
+ harvest_data['idle_workers']) /
+ harvest_data['sample_count'])))
+ metrics.merge_value('Workers/Availability/Busy[|workers]',
+ Sample(count=math.ceil(float(
+ harvest_data['busy_workers']) /
+ harvest_data['sample_count'])))
+
+ # Chart as 'Percentage'.
+
+ metrics.merge_value('Workers/Utilization[server]',
+ (float(harvest_data['busy_workers']) /
+ harvest_data['sample_count']) / (
+ harvest_data['server_limit']*harvest_data['thread_limit']))
+
+ total = 0
+ for value in harvest_data['worker_status'].values():
+ value = float(value)/harvest_data['sample_count']
+ total += value
+
+ if total:
+ for key, value in harvest_data['worker_status'].items():
+ if key != SERVER_DEAD and value != 0:
+ label = STATUS_FLAGS.get(key, 'Unknown')
+
+ # Chart as 'Average'. Round to Integer.
+
+ value = float(value)/harvest_data['sample_count']
+
+ metrics.merge_value('Workers/Status/%s[workers]' %
+ label, (value/total)*total)
+
+ return metrics
+
+ def report_main_loop(self):
+ # We need a set of cached metrics for the case where
+ # we fail in uploading the metric data and need to
+ # retain it for the next attempt to upload data.
+
+ retries = 0
+ retained_start = 0
+ retained = Samples()
+
+ # We simply wait to be passed the metric data to be
+ # reported for the current sample period.
+
+ while True:
+ harvest_data = self.report_queue.get()
+
+ # If samples is None then we are being told to
+ # exit as the process is being shutdown. Otherwise
+ # we should be passed the cumulative metric data
+ # and the set of sampled requests.
+
+ if harvest_data is None:
+ return
+
+ start = harvest_data['period_start']
+ end = harvest_data['period_end']
+
+ metrics = harvest_data['metrics']
+
+ # Add metric to track how many Apache server instances
+ # are reporting for each sample period.
+
+ # Chart as 'Count'. Round to Integer.
+
+ metrics.merge_value('Server/Instances[|servers]', 0)
+
+ # Generate percentiles metrics for request samples.
+
+ metrics.merge_samples(self.generate_request_metrics(harvest_data))
+ metrics.merge_samples(self.generate_process_metrics(harvest_data))
+
+ # If we had metrics from a previous reporting period
+ # because we couldn't upload the metric data, we need
+ # to merge the data from the current reporting period
+ # with that for the previous period.
+
+ if retained.samples:
+ start = retained_start
+ retained.merge_samples(metrics)
+ metrics = retained
+
+ # Now attempt to upload the metric data.
+
+ retry = self.upload_report(start, end, metrics)
+
+            # If a failure occurred, but the failure type was such that
+            # we could try again to upload the data, then retain the
+            # metrics. If we have too many failed attempts though, we
+            # give up.
+
+ if retry:
+ retries += 1
+
+ if retries == 5:
+ retries = 0
+
+ else:
+ retained = metrics
+
+ else:
+ retries = 0
+
+ if retries == 0:
+ retained_start = 0
+ retained.clear_samples()
+
+ else:
+ retained_start = start
+ retained = metrics
+
+ def generate_scoreboard(self, sample_start=None):
+ busy_workers = 0
+ idle_workers = 0
+ access_count = 0
+ bytes_served = 0
+
+ active_processes = 0
+
+ scoreboard = mod_wsgi.server_metrics()
+
+ if sample_start is None:
+ sample_start = scoreboard['current_time']
+
+ scoreboard['request_samples'] = request_samples = []
+
+ for process in scoreboard['processes']:
+ process['active_workers'] = 0
+
+ for worker in process['workers']:
+ status = worker['status']
+
+ if not process['quiescing'] and process['pid']:
+ if (status == SERVER_READY and process['generation'] ==
+ scoreboard['running_generation']):
+
+ process['active_workers'] += 1
+ idle_workers += 1
+
+ elif status not in (SERVER_DEAD, SERVER_STARTING,
+ SERVER_IDLE_KILL):
+
+ process['active_workers'] += 1
+ busy_workers += 1
+
+ count = worker['access_count']
+
+ if count or status not in (SERVER_READY, SERVER_DEAD):
+ access_count += count
+ bytes_served += worker['bytes_served']
+
+ current_time = scoreboard['current_time']
+
+ start_time = worker['start_time']
+ stop_time = worker['stop_time']
+
+ if (stop_time > start_time and sample_start < stop_time
+ and stop_time <= current_time):
+
+ duration = stop_time - start_time
+ thread_num = worker['thread_num']
+
+ request_samples.append(dict(start_time=start_time,
+ duration=duration, thread_num=thread_num))
+
+ if process['active_workers']:
+ active_processes += 1
+
+ scoreboard['busy_workers'] = busy_workers
+ scoreboard['idle_workers'] = idle_workers
+ scoreboard['access_count'] = access_count
+ scoreboard['bytes_served'] = bytes_served
+
+ scoreboard['active_processes'] = active_processes
+
+ return scoreboard
+
+ def record_process_statistics(self, scoreboard, harvest_data):
+ current_active_processes = scoreboard['active_processes']
+ previous_active_processes = harvest_data['active_processes']
+
+ harvest_data['active_processes'] = current_active_processes
+ harvest_data['processes_running'] += current_active_processes
+
+ if current_active_processes > previous_active_processes:
+ harvest_data['processes_started'] += (current_active_processes -
+ previous_active_processes)
+
+ elif current_active_processes < previous_active_processes:
+ harvest_data['processes_stopped'] += (previous_active_processes -
+ current_active_processes)
+
+ harvest_data['idle_workers'] += scoreboard['idle_workers']
+ harvest_data['busy_workers'] += scoreboard['busy_workers']
+
+ for process in scoreboard['processes']:
+ for worker in process['workers']:
+ harvest_data['worker_status'][worker['status']] += 1
+
+ def monitor_main_loop(self):
+ scoreboard = self.generate_scoreboard()
+
+ harvest_start = scoreboard['current_time']
+ sample_start = harvest_start
+ sample_duration = 0.0
+
+ access_count = scoreboard['access_count']
+ bytes_served = scoreboard['bytes_served']
+
+ harvest_data = {}
+
+ harvest_data['sample_count'] = 0
+ harvest_data['period_start'] = harvest_start
+
+ harvest_data['metrics'] = Samples()
+
+ harvest_data['request_samples'] = []
+
+ harvest_data['active_processes'] = 0
+
+ harvest_data['processes_running'] = 0
+ harvest_data['processes_started'] = 0
+ harvest_data['processes_stopped'] = 0
+
+ harvest_data['idle_workers'] = 0
+ harvest_data['busy_workers'] = 0
+
+ harvest_data['server_limit'] = scoreboard['server_limit']
+ harvest_data['thread_limit'] = scoreboard['thread_limit']
+
+ harvest_data['worker_status'] = {}
+
+ for status in STATUS_FLAGS.keys():
+ harvest_data['worker_status'][status] = 0
+
+ harvest_data['access_count'] = 0
+ harvest_data['bytes_served'] = 0
+
+ # Chart as 'Count'. Round to Integer.
+
+ harvest_data['metrics'].merge_value('Server/Restarts[|servers]', 0)
+
+ start = time.time()
+ end = start + 60.0
+
+ while True:
+ try:
+ # We want to collect metrics on a regular second
+ # interval so we need to align the timeout value.
+
+ now = time.time()
+ start += 1.0
+ timeout = start - now
+
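+                # The only item ever placed in the monitor queue is the
+                # None shutdown marker from terminate(), so anything
+                # received here is a signal to exit the thread.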
+ return self.monitor_queue.get(timeout=timeout)
+
+ except queue.Empty:
+ pass
+
+ harvest_data['sample_count'] += 1
+
+ scoreboard = self.generate_scoreboard(sample_start)
+
+ harvest_end = scoreboard['current_time']
+ sample_end = harvest_end
+
+ sample_duration = sample_end - sample_start
+
+ self.record_process_statistics(scoreboard, harvest_data)
+
+ harvest_data['request_samples'].extend(
+ scoreboard['request_samples'])
+
+ access_count_delta = scoreboard['access_count']
+ access_count_delta -= access_count
+ access_count = scoreboard['access_count']
+
+ harvest_data['access_count'] += access_count_delta
+
+ bytes_served_delta = scoreboard['bytes_served']
+ bytes_served_delta -= bytes_served
+ bytes_served = scoreboard['bytes_served']
+
+ harvest_data['bytes_served'] += bytes_served_delta
+
+ now = time.time()
+
+ if now >= end:
+ harvest_data['period_end'] = harvest_end
+
+ self.report_queue.put(harvest_data)
+
+ harvest_start = harvest_end
+ end += 60.0
+
+ _harvest_data = {}
+
+ _harvest_data['sample_count'] = 0
+ _harvest_data['period_start'] = harvest_start
+
+ _harvest_data['metrics'] = Samples()
+
+ _harvest_data['request_samples'] = []
+
+ _harvest_data['active_processes'] = (
+ harvest_data['active_processes'])
+
+ _harvest_data['processes_running'] = 0
+ _harvest_data['processes_started'] = 0
+ _harvest_data['processes_stopped'] = 0
+
+ _harvest_data['idle_workers'] = 0
+ _harvest_data['busy_workers'] = 0
+
+ _harvest_data['server_limit'] = scoreboard['server_limit']
+ _harvest_data['thread_limit'] = scoreboard['thread_limit']
+
+ _harvest_data['worker_status'] = {}
+
+ for status in STATUS_FLAGS.keys():
+ _harvest_data['worker_status'][status] = 0
+
+ _harvest_data['access_count'] = 0
+ _harvest_data['bytes_served'] = 0
+
+ harvest_data = _harvest_data
+
+ sample_start = sample_end
+
+ def terminate(self):
+ try:
+ self.report_queue.put(None)
+ self.monitor_queue.put(None)
+ except Exception:
+ pass
+
+ self.monitor_thread.join()
+ self.report_thread.join()
+
+ def start(self):
+ if mod_wsgi.server_metrics() is None:
+ return
+
+ with self.lock:
+ if not self.running:
+ self.running = True
+ atexit.register(self.terminate)
+ self.monitor_thread.start()
+ self.report_thread.start()