diff options
author | James Socol <me@jamessocol.com> | 2018-08-22 08:36:00 -0400 |
---|---|---|
committer | James Socol <me@jamessocol.com> | 2018-08-22 08:49:11 -0400 |
commit | 4ef9faadb76abf28420662244a30203ccf3f9bea (patch) | |
tree | 295bdf021561c8d2d3a128f1b56f5cb338ef3609 | |
parent | 2780955f87bdab496685dcf743de86d2d6bf89e1 (diff) | |
download | pystatsd-4ef9faadb76abf28420662244a30203ccf3f9bea.tar.gz |
Refactor client module into package
The client module just broke 300 lines and had a lot of functionality in
it. This breaks the single file up into a package and reduces a decent
amount of duplication. Makes a few related changes:
- Replaces old Python __future__ imports with new ones
- Removes __all__ in favor of fewer imports
- Refactors common stream code into StreamClientBase
- Renames TCPPipeline to StreamPipeline for consistency (this is not a
public class name)
-rw-r--r-- | statsd/client.py | 304 | ||||
-rw-r--r-- | statsd/client/__init__.py | 4 | ||||
-rw-r--r-- | statsd/client/base.py | 103 | ||||
-rw-r--r-- | statsd/client/stream.py | 75 | ||||
-rw-r--r-- | statsd/client/timer.py | 71 | ||||
-rw-r--r-- | statsd/client/udp.py | 50 |
6 files changed, 303 insertions, 304 deletions
diff --git a/statsd/client.py b/statsd/client.py deleted file mode 100644 index cb54a00..0000000 --- a/statsd/client.py +++ /dev/null @@ -1,304 +0,0 @@ -from __future__ import with_statement -from collections import deque -from datetime import timedelta -import functools -import random -import socket - -# Use timer that's not susceptible to time of day adjustments. -try: - # perf_counter is only present on Py3.3+ - from time import perf_counter as time_now -except ImportError: - # fall back to using time - from time import time as time_now - - -__all__ = ['StatsClient', 'TCPStatsClient'] - - -def safe_wraps(wrapper, *args, **kwargs): - """Safely wraps partial functions.""" - while isinstance(wrapper, functools.partial): - wrapper = wrapper.func - return functools.wraps(wrapper, *args, **kwargs) - - -class Timer(object): - """A context manager/decorator for statsd.timing().""" - - def __init__(self, client, stat, rate=1): - self.client = client - self.stat = stat - self.rate = rate - self.ms = None - self._sent = False - self._start_time = None - - def __call__(self, f): - """Thread-safe timing function decorator.""" - @safe_wraps(f) - def _wrapped(*args, **kwargs): - start_time = time_now() - try: - return f(*args, **kwargs) - finally: - elapsed_time_ms = 1000.0 * (time_now() - start_time) - self.client.timing(self.stat, elapsed_time_ms, self.rate) - return _wrapped - - def __enter__(self): - return self.start() - - def __exit__(self, typ, value, tb): - self.stop() - - def start(self): - self.ms = None - self._sent = False - self._start_time = time_now() - return self - - def stop(self, send=True): - if self._start_time is None: - raise RuntimeError('Timer has not started.') - dt = time_now() - self._start_time - self.ms = 1000.0 * dt # Convert to milliseconds. - if send: - self.send() - return self - - def send(self): - if self.ms is None: - raise RuntimeError('No data recorded.') - if self._sent: - raise RuntimeError('Already sent data.') - self._sent = True - self.client.timing(self.stat, self.ms, self.rate) - - -class StatsClientBase(object): - """A Base class for various statsd clients.""" - - def _send(self): - raise NotImplementedError() - - def pipeline(self): - raise NotImplementedError() - - def timer(self, stat, rate=1): - return Timer(self, stat, rate) - - def timing(self, stat, delta, rate=1): - """ - Send new timing information. - - `delta` can be either a number of milliseconds or a timedelta. - """ - if isinstance(delta, timedelta): - # Convert timedelta to number of milliseconds. - delta = delta.total_seconds() * 1000. - self._send_stat(stat, '%0.6f|ms' % delta, rate) - - def incr(self, stat, count=1, rate=1): - """Increment a stat by `count`.""" - self._send_stat(stat, '%s|c' % count, rate) - - def decr(self, stat, count=1, rate=1): - """Decrement a stat by `count`.""" - self.incr(stat, -count, rate) - - def gauge(self, stat, value, rate=1, delta=False): - """Set a gauge value.""" - if value < 0 and not delta: - if rate < 1: - if random.random() > rate: - return - with self.pipeline() as pipe: - pipe._send_stat(stat, '0|g', 1) - pipe._send_stat(stat, '%s|g' % value, 1) - else: - prefix = '+' if delta and value >= 0 else '' - self._send_stat(stat, '%s%s|g' % (prefix, value), rate) - - def set(self, stat, value, rate=1): - """Set a set value.""" - self._send_stat(stat, '%s|s' % value, rate) - - def _send_stat(self, stat, value, rate): - self._after(self._prepare(stat, value, rate)) - - def _prepare(self, stat, value, rate): - if rate < 1: - if random.random() > rate: - return - value = '%s|@%s' % (value, rate) - - if self._prefix: - stat = '%s.%s' % (self._prefix, stat) - - return '%s:%s' % (stat, value) - - def _after(self, data): - if data: - self._send(data) - - -class StatsClient(StatsClientBase): - """A client for statsd.""" - - def __init__(self, host='localhost', port=8125, prefix=None, - maxudpsize=512, ipv6=False): - """Create a new client.""" - fam = socket.AF_INET6 if ipv6 else socket.AF_INET - family, _, _, _, addr = socket.getaddrinfo( - host, port, fam, socket.SOCK_DGRAM)[0] - self._addr = addr - self._sock = socket.socket(family, socket.SOCK_DGRAM) - self._prefix = prefix - self._maxudpsize = maxudpsize - - def _send(self, data): - """Send data to statsd.""" - try: - self._sock.sendto(data.encode('ascii'), self._addr) - except (socket.error, RuntimeError): - # No time for love, Dr. Jones! - pass - - def pipeline(self): - return Pipeline(self) - - -class TCPStatsClient(StatsClientBase): - """TCP version of StatsClient.""" - - def __init__(self, host='localhost', port=8125, prefix=None, - timeout=None, ipv6=False): - """Create a new client.""" - self._host = host - self._port = port - self._ipv6 = ipv6 - self._timeout = timeout - self._prefix = prefix - self._sock = None - - def _send(self, data): - """Send data to statsd.""" - if not self._sock: - self.connect() - self._do_send(data) - - def _do_send(self, data): - self._sock.sendall(data.encode('ascii') + b'\n') - - def close(self): - if self._sock and hasattr(self._sock, 'close'): - self._sock.close() - self._sock = None - - def connect(self): - fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET - family, _, _, _, addr = socket.getaddrinfo( - self._host, self._port, fam, socket.SOCK_STREAM)[0] - self._sock = socket.socket(family, socket.SOCK_STREAM) - self._sock.settimeout(self._timeout) - self._sock.connect(addr) - - def pipeline(self): - return TCPPipeline(self) - - def reconnect(self, data): - self.close() - self.connect() - - -class UnixSocketStatsClient(StatsClientBase): - """Unix domain socket version of StatsClient.""" - - def __init__(self, socket_path, prefix=None, timeout=None): - """Create a new client.""" - self._socket_path = socket_path - self._timeout = timeout - self._prefix = prefix - self._sock = None - - def connect(self): - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self._sock.settimeout(self._timeout) - self._sock.connect(self._socket_path) - - def _send(self, data): - """Send data to statsd.""" - if not self._sock: - self.connect() - self._do_send(data) - - def _do_send(self, data): - self._sock.sendall(data.encode('ascii') + b'\n') - - def close(self): - self._sock.close() - self._sock = None - - def reconnect(self, data): - self.close() - self.connect() - - def pipeline(self): - return TCPPipeline(self) - - -class PipelineBase(StatsClientBase): - - def __init__(self, client): - self._client = client - self._prefix = client._prefix - self._stats = deque() - - def _send(self): - raise NotImplementedError() - - def _after(self, data): - if data is not None: - self._stats.append(data) - - def __enter__(self): - return self - - def __exit__(self, typ, value, tb): - self.send() - - def send(self): - if not self._stats: - return - self._send() - - def pipeline(self): - return self.__class__(self) - - -class Pipeline(PipelineBase): - - def __init__(self, client): - super(Pipeline, self).__init__(client) - self._maxudpsize = client._maxudpsize - - def _send(self): - data = self._stats.popleft() - while self._stats: - # Use popleft to preserve the order of the stats. - stat = self._stats.popleft() - if len(stat) + len(data) + 1 >= self._maxudpsize: - self._client._after(data) - data = stat - else: - data += '\n' + stat - self._client._after(data) - - -class TCPPipeline(PipelineBase): - - def _send(self): - self._client._after('\n'.join(self._stats)) - self._stats.clear() diff --git a/statsd/client/__init__.py b/statsd/client/__init__.py new file mode 100644 index 0000000..62cd202 --- /dev/null +++ b/statsd/client/__init__.py @@ -0,0 +1,4 @@ +from __future__ import absolute_import, division, unicode_literals + +from .stream import TCPStatsClient, UnixSocketStatsClient # noqa +from .udp import StatsClient # noqa diff --git a/statsd/client/base.py b/statsd/client/base.py new file mode 100644 index 0000000..08474c6 --- /dev/null +++ b/statsd/client/base.py @@ -0,0 +1,103 @@ +from __future__ import absolute_import, division, unicode_literals + +import random +from collections import deque +from datetime import timedelta + +from .timer import Timer + + +class StatsClientBase(object): + """A Base class for various statsd clients.""" + + def _send(self): + raise NotImplementedError() + + def pipeline(self): + raise NotImplementedError() + + def timer(self, stat, rate=1): + return Timer(self, stat, rate) + + def timing(self, stat, delta, rate=1): + """ + Send new timing information. + + `delta` can be either a number of milliseconds or a timedelta. + """ + if isinstance(delta, timedelta): + # Convert timedelta to number of milliseconds. + delta = delta.total_seconds() * 1000. + self._send_stat(stat, '%0.6f|ms' % delta, rate) + + def incr(self, stat, count=1, rate=1): + """Increment a stat by `count`.""" + self._send_stat(stat, '%s|c' % count, rate) + + def decr(self, stat, count=1, rate=1): + """Decrement a stat by `count`.""" + self.incr(stat, -count, rate) + + def gauge(self, stat, value, rate=1, delta=False): + """Set a gauge value.""" + if value < 0 and not delta: + if rate < 1: + if random.random() > rate: + return + with self.pipeline() as pipe: + pipe._send_stat(stat, '0|g', 1) + pipe._send_stat(stat, '%s|g' % value, 1) + else: + prefix = '+' if delta and value >= 0 else '' + self._send_stat(stat, '%s%s|g' % (prefix, value), rate) + + def set(self, stat, value, rate=1): + """Set a set value.""" + self._send_stat(stat, '%s|s' % value, rate) + + def _send_stat(self, stat, value, rate): + self._after(self._prepare(stat, value, rate)) + + def _prepare(self, stat, value, rate): + if rate < 1: + if random.random() > rate: + return + value = '%s|@%s' % (value, rate) + + if self._prefix: + stat = '%s.%s' % (self._prefix, stat) + + return '%s:%s' % (stat, value) + + def _after(self, data): + if data: + self._send(data) + + +class PipelineBase(StatsClientBase): + + def __init__(self, client): + self._client = client + self._prefix = client._prefix + self._stats = deque() + + def _send(self): + raise NotImplementedError() + + def _after(self, data): + if data is not None: + self._stats.append(data) + + def __enter__(self): + return self + + def __exit__(self, typ, value, tb): + self.send() + + def send(self): + if not self._stats: + return + self._send() + + def pipeline(self): + return self.__class__(self) diff --git a/statsd/client/stream.py b/statsd/client/stream.py new file mode 100644 index 0000000..76c216f --- /dev/null +++ b/statsd/client/stream.py @@ -0,0 +1,75 @@ +from __future__ import absolute_import, division, unicode_literals + +import socket + +from .base import StatsClientBase, PipelineBase + + +class StreamPipeline(PipelineBase): + def _send(self): + self._client._after('\n'.join(self._stats)) + self._stats.clear() + + +class StreamClientBase(StatsClientBase): + def connect(self): + raise NotImplementedError() + + def close(self): + if self._sock and hasattr(self._sock, 'close'): + self._sock.close() + self._sock = None + + def reconnect(self): + self.close() + self.connect() + + def pipeline(self): + return StreamPipeline(self) + + def _send(self, data): + """Send data to statsd.""" + if not self._sock: + self.connect() + self._do_send(data) + + def _do_send(self, data): + self._sock.sendall(data.encode('ascii') + b'\n') + + +class TCPStatsClient(StreamClientBase): + """TCP version of StatsClient.""" + + def __init__(self, host='localhost', port=8125, prefix=None, + timeout=None, ipv6=False): + """Create a new client.""" + self._host = host + self._port = port + self._ipv6 = ipv6 + self._timeout = timeout + self._prefix = prefix + self._sock = None + + def connect(self): + fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET + family, _, _, _, addr = socket.getaddrinfo( + self._host, self._port, fam, socket.SOCK_STREAM)[0] + self._sock = socket.socket(family, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(addr) + + +class UnixSocketStatsClient(StreamClientBase): + """Unix domain socket version of StatsClient.""" + + def __init__(self, socket_path, prefix=None, timeout=None): + """Create a new client.""" + self._socket_path = socket_path + self._timeout = timeout + self._prefix = prefix + self._sock = None + + def connect(self): + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(self._socket_path) diff --git a/statsd/client/timer.py b/statsd/client/timer.py new file mode 100644 index 0000000..fefc9d0 --- /dev/null +++ b/statsd/client/timer.py @@ -0,0 +1,71 @@ +from __future__ import absolute_import, division, unicode_literals + +import functools + +# Use timer that's not susceptible to time of day adjustments. +try: + # perf_counter is only present on Py3.3+ + from time import perf_counter as time_now +except ImportError: + # fall back to using time + from time import time as time_now + + +def safe_wraps(wrapper, *args, **kwargs): + """Safely wraps partial functions.""" + while isinstance(wrapper, functools.partial): + wrapper = wrapper.func + return functools.wraps(wrapper, *args, **kwargs) + + +class Timer(object): + """A context manager/decorator for statsd.timing().""" + + def __init__(self, client, stat, rate=1): + self.client = client + self.stat = stat + self.rate = rate + self.ms = None + self._sent = False + self._start_time = None + + def __call__(self, f): + """Thread-safe timing function decorator.""" + @safe_wraps(f) + def _wrapped(*args, **kwargs): + start_time = time_now() + try: + return f(*args, **kwargs) + finally: + elapsed_time_ms = 1000.0 * (time_now() - start_time) + self.client.timing(self.stat, elapsed_time_ms, self.rate) + return _wrapped + + def __enter__(self): + return self.start() + + def __exit__(self, typ, value, tb): + self.stop() + + def start(self): + self.ms = None + self._sent = False + self._start_time = time_now() + return self + + def stop(self, send=True): + if self._start_time is None: + raise RuntimeError('Timer has not started.') + dt = time_now() - self._start_time + self.ms = 1000.0 * dt # Convert to milliseconds. + if send: + self.send() + return self + + def send(self): + if self.ms is None: + raise RuntimeError('No data recorded.') + if self._sent: + raise RuntimeError('Already sent data.') + self._sent = True + self.client.timing(self.stat, self.ms, self.rate) diff --git a/statsd/client/udp.py b/statsd/client/udp.py new file mode 100644 index 0000000..790c5e8 --- /dev/null +++ b/statsd/client/udp.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import, division, unicode_literals + +import socket + +from .base import StatsClientBase, PipelineBase + + +class Pipeline(PipelineBase): + + def __init__(self, client): + super(Pipeline, self).__init__(client) + self._maxudpsize = client._maxudpsize + + def _send(self): + data = self._stats.popleft() + while self._stats: + # Use popleft to preserve the order of the stats. + stat = self._stats.popleft() + if len(stat) + len(data) + 1 >= self._maxudpsize: + self._client._after(data) + data = stat + else: + data += '\n' + stat + self._client._after(data) + + +class StatsClient(StatsClientBase): + """A client for statsd.""" + + def __init__(self, host='localhost', port=8125, prefix=None, + maxudpsize=512, ipv6=False): + """Create a new client.""" + fam = socket.AF_INET6 if ipv6 else socket.AF_INET + family, _, _, _, addr = socket.getaddrinfo( + host, port, fam, socket.SOCK_DGRAM)[0] + self._addr = addr + self._sock = socket.socket(family, socket.SOCK_DGRAM) + self._prefix = prefix + self._maxudpsize = maxudpsize + + def _send(self, data): + """Send data to statsd.""" + try: + self._sock.sendto(data.encode('ascii'), self._addr) + except (socket.error, RuntimeError): + # No time for love, Dr. Jones! + pass + + def pipeline(self): + return Pipeline(self) |