summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Socol <me@jamessocol.com>2018-08-22 08:36:00 -0400
committerJames Socol <me@jamessocol.com>2018-08-22 08:49:11 -0400
commit4ef9faadb76abf28420662244a30203ccf3f9bea (patch)
tree295bdf021561c8d2d3a128f1b56f5cb338ef3609
parent2780955f87bdab496685dcf743de86d2d6bf89e1 (diff)
downloadpystatsd-4ef9faadb76abf28420662244a30203ccf3f9bea.tar.gz
Refactor client module into package
The client module just broke 300 lines and had a lot of functionality in it. This breaks the single file up into a package and reduces a decent amount of duplication. Makes a few related changes: - Replaces old Python __future__ imports with new ones - Removes __all__ in favor of fewer imports - Refactors common stream code into StreamClientBase - Renames TCPPipeline to StreamPipeline for consistency (this is not a public class name)
-rw-r--r--statsd/client.py304
-rw-r--r--statsd/client/__init__.py4
-rw-r--r--statsd/client/base.py103
-rw-r--r--statsd/client/stream.py75
-rw-r--r--statsd/client/timer.py71
-rw-r--r--statsd/client/udp.py50
6 files changed, 303 insertions, 304 deletions
diff --git a/statsd/client.py b/statsd/client.py
deleted file mode 100644
index cb54a00..0000000
--- a/statsd/client.py
+++ /dev/null
@@ -1,304 +0,0 @@
-from __future__ import with_statement
-from collections import deque
-from datetime import timedelta
-import functools
-import random
-import socket
-
-# Use timer that's not susceptible to time of day adjustments.
-try:
- # perf_counter is only present on Py3.3+
- from time import perf_counter as time_now
-except ImportError:
- # fall back to using time
- from time import time as time_now
-
-
-__all__ = ['StatsClient', 'TCPStatsClient']
-
-
-def safe_wraps(wrapper, *args, **kwargs):
- """Safely wraps partial functions."""
- while isinstance(wrapper, functools.partial):
- wrapper = wrapper.func
- return functools.wraps(wrapper, *args, **kwargs)
-
-
-class Timer(object):
- """A context manager/decorator for statsd.timing()."""
-
- def __init__(self, client, stat, rate=1):
- self.client = client
- self.stat = stat
- self.rate = rate
- self.ms = None
- self._sent = False
- self._start_time = None
-
- def __call__(self, f):
- """Thread-safe timing function decorator."""
- @safe_wraps(f)
- def _wrapped(*args, **kwargs):
- start_time = time_now()
- try:
- return f(*args, **kwargs)
- finally:
- elapsed_time_ms = 1000.0 * (time_now() - start_time)
- self.client.timing(self.stat, elapsed_time_ms, self.rate)
- return _wrapped
-
- def __enter__(self):
- return self.start()
-
- def __exit__(self, typ, value, tb):
- self.stop()
-
- def start(self):
- self.ms = None
- self._sent = False
- self._start_time = time_now()
- return self
-
- def stop(self, send=True):
- if self._start_time is None:
- raise RuntimeError('Timer has not started.')
- dt = time_now() - self._start_time
- self.ms = 1000.0 * dt # Convert to milliseconds.
- if send:
- self.send()
- return self
-
- def send(self):
- if self.ms is None:
- raise RuntimeError('No data recorded.')
- if self._sent:
- raise RuntimeError('Already sent data.')
- self._sent = True
- self.client.timing(self.stat, self.ms, self.rate)
-
-
-class StatsClientBase(object):
- """A Base class for various statsd clients."""
-
- def _send(self):
- raise NotImplementedError()
-
- def pipeline(self):
- raise NotImplementedError()
-
- def timer(self, stat, rate=1):
- return Timer(self, stat, rate)
-
- def timing(self, stat, delta, rate=1):
- """
- Send new timing information.
-
- `delta` can be either a number of milliseconds or a timedelta.
- """
- if isinstance(delta, timedelta):
- # Convert timedelta to number of milliseconds.
- delta = delta.total_seconds() * 1000.
- self._send_stat(stat, '%0.6f|ms' % delta, rate)
-
- def incr(self, stat, count=1, rate=1):
- """Increment a stat by `count`."""
- self._send_stat(stat, '%s|c' % count, rate)
-
- def decr(self, stat, count=1, rate=1):
- """Decrement a stat by `count`."""
- self.incr(stat, -count, rate)
-
- def gauge(self, stat, value, rate=1, delta=False):
- """Set a gauge value."""
- if value < 0 and not delta:
- if rate < 1:
- if random.random() > rate:
- return
- with self.pipeline() as pipe:
- pipe._send_stat(stat, '0|g', 1)
- pipe._send_stat(stat, '%s|g' % value, 1)
- else:
- prefix = '+' if delta and value >= 0 else ''
- self._send_stat(stat, '%s%s|g' % (prefix, value), rate)
-
- def set(self, stat, value, rate=1):
- """Set a set value."""
- self._send_stat(stat, '%s|s' % value, rate)
-
- def _send_stat(self, stat, value, rate):
- self._after(self._prepare(stat, value, rate))
-
- def _prepare(self, stat, value, rate):
- if rate < 1:
- if random.random() > rate:
- return
- value = '%s|@%s' % (value, rate)
-
- if self._prefix:
- stat = '%s.%s' % (self._prefix, stat)
-
- return '%s:%s' % (stat, value)
-
- def _after(self, data):
- if data:
- self._send(data)
-
-
-class StatsClient(StatsClientBase):
- """A client for statsd."""
-
- def __init__(self, host='localhost', port=8125, prefix=None,
- maxudpsize=512, ipv6=False):
- """Create a new client."""
- fam = socket.AF_INET6 if ipv6 else socket.AF_INET
- family, _, _, _, addr = socket.getaddrinfo(
- host, port, fam, socket.SOCK_DGRAM)[0]
- self._addr = addr
- self._sock = socket.socket(family, socket.SOCK_DGRAM)
- self._prefix = prefix
- self._maxudpsize = maxudpsize
-
- def _send(self, data):
- """Send data to statsd."""
- try:
- self._sock.sendto(data.encode('ascii'), self._addr)
- except (socket.error, RuntimeError):
- # No time for love, Dr. Jones!
- pass
-
- def pipeline(self):
- return Pipeline(self)
-
-
-class TCPStatsClient(StatsClientBase):
- """TCP version of StatsClient."""
-
- def __init__(self, host='localhost', port=8125, prefix=None,
- timeout=None, ipv6=False):
- """Create a new client."""
- self._host = host
- self._port = port
- self._ipv6 = ipv6
- self._timeout = timeout
- self._prefix = prefix
- self._sock = None
-
- def _send(self, data):
- """Send data to statsd."""
- if not self._sock:
- self.connect()
- self._do_send(data)
-
- def _do_send(self, data):
- self._sock.sendall(data.encode('ascii') + b'\n')
-
- def close(self):
- if self._sock and hasattr(self._sock, 'close'):
- self._sock.close()
- self._sock = None
-
- def connect(self):
- fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
- family, _, _, _, addr = socket.getaddrinfo(
- self._host, self._port, fam, socket.SOCK_STREAM)[0]
- self._sock = socket.socket(family, socket.SOCK_STREAM)
- self._sock.settimeout(self._timeout)
- self._sock.connect(addr)
-
- def pipeline(self):
- return TCPPipeline(self)
-
- def reconnect(self, data):
- self.close()
- self.connect()
-
-
-class UnixSocketStatsClient(StatsClientBase):
- """Unix domain socket version of StatsClient."""
-
- def __init__(self, socket_path, prefix=None, timeout=None):
- """Create a new client."""
- self._socket_path = socket_path
- self._timeout = timeout
- self._prefix = prefix
- self._sock = None
-
- def connect(self):
- self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self._sock.settimeout(self._timeout)
- self._sock.connect(self._socket_path)
-
- def _send(self, data):
- """Send data to statsd."""
- if not self._sock:
- self.connect()
- self._do_send(data)
-
- def _do_send(self, data):
- self._sock.sendall(data.encode('ascii') + b'\n')
-
- def close(self):
- self._sock.close()
- self._sock = None
-
- def reconnect(self, data):
- self.close()
- self.connect()
-
- def pipeline(self):
- return TCPPipeline(self)
-
-
-class PipelineBase(StatsClientBase):
-
- def __init__(self, client):
- self._client = client
- self._prefix = client._prefix
- self._stats = deque()
-
- def _send(self):
- raise NotImplementedError()
-
- def _after(self, data):
- if data is not None:
- self._stats.append(data)
-
- def __enter__(self):
- return self
-
- def __exit__(self, typ, value, tb):
- self.send()
-
- def send(self):
- if not self._stats:
- return
- self._send()
-
- def pipeline(self):
- return self.__class__(self)
-
-
-class Pipeline(PipelineBase):
-
- def __init__(self, client):
- super(Pipeline, self).__init__(client)
- self._maxudpsize = client._maxudpsize
-
- def _send(self):
- data = self._stats.popleft()
- while self._stats:
- # Use popleft to preserve the order of the stats.
- stat = self._stats.popleft()
- if len(stat) + len(data) + 1 >= self._maxudpsize:
- self._client._after(data)
- data = stat
- else:
- data += '\n' + stat
- self._client._after(data)
-
-
-class TCPPipeline(PipelineBase):
-
- def _send(self):
- self._client._after('\n'.join(self._stats))
- self._stats.clear()
diff --git a/statsd/client/__init__.py b/statsd/client/__init__.py
new file mode 100644
index 0000000..62cd202
--- /dev/null
+++ b/statsd/client/__init__.py
@@ -0,0 +1,4 @@
+from __future__ import absolute_import, division, unicode_literals
+
+from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
+from .udp import StatsClient # noqa
diff --git a/statsd/client/base.py b/statsd/client/base.py
new file mode 100644
index 0000000..08474c6
--- /dev/null
+++ b/statsd/client/base.py
@@ -0,0 +1,103 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import random
+from collections import deque
+from datetime import timedelta
+
+from .timer import Timer
+
+
+class StatsClientBase(object):
+ """A Base class for various statsd clients."""
+
+ def _send(self):
+ raise NotImplementedError()
+
+ def pipeline(self):
+ raise NotImplementedError()
+
+ def timer(self, stat, rate=1):
+ return Timer(self, stat, rate)
+
+ def timing(self, stat, delta, rate=1):
+ """
+ Send new timing information.
+
+ `delta` can be either a number of milliseconds or a timedelta.
+ """
+ if isinstance(delta, timedelta):
+ # Convert timedelta to number of milliseconds.
+ delta = delta.total_seconds() * 1000.
+ self._send_stat(stat, '%0.6f|ms' % delta, rate)
+
+ def incr(self, stat, count=1, rate=1):
+ """Increment a stat by `count`."""
+ self._send_stat(stat, '%s|c' % count, rate)
+
+ def decr(self, stat, count=1, rate=1):
+ """Decrement a stat by `count`."""
+ self.incr(stat, -count, rate)
+
+ def gauge(self, stat, value, rate=1, delta=False):
+ """Set a gauge value."""
+ if value < 0 and not delta:
+ if rate < 1:
+ if random.random() > rate:
+ return
+ with self.pipeline() as pipe:
+ pipe._send_stat(stat, '0|g', 1)
+ pipe._send_stat(stat, '%s|g' % value, 1)
+ else:
+ prefix = '+' if delta and value >= 0 else ''
+ self._send_stat(stat, '%s%s|g' % (prefix, value), rate)
+
+ def set(self, stat, value, rate=1):
+ """Set a set value."""
+ self._send_stat(stat, '%s|s' % value, rate)
+
+ def _send_stat(self, stat, value, rate):
+ self._after(self._prepare(stat, value, rate))
+
+ def _prepare(self, stat, value, rate):
+ if rate < 1:
+ if random.random() > rate:
+ return
+ value = '%s|@%s' % (value, rate)
+
+ if self._prefix:
+ stat = '%s.%s' % (self._prefix, stat)
+
+ return '%s:%s' % (stat, value)
+
+ def _after(self, data):
+ if data:
+ self._send(data)
+
+
+class PipelineBase(StatsClientBase):
+
+ def __init__(self, client):
+ self._client = client
+ self._prefix = client._prefix
+ self._stats = deque()
+
+ def _send(self):
+ raise NotImplementedError()
+
+ def _after(self, data):
+ if data is not None:
+ self._stats.append(data)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, typ, value, tb):
+ self.send()
+
+ def send(self):
+ if not self._stats:
+ return
+ self._send()
+
+ def pipeline(self):
+ return self.__class__(self)
diff --git a/statsd/client/stream.py b/statsd/client/stream.py
new file mode 100644
index 0000000..76c216f
--- /dev/null
+++ b/statsd/client/stream.py
@@ -0,0 +1,75 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import socket
+
+from .base import StatsClientBase, PipelineBase
+
+
+class StreamPipeline(PipelineBase):
+ def _send(self):
+ self._client._after('\n'.join(self._stats))
+ self._stats.clear()
+
+
+class StreamClientBase(StatsClientBase):
+ def connect(self):
+ raise NotImplementedError()
+
+ def close(self):
+ if self._sock and hasattr(self._sock, 'close'):
+ self._sock.close()
+ self._sock = None
+
+ def reconnect(self):
+ self.close()
+ self.connect()
+
+ def pipeline(self):
+ return StreamPipeline(self)
+
+ def _send(self, data):
+ """Send data to statsd."""
+ if not self._sock:
+ self.connect()
+ self._do_send(data)
+
+ def _do_send(self, data):
+ self._sock.sendall(data.encode('ascii') + b'\n')
+
+
+class TCPStatsClient(StreamClientBase):
+ """TCP version of StatsClient."""
+
+ def __init__(self, host='localhost', port=8125, prefix=None,
+ timeout=None, ipv6=False):
+ """Create a new client."""
+ self._host = host
+ self._port = port
+ self._ipv6 = ipv6
+ self._timeout = timeout
+ self._prefix = prefix
+ self._sock = None
+
+ def connect(self):
+ fam = socket.AF_INET6 if self._ipv6 else socket.AF_INET
+ family, _, _, _, addr = socket.getaddrinfo(
+ self._host, self._port, fam, socket.SOCK_STREAM)[0]
+ self._sock = socket.socket(family, socket.SOCK_STREAM)
+ self._sock.settimeout(self._timeout)
+ self._sock.connect(addr)
+
+
+class UnixSocketStatsClient(StreamClientBase):
+ """Unix domain socket version of StatsClient."""
+
+ def __init__(self, socket_path, prefix=None, timeout=None):
+ """Create a new client."""
+ self._socket_path = socket_path
+ self._timeout = timeout
+ self._prefix = prefix
+ self._sock = None
+
+ def connect(self):
+ self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self._sock.settimeout(self._timeout)
+ self._sock.connect(self._socket_path)
diff --git a/statsd/client/timer.py b/statsd/client/timer.py
new file mode 100644
index 0000000..fefc9d0
--- /dev/null
+++ b/statsd/client/timer.py
@@ -0,0 +1,71 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import functools
+
+# Use timer that's not susceptible to time of day adjustments.
+try:
+ # perf_counter is only present on Py3.3+
+ from time import perf_counter as time_now
+except ImportError:
+ # fall back to using time
+ from time import time as time_now
+
+
+def safe_wraps(wrapper, *args, **kwargs):
+ """Safely wraps partial functions."""
+ while isinstance(wrapper, functools.partial):
+ wrapper = wrapper.func
+ return functools.wraps(wrapper, *args, **kwargs)
+
+
+class Timer(object):
+ """A context manager/decorator for statsd.timing()."""
+
+ def __init__(self, client, stat, rate=1):
+ self.client = client
+ self.stat = stat
+ self.rate = rate
+ self.ms = None
+ self._sent = False
+ self._start_time = None
+
+ def __call__(self, f):
+ """Thread-safe timing function decorator."""
+ @safe_wraps(f)
+ def _wrapped(*args, **kwargs):
+ start_time = time_now()
+ try:
+ return f(*args, **kwargs)
+ finally:
+ elapsed_time_ms = 1000.0 * (time_now() - start_time)
+ self.client.timing(self.stat, elapsed_time_ms, self.rate)
+ return _wrapped
+
+ def __enter__(self):
+ return self.start()
+
+ def __exit__(self, typ, value, tb):
+ self.stop()
+
+ def start(self):
+ self.ms = None
+ self._sent = False
+ self._start_time = time_now()
+ return self
+
+ def stop(self, send=True):
+ if self._start_time is None:
+ raise RuntimeError('Timer has not started.')
+ dt = time_now() - self._start_time
+ self.ms = 1000.0 * dt # Convert to milliseconds.
+ if send:
+ self.send()
+ return self
+
+ def send(self):
+ if self.ms is None:
+ raise RuntimeError('No data recorded.')
+ if self._sent:
+ raise RuntimeError('Already sent data.')
+ self._sent = True
+ self.client.timing(self.stat, self.ms, self.rate)
diff --git a/statsd/client/udp.py b/statsd/client/udp.py
new file mode 100644
index 0000000..790c5e8
--- /dev/null
+++ b/statsd/client/udp.py
@@ -0,0 +1,50 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import socket
+
+from .base import StatsClientBase, PipelineBase
+
+
+class Pipeline(PipelineBase):
+
+ def __init__(self, client):
+ super(Pipeline, self).__init__(client)
+ self._maxudpsize = client._maxudpsize
+
+ def _send(self):
+ data = self._stats.popleft()
+ while self._stats:
+ # Use popleft to preserve the order of the stats.
+ stat = self._stats.popleft()
+ if len(stat) + len(data) + 1 >= self._maxudpsize:
+ self._client._after(data)
+ data = stat
+ else:
+ data += '\n' + stat
+ self._client._after(data)
+
+
+class StatsClient(StatsClientBase):
+ """A client for statsd."""
+
+ def __init__(self, host='localhost', port=8125, prefix=None,
+ maxudpsize=512, ipv6=False):
+ """Create a new client."""
+ fam = socket.AF_INET6 if ipv6 else socket.AF_INET
+ family, _, _, _, addr = socket.getaddrinfo(
+ host, port, fam, socket.SOCK_DGRAM)[0]
+ self._addr = addr
+ self._sock = socket.socket(family, socket.SOCK_DGRAM)
+ self._prefix = prefix
+ self._maxudpsize = maxudpsize
+
+ def _send(self, data):
+ """Send data to statsd."""
+ try:
+ self._sock.sendto(data.encode('ascii'), self._addr)
+ except (socket.error, RuntimeError):
+ # No time for love, Dr. Jones!
+ pass
+
+ def pipeline(self):
+ return Pipeline(self)