summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Socol <me@jamessocol.com>2015-03-18 08:28:35 -0400
committerJames Socol <me@jamessocol.com>2015-03-18 08:28:35 -0400
commit6e79b5d79adb82b6bc5b44598e45a712e32a2629 (patch)
treeaba33c26a05cb4ca6b57a089a9c495a4ba93aef4
parentfdd2e81763fa5d127fdbf5565cbc73aff3273694 (diff)
parent9b007bd8b089bb03faec5605e841bad781920e91 (diff)
downloadpystatsd-6e79b5d79adb82b6bc5b44598e45a712e32a2629.tar.gz
Merge pull request #59 from NetAccessCorp/refactor_into_baseclass
Adding TCP connection support
-rw-r--r--docs/index.rst1
-rw-r--r--docs/reference.rst87
-rw-r--r--docs/tcp.rst19
-rw-r--r--statsd/__init__.py3
-rw-r--r--statsd/client.py115
-rw-r--r--statsd/tests.py849
6 files changed, 849 insertions, 225 deletions
diff --git a/docs/index.rst b/docs/index.rst
index b56bb64..ed4cb6c 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -70,6 +70,7 @@ Contents
types.rst
timing.rst
pipeline.rst
+ tcp.rst
reference.rst
contributing.rst
diff --git a/docs/reference.rst b/docs/reference.rst
index 0c0202e..e4527a7 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -275,6 +275,93 @@ stats.
This method is not implemented on the base StatsClient class.
+.. _TCPStatsClient:
+
+``TCPStatsClient``
+==================
+
+::
+
+ TCPStatsClient(host='localhost', port=8125, prefix=None, timeout=None)
+
+Create a new ``TCPStatsClient`` instance with the appropriate connection
+and prefix information.
+
+* ``host``: the hostname or IPv4 address of the statsd_ server.
+
+* ``port``: the port of the statsd server.
+
+* ``prefix``: a prefix to distinguish and group stats from an
+ application or environment.
+
+* ``timeout``: socket timeout for any actions on the connection socket.
+
+
+``TCPStatsClient`` implements all methods of ``StatsClient``, with the
+difference that it is not thread safe and it can raise exceptions on
+connection errors. On the contrary to ``StatsClient`` it uses a ``TCP``
+connection to connect to Statsd.
+Additionally to the methods of ``StatsClient`` it has a few which are
+specific to ``TCP`` connections.
+
+
+.. _tcp_close:
+
+``close``
+---------
+
+::
+
+ from statsd import TCPStatsClient
+
+ statsd = TCPStatsClient()
+ statsd.incr('some.event')
+ statsd.close()
+
+Closes a connection that's currently open and deletes it's socket. If this is
+called on a ``TCPStatsClient`` which currently has no open connection it is a
+non-action.
+
+
+.. _tcp_connect:
+
+``connect``
+-----------
+
+::
+
+ from statsd import TCPStatsClient
+
+ statsd = TCPStatsClient()
+ statsd.incr('some.event') # calls connect() internally
+ statsd.close()
+ statsd.connect() # creates new connection
+
+Creates a connection to Statsd. If there are errors like connection timed out
+or connection refused, the according exceptions will be raised. It is usually
+not necessary to call this method because sending data to Statsd will call
+``connect`` implicitely if the current instance of ``TCPStatsClient`` does not
+already hold an open connection.
+
+
+.. _tcp_reconnect:
+
+``reconnect``
+-------------
+
+::
+
+ from statsd import TCPStatsClient
+
+ statsd = TCPStatsClient()
+ statsd.incr('some.event')
+ statsd.reconnect() # closes open connection and creates new one
+
+Closes a currently existing connection and replaces it with a new one. If no
+connection exists already it will simply create a new one. Internally this
+does nothing else than calling ``close()`` and ``connect()``.
+
+
.. _statsd: https://github.com/etsy/statsd
.. _0ed78be: https://github.com/etsy/statsd/commit/0ed78be7
.. _1c10cfc0ac: https://github.com/etsy/statsd/commit/1c10cfc0ac
diff --git a/docs/tcp.rst b/docs/tcp.rst
new file mode 100644
index 0000000..3c81af6
--- /dev/null
+++ b/docs/tcp.rst
@@ -0,0 +1,19 @@
+.. _tcp-chapter:
+
+==============
+TCPStatsClient
+==============
+
+The ``TCPStatsClient`` class has a very similar interface to ``StatsClient``,
+but internally it uses ``TCP`` connections instead of ``UDP``. These are the
+main differencies when using ``TCPStatsClient`` compared to ``StatsClient``:
+
+* The constructor supports a ``timeout`` parameter to set a timeout on all
+ socket actions.
+
+* The methods ``connect`` and all methods that send data can potentially raise
+ socket exceptions.
+
+* It is not thread-safe, so it is recommended to not share it across threads
+ unless a lot of attention is paid to make sure that no two threads ever use
+ it at once.
diff --git a/statsd/__init__.py b/statsd/__init__.py
index a4f0372..79f54f5 100644
--- a/statsd/__init__.py
+++ b/statsd/__init__.py
@@ -1,8 +1,9 @@
from __future__ import absolute_import
from .client import StatsClient
+from .client import TCPStatsClient
VERSION = (3, 0, 1)
__version__ = '.'.join(map(str, VERSION))
-__all__ = ['StatsClient']
+__all__ = ['StatsClient', 'TCPStatsClient']
diff --git a/statsd/client.py b/statsd/client.py
index 8bca7f2..a423dae 100644
--- a/statsd/client.py
+++ b/statsd/client.py
@@ -4,9 +4,10 @@ from functools import wraps
import random
import socket
import time
+import abc
-__all__ = ['StatsClient']
+__all__ = ['StatsClient', 'TCPStatsClient']
class Timer(object):
@@ -63,22 +64,18 @@ class Timer(object):
self.client.timing(self.stat, self.ms, self.rate)
-class StatsClient(object):
- """A client for statsd."""
+class StatsClientBase(object):
+ """A Base class for various statsd clients."""
- def __init__(self, host='localhost', port=8125, prefix=None,
- maxudpsize=512):
- """Create a new client."""
- family, _, _, _, addr = socket.getaddrinfo(
- host, port, 0, socket.SOCK_DGRAM
- )[0]
- self._addr = addr
- self._sock = socket.socket(family, socket.SOCK_DGRAM)
- self._prefix = prefix
- self._maxudpsize = maxudpsize
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def _send(self):
+ pass
+ @abc.abstractmethod
def pipeline(self):
- return Pipeline(self)
+ pass
def timer(self, stat, rate=1):
return Timer(self, stat, rate)
@@ -130,6 +127,21 @@ class StatsClient(object):
if data:
self._send(data)
+
+class StatsClient(StatsClientBase):
+ """A client for statsd."""
+
+ def __init__(self, host='localhost', port=8125, prefix=None,
+ maxudpsize=512):
+ """Create a new client."""
+ family, _, _, _, addr = socket.getaddrinfo(
+ host, port, 0, socket.SOCK_DGRAM
+ )[0]
+ self._addr = addr
+ self._sock = socket.socket(family, socket.SOCK_DGRAM)
+ self._prefix = prefix
+ self._maxudpsize = maxudpsize
+
def _send(self, data):
"""Send data to statsd."""
try:
@@ -138,14 +150,63 @@ class StatsClient(object):
# No time for love, Dr. Jones!
pass
+ def pipeline(self):
+ return Pipeline(self)
+
+
+class TCPStatsClient(StatsClientBase):
+ """TCP version of StatsClient."""
+
+ def __init__(self, host='localhost', port=8125, prefix=None, timeout=None):
+ """Create a new client."""
+ self._host = host
+ self._port = port
+ self._timeout = timeout
+ self._prefix = prefix
+ self._sock = None
+
+ def _send(self, data):
+ """Send data to statsd."""
+ if not self._sock:
+ self.connect()
+ self._do_send(data)
+
+ def _do_send(self, data):
+ self._sock.sendall(data.encode('ascii') + b'\n')
+
+ def close(self):
+ if self._sock and hasattr(self._sock, 'close'):
+ self._sock.close()
+ self._sock = None
+
+ def connect(self):
+ family, _, _, _, addr = socket.getaddrinfo(
+ self._host, self._port, 0, socket.SOCK_STREAM)[0]
+ self._sock = socket.socket(family, socket.SOCK_STREAM)
+ self._sock.settimeout(self._timeout)
+ self._sock.connect(addr)
+
+ def pipeline(self):
+ return TCPPipeline(self)
+
+ def reconnect(self, data):
+ self.close()
+ self.connect()
+
+
+class PipelineBase(StatsClientBase):
+
+ __metaclass__ = abc.ABCMeta
-class Pipeline(StatsClient):
def __init__(self, client):
self._client = client
self._prefix = client._prefix
- self._maxudpsize = client._maxudpsize
self._stats = deque()
+ @abc.abstractmethod
+ def _send(self):
+ pass
+
def _after(self, data):
if data is not None:
self._stats.append(data)
@@ -157,11 +218,24 @@ class Pipeline(StatsClient):
self.send()
def send(self):
- # Use popleft to preserve the order of the stats.
if not self._stats:
return
+ self._send()
+
+ def pipeline(self):
+ return self.__class__(self)
+
+
+class Pipeline(PipelineBase):
+
+ def __init__(self, client):
+ super(Pipeline, self).__init__(client)
+ self._maxudpsize = client._maxudpsize
+
+ def _send(self):
data = self._stats.popleft()
while self._stats:
+ # Use popleft to preserve the order of the stats.
stat = self._stats.popleft()
if len(stat) + len(data) + 1 >= self._maxudpsize:
self._client._after(data)
@@ -169,3 +243,10 @@ class Pipeline(StatsClient):
else:
data += '\n' + stat
self._client._after(data)
+
+
+class TCPPipeline(PipelineBase):
+
+ def _send(self):
+ self._client._after('\n'.join(self._stats))
+ self._stats.clear()
diff --git a/statsd/tests.py b/statsd/tests.py
index 41f487c..a6a525c 100644
--- a/statsd/tests.py
+++ b/statsd/tests.py
@@ -7,22 +7,64 @@ import mock
from nose.tools import eq_
from statsd import StatsClient
+from statsd import TCPStatsClient
ADDR = (socket.gethostbyname('localhost'), 8125)
-def _client(prefix=None):
- sc = StatsClient(host=ADDR[0], port=ADDR[1], prefix=prefix)
+# proto specific methods to get the socket method to send data
+send_method = {
+ 'udp': lambda x: x.sendto,
+ 'tcp': lambda x: x.sendall,
+}
+
+
+# proto specific methods to create the expected value
+make_val = {
+ 'udp': lambda x, addr: mock.call(str.encode(x), addr),
+ 'tcp': lambda x, addr: mock.call(str.encode(x + '\n')),
+}
+
+
+def _udp_client(prefix=None, addr=None, port=None):
+ if not addr:
+ addr = ADDR[0]
+ if not port:
+ port = ADDR[1]
+ sc = StatsClient(host=addr, port=port, prefix=prefix)
+ sc._sock = mock.Mock()
+ return sc
+
+
+def _tcp_client(prefix=None, addr=None, port=None, timeout=None):
+ if not addr:
+ addr = ADDR[0]
+ if not port:
+ port = ADDR[1]
+ sc = TCPStatsClient(host=addr, port=port, prefix=prefix, timeout=timeout)
sc._sock = mock.Mock()
return sc
-def _sock_check(cl, count, val=None):
- eq_(cl._sock.sendto.call_count, count)
+def _timer_check(sock, count, proto, start, end):
+ send = send_method[proto](sock)
+ eq_(send.call_count, count)
+ value = send.call_args[0][0].decode('ascii')
+ exp = re.compile('^%s:\d+|%s$' % (start, end))
+ assert exp.match(value)
+
+
+def _sock_check(sock, count, proto, val=None, addr=None):
+ send = send_method[proto](sock)
+ eq_(send.call_count, count)
+ if not addr:
+ addr = ADDR
if val is not None:
- val = val.encode('ascii')
- eq_(cl._sock.sendto.call_args, ((val, ADDR), {}))
+ eq_(
+ send.call_args,
+ make_val[proto](val, addr),
+ )
class assert_raises(object):
@@ -82,54 +124,108 @@ class assert_raises(object):
return True
+def _test_incr(cl, proto):
+ cl.incr('foo')
+ _sock_check(cl._sock, 1, proto, val='foo:1|c')
+
+ cl.incr('foo', 10)
+ _sock_check(cl._sock, 2, proto, val='foo:10|c')
+
+ cl.incr('foo', 1.2)
+ _sock_check(cl._sock, 3, proto, val='foo:1.2|c')
+
+ cl.incr('foo', 10, rate=0.5)
+ _sock_check(cl._sock, 4, proto, val='foo:10|c|@0.5')
+
+
@mock.patch.object(random, 'random', lambda: -1)
-def test_incr():
- sc = _client()
+def test_incr_udp():
+ """StatsClient.incr works."""
+ cl = _udp_client()
+ _test_incr(cl, 'udp')
+
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_incr_tcp():
+ """TCPStatsClient.incr works."""
+ cl = _tcp_client()
+ _test_incr(cl, 'tcp')
+
+
+def _test_decr(cl, proto):
+ cl.decr('foo')
+ _sock_check(cl._sock, 1, proto, 'foo:-1|c')
+
+ cl.decr('foo', 10)
+ _sock_check(cl._sock, 2, proto, 'foo:-10|c')
- sc.incr('foo')
- _sock_check(sc, 1, 'foo:1|c')
+ cl.decr('foo', 1.2)
+ _sock_check(cl._sock, 3, proto, 'foo:-1.2|c')
- sc.incr('foo', 10)
- _sock_check(sc, 2, 'foo:10|c')
+ cl.decr('foo', 1, rate=0.5)
+ _sock_check(cl._sock, 4, proto, 'foo:-1|c|@0.5')
- sc.incr('foo', 1.2)
- _sock_check(sc, 3, 'foo:1.2|c')
- sc.incr('foo', 10, rate=0.5)
- _sock_check(sc, 4, 'foo:10|c|@0.5')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_decr_udp():
+ """StatsClient.decr works."""
+ cl = _udp_client()
+ _test_decr(cl, 'udp')
@mock.patch.object(random, 'random', lambda: -1)
-def test_decr():
- sc = _client()
+def test_decr_tcp():
+ """TCPStatsClient.decr works."""
+ cl = _tcp_client()
+ _test_decr(cl, 'tcp')
+
+
+def _test_gauge(cl, proto):
+ cl.gauge('foo', 30)
+ _sock_check(cl._sock, 1, proto, 'foo:30|g')
- sc.decr('foo')
- _sock_check(sc, 1, 'foo:-1|c')
+ cl.gauge('foo', 1.2)
+ _sock_check(cl._sock, 2, proto, 'foo:1.2|g')
- sc.decr('foo', 10)
- _sock_check(sc, 2, 'foo:-10|c')
+ cl.gauge('foo', 70, rate=0.5)
+ _sock_check(cl._sock, 3, proto, 'foo:70|g|@0.5')
- sc.decr('foo', 1.2)
- _sock_check(sc, 3, 'foo:-1.2|c')
- sc.decr('foo', 1, rate=0.5)
- _sock_check(sc, 4, 'foo:-1|c|@0.5')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_gauge_udp():
+ """StatsClient.gauge works."""
+ cl = _udp_client()
+ _test_gauge(cl, 'udp')
@mock.patch.object(random, 'random', lambda: -1)
-def test_gauge():
- sc = _client()
- sc.gauge('foo', 30)
- _sock_check(sc, 1, 'foo:30|g')
+def test_gauge_tcp():
+ """TCPStatsClient.gauge works."""
+ cl = _tcp_client()
+ _test_gauge(cl, 'tcp')
+
+
+def _test_ipv6(cl, proto, addr):
+ cl.gauge('foo', 30)
+ _sock_check(cl._sock, 1, proto, 'foo:30|g', addr=addr)
- sc.gauge('foo', 1.2)
- _sock_check(sc, 2, 'foo:1.2|g')
- sc.gauge('foo', 70, rate=0.5)
- _sock_check(sc, 3, 'foo:70|g|@0.5')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_ipv6_udp():
+ """StatsClient can use to IPv6 address."""
+ addr = ('::1', 8125, 0, 0)
+ cl = _udp_client(addr=addr[0])
+ _test_ipv6(cl, 'udp', addr)
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_ipv6_tcp():
+ """TCPStatsClient can use to IPv6 address."""
+ addr = ('::1', 8125, 0, 0)
+ cl = _tcp_client(addr=addr[0])
+ _test_ipv6(cl, 'tcp', addr)
-def test_gauge_delta():
+def _test_gauge_delta(cl, proto):
tests = (
(12, '+12'),
(-13, '-13'),
@@ -138,65 +234,126 @@ def test_gauge_delta():
)
def _check(num, result):
- sc = _client()
- sc.gauge('foo', num, delta=True)
- _sock_check(sc, 1, 'foo:%s|g' % result)
+ cl._sock.reset_mock()
+ cl.gauge('foo', num, delta=True)
+ _sock_check(cl._sock, 1, proto, 'foo:%s|g' % result)
for num, result in tests:
- yield _check, num, result
+ _check(num, result)
-def test_gauge_absolute_negative():
- sc = _client()
- sc.gauge('foo', -5, delta=False)
- _sock_check(sc, 1, 'foo:0|g\nfoo:-5|g')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_gauge_delta_udp():
+ """StatsClient.gauge works with delta values."""
+ cl = _udp_client()
+ _test_gauge_delta(cl, 'udp')
-@mock.patch.object(random, 'random')
-def test_gauge_absolute_negative_rate(mock_random):
- sc = _client()
+@mock.patch.object(random, 'random', lambda: -1)
+def test_gauge_delta_tcp():
+ """TCPStatsClient.gauge works with delta values."""
+ cl = _tcp_client()
+ _test_gauge_delta(cl, 'tcp')
+
+
+def _test_gauge_absolute_negative(cl, proto):
+ cl.gauge('foo', -5, delta=False)
+ _sock_check(cl._sock, 1, 'foo:0|g\nfoo:-5|g')
+
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_gauge_absolute_negative_udp():
+ """StatsClient.gauge works with absolute negative value."""
+ cl = _udp_client()
+ _test_gauge_delta(cl, 'udp')
+
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_gauge_absolute_negative_tcp():
+ """TCPStatsClient.gauge works with absolute negative value."""
+ cl = _tcp_client()
+ _test_gauge_delta(cl, 'tcp')
+
+
+def _test_gauge_absolute_negative_rate(cl, proto, mock_random):
mock_random.return_value = -1
- sc.gauge('foo', -1, rate=0.5, delta=False)
- _sock_check(sc, 1, 'foo:0|g\nfoo:-1|g')
+ cl.gauge('foo', -1, rate=0.5, delta=False)
+ _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g')
mock_random.return_value = 2
- sc.gauge('foo', -2, rate=0.5, delta=False)
- _sock_check(sc, 1, 'foo:0|g\nfoo:-1|g') # Should not have changed.
+ cl.gauge('foo', -2, rate=0.5, delta=False)
+ # Should not have changed.
+ _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g')
-@mock.patch.object(random, 'random', lambda: -1)
-def test_set():
- sc = _client()
- sc.set('foo', 10)
- _sock_check(sc, 1, 'foo:10|s')
+@mock.patch.object(random, 'random')
+def test_gauge_absolute_negative_rate_udp(mock_random):
+ """StatsClient.gauge works with absolute negative value and rate."""
+ cl = _udp_client()
+ _test_gauge_absolute_negative_rate(cl, 'udp', mock_random)
+
+
+@mock.patch.object(random, 'random')
+def test_gauge_absolute_negative_rate_tcp(mock_random):
+ """TCPStatsClient.gauge works with absolute negative value and rate."""
+ cl = _tcp_client()
+ _test_gauge_absolute_negative_rate(cl, 'tcp', mock_random)
+
+
+def _test_set(cl, proto):
+ cl.set('foo', 10)
+ _sock_check(cl._sock, 1, proto, 'foo:10|s')
- sc.set('foo', 2.3)
- _sock_check(sc, 2, 'foo:2.3|s')
+ cl.set('foo', 2.3)
+ _sock_check(cl._sock, 2, proto, 'foo:2.3|s')
- sc.set('foo', 'bar')
- _sock_check(sc, 3, 'foo:bar|s')
+ cl.set('foo', 'bar')
+ _sock_check(cl._sock, 3, proto, 'foo:bar|s')
- sc.set('foo', 2.3, 0.5)
- _sock_check(sc, 4, 'foo:2.3|s|@0.5')
+ cl.set('foo', 2.3, 0.5)
+ _sock_check(cl._sock, 4, proto, 'foo:2.3|s|@0.5')
@mock.patch.object(random, 'random', lambda: -1)
-def test_timing():
- sc = _client()
+def test_set_udp():
+ """StatsClient.set works."""
+ cl = _udp_client()
+ _test_set(cl, 'udp')
- sc.timing('foo', 100)
- _sock_check(sc, 1, 'foo:100|ms')
- sc.timing('foo', 350)
- _sock_check(sc, 2, 'foo:350|ms')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_set_tcp():
+ """TCPStatsClient.set works."""
+ cl = _tcp_client()
+ _test_set(cl, 'tcp')
+
+
+def _test_timing(cl, proto):
+ cl.timing('foo', 100)
+ _sock_check(cl._sock, 1, proto, 'foo:100|ms')
- sc.timing('foo', 100, rate=0.5)
- _sock_check(sc, 3, 'foo:100|ms|@0.5')
+ cl.timing('foo', 350)
+ _sock_check(cl._sock, 2, proto, 'foo:350|ms')
+ cl.timing('foo', 100, rate=0.5)
+ _sock_check(cl._sock, 3, proto, 'foo:100|ms|@0.5')
+
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timing_udp():
+ """StatsClient.timing works."""
+ cl = _udp_client()
+ _test_timing(cl, 'udp')
-def test_prepare():
- sc = _client(None)
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timing_tcp():
+ """TCPStatsClient.timing works."""
+ cl = _tcp_client()
+ _test_timing(cl, 'tcp')
+
+
+def _test_prepare(cl, proto):
tests = (
('foo:1|c', ('foo', '1|c', 1)),
('bar:50|ms|@0.5', ('bar', '50|ms', 0.5)),
@@ -205,268 +362,490 @@ def test_prepare():
def _check(o, s, v, r):
with mock.patch.object(random, 'random', lambda: -1):
- eq_(o, sc._prepare(s, v, r))
+ eq_(o, cl._prepare(s, v, r))
for o, (s, v, r) in tests:
- yield _check, o, s, v, r
+ _check(o, s, v, r)
-def test_prefix():
- sc = _client('foo')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_prepare_udp():
+ """Test StatsClient._prepare method."""
+ cl = _udp_client()
+ _test_prepare(cl, 'udp')
- sc.incr('bar')
- _sock_check(sc, 1, 'foo.bar:1|c')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_prepare_tcp():
+ """Test TCPStatsClient._prepare method."""
+ cl = _tcp_client()
+ _test_prepare(cl, 'tcp')
-def _timer_check(cl, count, start, end):
- eq_(cl._sock.sendto.call_count, count)
- value = cl._sock.sendto.call_args[0][0].decode('ascii')
- exp = re.compile('^%s:\d+|%s$' % (start, end))
- assert exp.match(value)
+
+def _test_prefix(cl, proto):
+ cl.incr('bar')
+ _sock_check(cl._sock, 1, proto, 'foo.bar:1|c')
-def test_timer_manager():
- """StatsClient.timer is a context manager."""
- sc = _client()
+@mock.patch.object(random, 'random', lambda: -1)
+def test_prefix_udp():
+ """StatsClient.incr works."""
+ cl = _udp_client(prefix='foo')
+ _test_prefix(cl, 'udp')
- with sc.timer('foo'):
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_prefix_tcp():
+ """TCPStatsClient.incr works."""
+ cl = _tcp_client(prefix='foo')
+ _test_prefix(cl, 'tcp')
+
+
+def _test_timer_manager(cl, proto):
+ with cl.timer('foo'):
pass
- _timer_check(sc, 1, 'foo', 'ms')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
+
+def test_timer_manager_udp():
+ """StatsClient.timer can be used as manager."""
+ cl = _udp_client()
+ _test_timer_manager(cl, 'udp')
-def test_timer_decorator():
- """StatsClient.timer is a thread-safe decorator."""
- sc = _client()
- @sc.timer('foo')
+def test_timer_manager_tcp():
+ """TCPStatsClient.timer can be used as manager."""
+ cl = _tcp_client()
+ _test_timer_manager(cl, 'tcp')
+
+
+def _test_timer_decorator(cl, proto):
+ @cl.timer('foo')
def foo(a, b):
return [a, b]
- @sc.timer('bar')
+ @cl.timer('bar')
def bar(a, b):
return [b, a]
- # make sure it works with more than one decorator, called multiple times,
- # and that parameters are handled correctly
+ # make sure it works with more than one decorator, called multiple
+ # times, and that parameters are handled correctly
eq_([4, 2], foo(4, 2))
- _timer_check(sc, 1, 'foo', 'ms')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
eq_([2, 4], bar(4, 2))
- _timer_check(sc, 2, 'bar', 'ms')
+ _timer_check(cl._sock, 2, proto, 'bar', 'ms')
eq_([6, 5], bar(5, 6))
- _timer_check(sc, 3, 'bar', 'ms')
+ _timer_check(cl._sock, 3, proto, 'bar', 'ms')
+
+def test_timer_decorator_udp():
+ """StatsClient.timer is a thread-safe decorator (UDP)."""
+ cl = _udp_client()
+ _test_timer_decorator(cl, 'udp')
-def test_timer_capture():
- """You can capture the output of StatsClient.timer."""
- sc = _client()
- with sc.timer('woo') as result:
+
+def test_timer_decorator_tcp():
+ """StatsClient.timer is a thread-safe decorator (TCP)."""
+ cl = _tcp_client()
+ _test_timer_decorator(cl, 'tcp')
+
+
+def _test_timer_capture(cl, proto):
+ with cl.timer('woo') as result:
eq_(result.ms, None)
assert isinstance(result.ms, int)
-@mock.patch.object(random, 'random', lambda: -1)
-def test_timer_context_rate():
- sc = _client()
+def test_timer_capture_udp():
+ """You can capture the output of StatsClient.timer (UDP)."""
+ cl = _udp_client()
+ _test_timer_capture(cl, 'udp')
+
- with sc.timer('foo', rate=0.5):
+def test_timer_capture_tcp():
+ """You can capture the output of StatsClient.timer (TCP)."""
+ cl = _tcp_client()
+ _test_timer_capture(cl, 'tcp')
+
+
+def _test_timer_context_rate(cl, proto):
+ with cl.timer('foo', rate=0.5):
pass
- _timer_check(sc, 1, 'foo', 'ms|@0.5')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.5')
+
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timer_context_rate_udp():
+ """StatsClient.timer can be used as manager with rate."""
+ cl = _udp_client()
+ _test_timer_context_rate(cl, 'udp')
@mock.patch.object(random, 'random', lambda: -1)
-def test_timer_decorator_rate():
- sc = _client()
+def test_timer_context_rate_tcp():
+ """TCPStatsClient.timer can be used as manager with rate."""
+ cl = _tcp_client()
+ _test_timer_context_rate(cl, 'tcp')
+
- @sc.timer('foo', rate=0.1)
+def _test_timer_decorator_rate(cl, proto):
+ @cl.timer('foo', rate=0.1)
def foo(a, b):
return [b, a]
- @sc.timer('bar', rate=0.2)
+ @cl.timer('bar', rate=0.2)
def bar(a, b=2, c=3):
return [c, b, a]
eq_([2, 4], foo(4, 2))
- _timer_check(sc, 1, 'foo', 'ms|@0.1')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.1')
eq_([3, 2, 5], bar(5))
- _timer_check(sc, 2, 'bar', 'ms|@0.2')
+ _timer_check(cl._sock, 2, proto, 'bar', 'ms|@0.2')
-def test_timer_context_exceptions():
- """Exceptions within a managed block should get logged and propagate."""
- sc = _client()
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timer_decorator_rate_udp():
+ """StatsClient.timer can be used as decorator with rate."""
+ cl = _udp_client()
+ _test_timer_decorator_rate(cl, 'udp')
+
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timer_decorator_rate_tcp():
+ """TCPStatsClient.timer can be used as decorator with rate."""
+ cl = _tcp_client()
+ _test_timer_decorator_rate(cl, 'tcp')
+
+def _test_timer_context_exceptions(cl, proto):
with assert_raises(socket.timeout):
- with sc.timer('foo'):
+ with cl.timer('foo'):
raise socket.timeout()
- _timer_check(sc, 1, 'foo', 'ms')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
-def test_timer_decorator_exceptions():
- """Exceptions from wrapped methods should get logged and propagate."""
- sc = _client()
+def test_timer_context_exceptions_udp():
+ """Exceptions within a managed block should get logged and propagate (UDP)."""
+ cl = _udp_client()
+ _test_timer_context_exceptions(cl, 'udp')
- @sc.timer('foo')
+
+def test_timer_context_exceptions_tcp():
+ """Exceptions within a managed block should get logged and propagate (TCP)."""
+ cl = _tcp_client()
+ _test_timer_context_exceptions(cl, 'tcp')
+
+
+def _test_timer_decorator_exceptions(cl, proto):
+ @cl.timer('foo')
def foo():
raise ValueError()
with assert_raises(ValueError):
foo()
- _timer_check(sc, 1, 'foo', 'ms')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
+
+
+def test_timer_decorator_exceptions_udp():
+ """Exceptions from wrapped methods should get logged and propagate (UDP)."""
+ cl = _udp_client()
+ _test_timer_decorator_exceptions(cl, 'udp')
+
+def test_timer_decorator_exceptions_tcp():
+ """Exceptions from wrapped methods should get logged and propagate (TCP)."""
+ cl = _tcp_client()
+ _test_timer_decorator_exceptions(cl, 'tcp')
-def test_timer_object():
- sc = _client()
- t = sc.timer('foo').start()
+def _test_timer_object(cl, proto):
+ t = cl.timer('foo').start()
t.stop()
- _timer_check(sc, 1, 'foo', 'ms')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
-def test_timer_object_no_send():
- sc = _client()
+def test_timer_object_udp():
+ """StatsClient.timer works."""
+ cl = _udp_client()
+ _test_timer_object(cl, 'udp')
- t = sc.timer('foo').start()
+
+def test_timer_object_tcp():
+ """TCPStatsClient.timer works."""
+ cl = _tcp_client()
+ _test_timer_object(cl, 'tcp')
+
+
+def _test_timer_object_no_send(cl, proto):
+ t = cl.timer('foo').start()
t.stop(send=False)
- _sock_check(sc, 0)
+ _sock_check(cl._sock, 0, proto)
t.send()
- _timer_check(sc, 1, 'foo', 'ms')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
-@mock.patch.object(random, 'random', lambda: -1)
-def test_timer_object_rate():
- sc = _client()
+def test_timer_object_no_send_udp():
+ """Stop StatsClient.timer without sending."""
+ cl = _udp_client()
+ _test_timer_object_no_send(cl, 'udp')
+
+
+def test_timer_object_no_send_tcp():
+ """Stop TCPStatsClient.timer without sending."""
+ cl = _tcp_client()
+ _test_timer_object_no_send(cl, 'tcp')
- t = sc.timer('foo', rate=0.5)
+
+def _test_timer_object_rate(cl, proto):
+ t = cl.timer('foo', rate=0.5)
t.start()
t.stop()
- _timer_check(sc, 1, 'foo', 'ms@0.5')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms@0.5')
+
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timer_object_rate_udp():
+ """StatsClient.timer works with rate."""
+ cl = _udp_client()
+ _test_timer_object_rate(cl, 'udp')
+
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timer_object_rate_tcp():
+ """TCPStatsClient.timer works with rate."""
+ cl = _tcp_client()
+ _test_timer_object_rate(cl, 'tcp')
-def test_timer_object_no_send_twice():
- sc = _client()
- t = sc.timer('foo').start()
+def _test_timer_object_no_send_twice(cl):
+ t = cl.timer('foo').start()
t.stop()
with assert_raises(RuntimeError):
t.send()
-def test_timer_send_without_stop():
- sc = _client()
- with sc.timer('foo') as t:
+def test_timer_object_no_send_twice_udp():
+ """StatsClient.timer raises RuntimeError if send is called twice."""
+ cl = _udp_client()
+ _test_timer_object_no_send_twice(cl)
+
+
+def test_timer_object_no_send_twice_tcp():
+ """TCPStatsClient.timer raises RuntimeError if send is called twice."""
+ cl = _tcp_client()
+ _test_timer_object_no_send_twice(cl)
+
+
+def _test_timer_send_without_stop(cl):
+ with cl.timer('foo') as t:
assert t.ms is None
with assert_raises(RuntimeError):
t.send()
- t = sc.timer('bar').start()
+ t = cl.timer('bar').start()
assert t.ms is None
with assert_raises(RuntimeError):
t.send()
-def test_timer_object_stop_without_start():
- sc = _client()
+def test_timer_send_without_stop_udp():
+ """StatsClient.timer raises error if send is called before stop."""
+ cl = _udp_client()
+ _test_timer_send_without_stop(cl)
+
+
+def test_timer_send_without_stop_tcp():
+ """TCPStatsClient.timer raises error if send is called before stop."""
+ cl = _tcp_client()
+ _test_timer_send_without_stop(cl)
+
+
+def _test_timer_object_stop_without_start(cl):
with assert_raises(RuntimeError):
- sc.timer('foo').stop()
+ cl.timer('foo').stop()
-def test_pipeline():
- sc = _client()
- pipe = sc.pipeline()
+def test_timer_object_stop_without_start_udp():
+ """StatsClient.timer raises error if stop is called before start."""
+ cl = _udp_client()
+ _test_timer_object_stop_without_start(cl)
+
+
+def test_timer_object_stop_without_start_tcp():
+ """TCPStatsClient.timer raises error if stop is called before start."""
+ cl = _tcp_client()
+ _test_timer_object_stop_without_start(cl)
+
+
+def _test_pipeline(cl, proto):
+ pipe = cl.pipeline()
pipe.incr('foo')
pipe.decr('bar')
pipe.timing('baz', 320)
pipe.send()
- _sock_check(sc, 1, 'foo:1|c\nbar:-1|c\nbaz:320|ms')
+ _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:320|ms')
-def test_pipeline_null():
- """Ensure we don't error on an empty pipeline."""
- sc = _client()
- pipe = sc.pipeline()
+def test_pipeline_udp():
+ """StatsClient.pipeline works."""
+ cl = _udp_client()
+ _test_pipeline(cl, 'udp')
+
+
+def test_pipeline_tcp():
+ """TCPStatsClient.pipeline works."""
+ cl = _tcp_client()
+ _test_pipeline(cl, 'tcp')
+
+
+def _test_pipeline_null(cl, proto):
+ pipe = cl.pipeline()
pipe.send()
- _sock_check(sc, 0)
+ _sock_check(cl._sock, 0, proto)
+
+
+def test_pipeline_null_udp():
+ """Ensure we don't error on an empty pipeline (UDP)."""
+ cl = _udp_client()
+ _test_pipeline_null(cl, 'udp')
-def test_pipeline_manager():
- sc = _client()
- with sc.pipeline() as pipe:
+def test_pipeline_null_tcp():
+ """Ensure we don't error on an empty pipeline (TCP)."""
+ cl = _tcp_client()
+ _test_pipeline_null(cl, 'tcp')
+
+
+def _test_pipeline_manager(cl, proto):
+ with cl.pipeline() as pipe:
pipe.incr('foo')
pipe.decr('bar')
pipe.gauge('baz', 15)
- _sock_check(sc, 1, 'foo:1|c\nbar:-1|c\nbaz:15|g')
+ _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:15|g')
+
+def test_pipeline_manager_udp():
+ """StatsClient.pipeline can be used as manager."""
+ cl = _udp_client()
+ _test_pipeline_manager(cl, 'udp')
-def test_pipeline_timer_manager():
- sc = _client()
- with sc.pipeline() as pipe:
+
+def test_pipeline_manager_tcp():
+ """TCPStatsClient.pipeline can be used as manager."""
+ cl = _tcp_client()
+ _test_pipeline_manager(cl, 'tcp')
+
+
+def _test_pipeline_timer_manager(cl, proto):
+ with cl.pipeline() as pipe:
with pipe.timer('foo'):
pass
- _timer_check(sc, 1, 'foo', 'ms')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
+
+
+def test_pipeline_timer_manager_udp():
+ """Timer manager can be retrieve from UDP Pipeline manager."""
+ cl = _udp_client()
+ _test_pipeline_timer_manager(cl, 'udp')
-def test_pipeline_timer_decorator():
- sc = _client()
- with sc.pipeline() as pipe:
+def test_pipeline_timer_manager_tcp():
+ """Timer manager can be retrieve from TCP Pipeline manager."""
+ cl = _tcp_client()
+ _test_pipeline_timer_manager(cl, 'tcp')
+
+
+def _test_pipeline_timer_decorator(cl, proto):
+ with cl.pipeline() as pipe:
@pipe.timer('foo')
def foo():
pass
foo()
- _timer_check(sc, 1, 'foo', 'ms')
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
+
+
+def test_pipeline_timer_decorator_udp():
+ """UDP Pipeline manager can be used as decorator."""
+ cl = _udp_client()
+ _test_pipeline_timer_decorator(cl, 'udp')
+
+
+def test_pipeline_timer_decorator_tcp():
+ """TCP Pipeline manager can be used as decorator."""
+ cl = _tcp_client()
+ _test_pipeline_timer_decorator(cl, 'tcp')
-def test_pipeline_timer_object():
- sc = _client()
- with sc.pipeline() as pipe:
+def _test_pipeline_timer_object(cl, proto):
+ with cl.pipeline() as pipe:
t = pipe.timer('foo').start()
t.stop()
- _sock_check(sc, 0)
- _timer_check(sc, 1, 'foo', 'ms')
+ _sock_check(cl._sock, 0, proto)
+ _timer_check(cl._sock, 1, proto, 'foo', 'ms')
-def test_pipeline_empty():
- """Pipelines should be empty after a send() call."""
- sc = _client()
- with sc.pipeline() as pipe:
+def test_pipeline_timer_object_udp():
+ """Timer from UDP Pipeline manager works."""
+ cl = _udp_client()
+ _test_pipeline_timer_object(cl, 'udp')
+
+
+def test_pipeline_timer_object_tcp():
+ """Timer from TCP Pipeline manager works."""
+ cl = _tcp_client()
+ _test_pipeline_timer_object(cl, 'tcp')
+
+
+def _test_pipeline_empty(cl):
+ with cl.pipeline() as pipe:
pipe.incr('foo')
eq_(1, len(pipe._stats))
eq_(0, len(pipe._stats))
-def test_pipeline_packet_size():
- """Pipelines shouldn't send packets larger than 512 bytes."""
- sc = _client()
- pipe = sc.pipeline()
- for x in range(32):
- # 32 * 16 = 512, so this will need 2 packets.
- pipe.incr('sixteen_char_str')
- pipe.send()
- eq_(2, sc._sock.sendto.call_count)
- assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512
- assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512
+def test_pipeline_empty_udp():
+ """Pipelines should be empty after a send() call (UDP)."""
+ cl = _udp_client()
+ _test_pipeline_empty(cl)
+
+
+def test_pipeline_empty_tcp():
+ """Pipelines should be empty after a send() call (TCP)."""
+ cl = _tcp_client()
+ _test_pipeline_empty(cl)
-def test_pipeline_negative_absolute_gauge():
- """Negative absolute gauges use an internal pipeline."""
- sc = _client()
- with sc.pipeline() as pipe:
+def _test_pipeline_negative_absolute_gauge(cl, proto):
+ with cl.pipeline() as pipe:
pipe.gauge('foo', -10, delta=False)
pipe.incr('bar')
- _sock_check(sc, 1, 'foo:0|g\nfoo:-10|g\nbar:1|c')
+ _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-10|g\nbar:1|c')
+
+
+def test_pipeline_negative_absolute_gauge_udp():
+ """Negative absolute gauges use an internal pipeline (UDP)."""
+ cl = _udp_client()
+ _test_pipeline_negative_absolute_gauge(cl, 'udp')
-def test_big_numbers():
+def test_pipeline_negative_absolute_gauge_tcp():
+ """Negative absolute gauges use an internal pipeline (TCP)."""
+ cl = _tcp_client()
+ _test_pipeline_negative_absolute_gauge(cl, 'tcp')
+
+
+def _test_big_numbers(cl, proto):
num = 1234568901234
result = 'foo:1234568901234|%s'
tests = (
@@ -477,27 +856,83 @@ def test_big_numbers():
)
def _check(method, suffix):
- sc = _client()
- getattr(sc, method)('foo', num)
- _sock_check(sc, 1, result % suffix)
+ cl._sock.reset_mock()
+ getattr(cl, method)('foo', num)
+ _sock_check(cl._sock, 1, proto, result % suffix)
for method, suffix in tests:
- yield _check, method, suffix
+ _check(method, suffix)
+
+
+def test_big_numbers_udp():
+ """Test big numbers with UDP client."""
+ cl = _udp_client()
+ _test_big_numbers(cl, 'udp')
+
+
+def test_big_numbers_tcp():
+ """Test big numbers with TCP client."""
+ cl = _tcp_client()
+ _test_big_numbers(cl, 'tcp')
+
+
+def _test_rate_no_send(cl, proto):
+ cl.incr('foo', rate=0.5)
+ _sock_check(cl._sock, 0, proto)
+
+
+@mock.patch.object(random, 'random', lambda: 2)
+def test_rate_no_send_udp():
+ """Rate below random value prevents sending with StatsClient.incr."""
+ cl = _udp_client()
+ _test_rate_no_send(cl, 'udp')
@mock.patch.object(random, 'random', lambda: 2)
-def test_rate_no_send():
- sc = _client()
- sc.incr('foo', rate=0.5)
- _sock_check(sc, 0)
+def test_rate_no_send_tcp():
+ """Rate below random value prevents sending with TCPStatsClient.incr."""
+ cl = _tcp_client()
+ _test_rate_no_send(cl, 'tcp')
def test_socket_error():
- sc = _client()
- sc._sock.sendto.side_effect = socket.timeout()
- sc.incr('foo')
- _sock_check(sc, 1, 'foo:1|c')
+ """Socket error on StatsClient should be ignored."""
+ cl = _udp_client()
+ cl._sock.sendto.side_effect = socket.timeout()
+ cl.incr('foo')
+ _sock_check(cl._sock, 1, 'udp', 'foo:1|c')
+
+
+def test_pipeline_packet_size():
+ """Pipelines shouldn't send packets larger than 512 bytes (UDP only)."""
+ sc = _udp_client()
+ pipe = sc.pipeline()
+ for x in range(32):
+ # 32 * 16 = 512, so this will need 2 packets.
+ pipe.incr('sixteen_char_str')
+ pipe.send()
+ eq_(2, sc._sock.sendto.call_count)
+ assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512
+ assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512
-def test_ipv6_host():
- StatsClient('::1')
+@mock.patch.object(socket, 'socket')
+def test_tcp_raises_exception_to_user(mock_socket):
+ """Socket errors in TCPStatsClient should be raised to user."""
+ addr = ('127.0.0.1', 1234)
+ cl = _tcp_client(addr=addr[0], port=addr[1])
+ cl.incr('foo')
+ cl._sock.sendall.assert_called_once()
+ cl._sock.sendall.side_effect = socket.error
+ with assert_raises(socket.error):
+ cl.incr('foo')
+
+
+@mock.patch.object(socket, 'socket')
+def test_tcp_timeout(mock_socket):
+ """Timeout on TCPStatsClient should be set on socket."""
+ test_timeout = 321
+ cl = TCPStatsClient(timeout=test_timeout)
+ cl.incr('foo')
+ cl._sock.settimeout.assert_called_once()
+ cl._sock.settimeout.assert_called_with(test_timeout)