From 6c4f948f06d07ed57fecd6e9e3e7db312ced7b0c Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Tue, 10 Mar 2015 23:41:39 +0900 Subject: refactor StatsClient and Pipeline into base classes --- statsd/client.py | 66 ++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/statsd/client.py b/statsd/client.py index 8bca7f2..3f0da06 100644 --- a/statsd/client.py +++ b/statsd/client.py @@ -4,6 +4,7 @@ from functools import wraps import random import socket import time +import abc __all__ = ['StatsClient'] @@ -63,22 +64,18 @@ class Timer(object): self.client.timing(self.stat, self.ms, self.rate) -class StatsClient(object): - """A client for statsd.""" +class StatsClientBase(object): + """A Base class for various statsd clients.""" - def __init__(self, host='localhost', port=8125, prefix=None, - maxudpsize=512): - """Create a new client.""" - family, _, _, _, addr = socket.getaddrinfo( - host, port, 0, socket.SOCK_DGRAM - )[0] - self._addr = addr - self._sock = socket.socket(family, socket.SOCK_DGRAM) - self._prefix = prefix - self._maxudpsize = maxudpsize + __metaclass__ = abc.ABCMeta + @abc.abstractmethod + def _send(self): + pass + + @abc.abstractmethod def pipeline(self): - return Pipeline(self) + pass def timer(self, stat, rate=1): return Timer(self, stat, rate) @@ -130,6 +127,21 @@ class StatsClient(object): if data: self._send(data) + +class StatsClient(StatsClientBase): + """A client for statsd.""" + + def __init__(self, host='localhost', port=8125, prefix=None, + maxudpsize=512): + """Create a new client.""" + family, _, _, _, addr = socket.getaddrinfo( + host, port, 0, socket.SOCK_DGRAM + )[0] + self._addr = addr + self._sock = socket.socket(family, socket.SOCK_DGRAM) + self._prefix = prefix + self._maxudpsize = maxudpsize + def _send(self, data): """Send data to statsd.""" try: @@ -138,14 +150,23 @@ class StatsClient(object): # No time for love, Dr. Jones! pass + def pipeline(self): + return Pipeline(self) + + +class PipelineBase(StatsClientBase): + + __metaclass__ = abc.ABCMeta -class Pipeline(StatsClient): def __init__(self, client): self._client = client self._prefix = client._prefix - self._maxudpsize = client._maxudpsize self._stats = deque() + @abc.abstractmethod + def _send(self): + pass + def _after(self, data): if data is not None: self._stats.append(data) @@ -157,11 +178,24 @@ class Pipeline(StatsClient): self.send() def send(self): - # Use popleft to preserve the order of the stats. if not self._stats: return + self._send() + + def pipeline(self): + return self.__class__(self) + + +class Pipeline(PipelineBase): + + def __init__(self, client): + super(Pipeline, self).__init__(client) + self._maxudpsize = client._maxudpsize + + def _send(self): data = self._stats.popleft() while self._stats: + # Use popleft to preserve the order of the stats. stat = self._stats.popleft() if len(stat) + len(data) + 1 >= self._maxudpsize: self._client._after(data) -- cgit v1.2.1 From 6857690ae34b87f11572f64c5f93938acff42809 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Mon, 16 Mar 2015 23:31:06 +0900 Subject: Refactor tests to allow for multiple protocols --- statsd/tests.py | 595 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 388 insertions(+), 207 deletions(-) diff --git a/statsd/tests.py b/statsd/tests.py index 41f487c..f3f0a54 100644 --- a/statsd/tests.py +++ b/statsd/tests.py @@ -12,17 +12,46 @@ from statsd import StatsClient ADDR = (socket.gethostbyname('localhost'), 8125) -def _client(prefix=None): - sc = StatsClient(host=ADDR[0], port=ADDR[1], prefix=prefix) +# proto specific methods to get the socket method to send data +send_method = { + 'udp': lambda x: x.sendto, +} + + +# proto specific methods to create the expected value +make_val = { + 'udp': lambda x, addr: mock.call(str.encode(x), addr), +} + + +def _udp_client(prefix=None, addr=None, port=None): + if not addr: + addr = ADDR[0] + if not port: + port = ADDR[1] + sc = StatsClient(host=addr, port=port, prefix=prefix) sc._sock = mock.Mock() return sc -def _sock_check(cl, count, val=None): - eq_(cl._sock.sendto.call_count, count) +def _timer_check(sock, count, proto, start, end): + send = send_method[proto](sock) + eq_(send.call_count, count) + value = send.call_args[0][0].decode('ascii') + exp = re.compile('^%s:\d+|%s$' % (start, end)) + assert exp.match(value) + + +def _sock_check(sock, count, proto, val=None, addr=None): + send = send_method[proto](sock) + eq_(send.call_count, count) + if not addr: + addr = ADDR if val is not None: - val = val.encode('ascii') - eq_(cl._sock.sendto.call_args, ((val, ADDR), {})) + eq_( + send.call_args, + make_val[proto](val, addr), + ) class assert_raises(object): @@ -82,54 +111,80 @@ class assert_raises(object): return True +def _test_incr(cl, proto): + cl.incr('foo') + _sock_check(cl._sock, 1, proto, val='foo:1|c') + + cl.incr('foo', 10) + _sock_check(cl._sock, 2, proto, val='foo:10|c') + + cl.incr('foo', 1.2) + _sock_check(cl._sock, 3, proto, val='foo:1.2|c') + + cl.incr('foo', 10, rate=0.5) + _sock_check(cl._sock, 4, proto, val='foo:10|c|@0.5') + + @mock.patch.object(random, 'random', lambda: -1) -def test_incr(): - sc = _client() +def test_incr_udp(): + """StatsClient.incr works.""" + cl = _udp_client() + _test_incr(cl, 'udp') - sc.incr('foo') - _sock_check(sc, 1, 'foo:1|c') - sc.incr('foo', 10) - _sock_check(sc, 2, 'foo:10|c') +def _test_decr(cl, proto): + cl.decr('foo') + _sock_check(cl._sock, 1, proto, 'foo:-1|c') - sc.incr('foo', 1.2) - _sock_check(sc, 3, 'foo:1.2|c') + cl.decr('foo', 10) + _sock_check(cl._sock, 2, proto, 'foo:-10|c') - sc.incr('foo', 10, rate=0.5) - _sock_check(sc, 4, 'foo:10|c|@0.5') + cl.decr('foo', 1.2) + _sock_check(cl._sock, 3, proto, 'foo:-1.2|c') + + cl.decr('foo', 1, rate=0.5) + _sock_check(cl._sock, 4, proto, 'foo:-1|c|@0.5') @mock.patch.object(random, 'random', lambda: -1) -def test_decr(): - sc = _client() +def test_decr_udp(): + """StatsClient.decr works.""" + cl = _udp_client() + _test_decr(cl, 'udp') - sc.decr('foo') - _sock_check(sc, 1, 'foo:-1|c') - sc.decr('foo', 10) - _sock_check(sc, 2, 'foo:-10|c') +def _test_gauge(cl, proto): + cl.gauge('foo', 30) + _sock_check(cl._sock, 1, proto, 'foo:30|g') - sc.decr('foo', 1.2) - _sock_check(sc, 3, 'foo:-1.2|c') + cl.gauge('foo', 1.2) + _sock_check(cl._sock, 2, proto, 'foo:1.2|g') - sc.decr('foo', 1, rate=0.5) - _sock_check(sc, 4, 'foo:-1|c|@0.5') + cl.gauge('foo', 70, rate=0.5) + _sock_check(cl._sock, 3, proto, 'foo:70|g|@0.5') @mock.patch.object(random, 'random', lambda: -1) -def test_gauge(): - sc = _client() - sc.gauge('foo', 30) - _sock_check(sc, 1, 'foo:30|g') +def test_gauge_udp(): + """StatsClient.gauge works.""" + cl = _udp_client() + _test_gauge(cl, 'udp') + + +def _test_ipv6(cl, proto, addr): + cl.gauge('foo', 30) + _sock_check(cl._sock, 1, proto, 'foo:30|g', addr=addr) - sc.gauge('foo', 1.2) - _sock_check(sc, 2, 'foo:1.2|g') - sc.gauge('foo', 70, rate=0.5) - _sock_check(sc, 3, 'foo:70|g|@0.5') +@mock.patch.object(random, 'random', lambda: -1) +def test_ipv6_udp(): + """StatsClient can use to IPv6 address.""" + addr = ('::1', 8125, 0, 0) + cl = _udp_client(addr=addr[0]) + _test_ipv6(cl, 'udp', addr) -def test_gauge_delta(): +def _test_gauge_delta(cl, proto): tests = ( (12, '+12'), (-13, '-13'), @@ -138,65 +193,91 @@ def test_gauge_delta(): ) def _check(num, result): - sc = _client() - sc.gauge('foo', num, delta=True) - _sock_check(sc, 1, 'foo:%s|g' % result) + cl._sock.reset_mock() + cl.gauge('foo', num, delta=True) + _sock_check(cl._sock, 1, proto, 'foo:%s|g' % result) for num, result in tests: - yield _check, num, result + _check(num, result) -def test_gauge_absolute_negative(): - sc = _client() - sc.gauge('foo', -5, delta=False) - _sock_check(sc, 1, 'foo:0|g\nfoo:-5|g') +@mock.patch.object(random, 'random', lambda: -1) +def test_gauge_delta_udp(): + """StatsClient.gauge works with delta values.""" + cl = _udp_client() + _test_gauge_delta(cl, 'udp') -@mock.patch.object(random, 'random') -def test_gauge_absolute_negative_rate(mock_random): - sc = _client() +def _test_gauge_absolute_negative(cl, proto): + cl.gauge('foo', -5, delta=False) + _sock_check(cl._sock, 1, 'foo:0|g\nfoo:-5|g') + + +@mock.patch.object(random, 'random', lambda: -1) +def test_gauge_absolute_negative_udp(): + """StatsClient.gauge works with absolute negative value.""" + cl = _udp_client() + _test_gauge_delta(cl, 'udp') + + +def _test_gauge_absolute_negative_rate(cl, proto, mock_random): mock_random.return_value = -1 - sc.gauge('foo', -1, rate=0.5, delta=False) - _sock_check(sc, 1, 'foo:0|g\nfoo:-1|g') + cl.gauge('foo', -1, rate=0.5, delta=False) + _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g') mock_random.return_value = 2 - sc.gauge('foo', -2, rate=0.5, delta=False) - _sock_check(sc, 1, 'foo:0|g\nfoo:-1|g') # Should not have changed. + cl.gauge('foo', -2, rate=0.5, delta=False) + # Should not have changed. + _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g') -@mock.patch.object(random, 'random', lambda: -1) -def test_set(): - sc = _client() - sc.set('foo', 10) - _sock_check(sc, 1, 'foo:10|s') +@mock.patch.object(random, 'random') +def test_gauge_absolute_negative_rate_udp(mock_random): + """StatsClient.gauge works with absolute negative value and rate.""" + cl = _udp_client() + _test_gauge_absolute_negative_rate(cl, 'udp', mock_random) + + +def _test_set(cl, proto): + cl.set('foo', 10) + _sock_check(cl._sock, 1, proto, 'foo:10|s') - sc.set('foo', 2.3) - _sock_check(sc, 2, 'foo:2.3|s') + cl.set('foo', 2.3) + _sock_check(cl._sock, 2, proto, 'foo:2.3|s') - sc.set('foo', 'bar') - _sock_check(sc, 3, 'foo:bar|s') + cl.set('foo', 'bar') + _sock_check(cl._sock, 3, proto, 'foo:bar|s') - sc.set('foo', 2.3, 0.5) - _sock_check(sc, 4, 'foo:2.3|s|@0.5') + cl.set('foo', 2.3, 0.5) + _sock_check(cl._sock, 4, proto, 'foo:2.3|s|@0.5') @mock.patch.object(random, 'random', lambda: -1) -def test_timing(): - sc = _client() +def test_set_udp(): + """StatsClient.set works.""" + cl = _udp_client() + _test_set(cl, 'udp') - sc.timing('foo', 100) - _sock_check(sc, 1, 'foo:100|ms') - sc.timing('foo', 350) - _sock_check(sc, 2, 'foo:350|ms') +def _test_timing(cl, proto): + cl.timing('foo', 100) + _sock_check(cl._sock, 1, proto, 'foo:100|ms') - sc.timing('foo', 100, rate=0.5) - _sock_check(sc, 3, 'foo:100|ms|@0.5') + cl.timing('foo', 350) + _sock_check(cl._sock, 2, proto, 'foo:350|ms') + cl.timing('foo', 100, rate=0.5) + _sock_check(cl._sock, 3, proto, 'foo:100|ms|@0.5') + + +@mock.patch.object(random, 'random', lambda: -1) +def test_timing_udp(): + """StatsClient.timing works.""" + cl = _udp_client() + _test_timing(cl, 'udp') -def test_prepare(): - sc = _client(None) +def _test_prepare(cl, proto): tests = ( ('foo:1|c', ('foo', '1|c', 1)), ('bar:50|ms|@0.5', ('bar', '50|ms', 0.5)), @@ -205,268 +286,347 @@ def test_prepare(): def _check(o, s, v, r): with mock.patch.object(random, 'random', lambda: -1): - eq_(o, sc._prepare(s, v, r)) + eq_(o, cl._prepare(s, v, r)) for o, (s, v, r) in tests: - yield _check, o, s, v, r + _check(o, s, v, r) -def test_prefix(): - sc = _client('foo') +@mock.patch.object(random, 'random', lambda: -1) +def test_prepare_udp(): + """Test StatsClient._prepare method.""" + cl = _udp_client() + _test_prepare(cl, 'udp') - sc.incr('bar') - _sock_check(sc, 1, 'foo.bar:1|c') +def _test_prefix(cl, proto): + cl.incr('bar') + _sock_check(cl._sock, 1, proto, 'foo.bar:1|c') -def _timer_check(cl, count, start, end): - eq_(cl._sock.sendto.call_count, count) - value = cl._sock.sendto.call_args[0][0].decode('ascii') - exp = re.compile('^%s:\d+|%s$' % (start, end)) - assert exp.match(value) +@mock.patch.object(random, 'random', lambda: -1) +def test_prefix_udp(): + """StatsClient.incr works.""" + cl = _udp_client(prefix='foo') + _test_prefix(cl, 'udp') -def test_timer_manager(): - """StatsClient.timer is a context manager.""" - sc = _client() - with sc.timer('foo'): +def _test_timer_manager(cl, proto): + with cl.timer('foo'): pass - _timer_check(sc, 1, 'foo', 'ms') + _timer_check(cl._sock, 1, proto, 'foo', 'ms') + +def test_timer_manager_udp(): + """StatsClient.timer can be used as manager.""" + cl = _udp_client() + _test_timer_manager(cl, 'udp') -def test_timer_decorator(): - """StatsClient.timer is a thread-safe decorator.""" - sc = _client() - @sc.timer('foo') +def _test_timer_decorator(cl, proto): + @cl.timer('foo') def foo(a, b): return [a, b] - @sc.timer('bar') + @cl.timer('bar') def bar(a, b): return [b, a] - # make sure it works with more than one decorator, called multiple times, - # and that parameters are handled correctly + # make sure it works with more than one decorator, called multiple + # times, and that parameters are handled correctly eq_([4, 2], foo(4, 2)) - _timer_check(sc, 1, 'foo', 'ms') + _timer_check(cl._sock, 1, proto, 'foo', 'ms') eq_([2, 4], bar(4, 2)) - _timer_check(sc, 2, 'bar', 'ms') + _timer_check(cl._sock, 2, proto, 'bar', 'ms') eq_([6, 5], bar(5, 6)) - _timer_check(sc, 3, 'bar', 'ms') + _timer_check(cl._sock, 3, proto, 'bar', 'ms') + +def test_timer_decorator_udp(): + """StatsClient.timer is a thread-safe decorator (UDP).""" + cl = _udp_client() + _test_timer_decorator(cl, 'udp') -def test_timer_capture(): - """You can capture the output of StatsClient.timer.""" - sc = _client() - with sc.timer('woo') as result: + +def _test_timer_capture(cl, proto): + with cl.timer('woo') as result: eq_(result.ms, None) assert isinstance(result.ms, int) -@mock.patch.object(random, 'random', lambda: -1) -def test_timer_context_rate(): - sc = _client() +def test_timer_capture_udp(): + """You can capture the output of StatsClient.timer (UDP).""" + cl = _udp_client() + _test_timer_capture(cl, 'udp') + - with sc.timer('foo', rate=0.5): +def _test_timer_context_rate(cl, proto): + with cl.timer('foo', rate=0.5): pass - _timer_check(sc, 1, 'foo', 'ms|@0.5') + _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.5') @mock.patch.object(random, 'random', lambda: -1) -def test_timer_decorator_rate(): - sc = _client() +def test_timer_context_rate_udp(): + """StatsClient.timer can be used as manager with rate.""" + cl = _udp_client() + _test_timer_context_rate(cl, 'udp') - @sc.timer('foo', rate=0.1) + +def _test_timer_decorator_rate(cl, proto): + @cl.timer('foo', rate=0.1) def foo(a, b): return [b, a] - @sc.timer('bar', rate=0.2) + @cl.timer('bar', rate=0.2) def bar(a, b=2, c=3): return [c, b, a] eq_([2, 4], foo(4, 2)) - _timer_check(sc, 1, 'foo', 'ms|@0.1') + _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.1') eq_([3, 2, 5], bar(5)) - _timer_check(sc, 2, 'bar', 'ms|@0.2') + _timer_check(cl._sock, 2, proto, 'bar', 'ms|@0.2') -def test_timer_context_exceptions(): - """Exceptions within a managed block should get logged and propagate.""" - sc = _client() +@mock.patch.object(random, 'random', lambda: -1) +def test_timer_decorator_rate_udp(): + """StatsClient.timer can be used as decorator with rate.""" + cl = _udp_client() + _test_timer_decorator_rate(cl, 'udp') + +def _test_timer_context_exceptions(cl, proto): with assert_raises(socket.timeout): - with sc.timer('foo'): + with cl.timer('foo'): raise socket.timeout() - _timer_check(sc, 1, 'foo', 'ms') + _timer_check(cl._sock, 1, proto, 'foo', 'ms') -def test_timer_decorator_exceptions(): - """Exceptions from wrapped methods should get logged and propagate.""" - sc = _client() +def test_timer_context_exceptions_udp(): + """Exceptions within a managed block should get logged and propagate.""" + cl = _udp_client() + _test_timer_context_exceptions(cl, 'udp') - @sc.timer('foo') + +def _test_timer_decorator_exceptions(cl, proto): + @cl.timer('foo') def foo(): raise ValueError() with assert_raises(ValueError): foo() - _timer_check(sc, 1, 'foo', 'ms') + _timer_check(cl._sock, 1, proto, 'foo', 'ms') + +def test_timer_decorator_exceptions_udp(): + """Exceptions from wrapped methods should get logged and propagate.""" + cl = _udp_client() + _test_timer_decorator_exceptions(cl, 'udp') -def test_timer_object(): - sc = _client() - t = sc.timer('foo').start() +def _test_timer_object(cl, proto): + t = cl.timer('foo').start() t.stop() - _timer_check(sc, 1, 'foo', 'ms') + _timer_check(cl._sock, 1, proto, 'foo', 'ms') -def test_timer_object_no_send(): - sc = _client() +def test_timer_object_udp(): + """StatsClient.timer works.""" + cl = _udp_client() + _test_timer_object(cl, 'udp') - t = sc.timer('foo').start() + +def _test_timer_object_no_send(cl, proto): + t = cl.timer('foo').start() t.stop(send=False) - _sock_check(sc, 0) + _sock_check(cl._sock, 0, proto) t.send() - _timer_check(sc, 1, 'foo', 'ms') + _timer_check(cl._sock, 1, proto, 'foo', 'ms') -@mock.patch.object(random, 'random', lambda: -1) -def test_timer_object_rate(): - sc = _client() +def test_timer_object_no_send_udp(): + """Stop StatsClient.timer without sending.""" + cl = _udp_client() + _test_timer_object_no_send(cl, 'udp') - t = sc.timer('foo', rate=0.5) + +def _test_timer_object_rate(cl, proto): + t = cl.timer('foo', rate=0.5) t.start() t.stop() - _timer_check(sc, 1, 'foo', 'ms@0.5') + _timer_check(cl._sock, 1, proto, 'foo', 'ms@0.5') -def test_timer_object_no_send_twice(): - sc = _client() +@mock.patch.object(random, 'random', lambda: -1) +def test_timer_object_rate_udp(): + """StatsClient.timer works with rate.""" + cl = _udp_client() + _test_timer_object_rate(cl, 'udp') - t = sc.timer('foo').start() + +def _test_timer_object_no_send_twice(cl): + t = cl.timer('foo').start() t.stop() with assert_raises(RuntimeError): t.send() -def test_timer_send_without_stop(): - sc = _client() - with sc.timer('foo') as t: +def test_timer_object_no_send_twice_udp(): + """StatsClient.timer raises RuntimeError if send is called twice.""" + cl = _udp_client() + _test_timer_object_no_send_twice(cl) + + +def _test_timer_send_without_stop(cl): + with cl.timer('foo') as t: assert t.ms is None with assert_raises(RuntimeError): t.send() - t = sc.timer('bar').start() + t = cl.timer('bar').start() assert t.ms is None with assert_raises(RuntimeError): t.send() -def test_timer_object_stop_without_start(): - sc = _client() +def test_timer_send_without_stop_udp(): + """StatsClient.timer raises error if send is called before stop.""" + cl = _udp_client() + _test_timer_send_without_stop(cl) + + +def _test_timer_object_stop_without_start(cl): with assert_raises(RuntimeError): - sc.timer('foo').stop() + cl.timer('foo').stop() -def test_pipeline(): - sc = _client() - pipe = sc.pipeline() +def test_timer_object_stop_without_start_udp(): + """StatsClient.timer raises error if stop is called before start.""" + cl = _udp_client() + _test_timer_object_stop_without_start(cl) + + +def _test_pipeline(cl, proto): + pipe = cl.pipeline() pipe.incr('foo') pipe.decr('bar') pipe.timing('baz', 320) pipe.send() - _sock_check(sc, 1, 'foo:1|c\nbar:-1|c\nbaz:320|ms') + _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:320|ms') -def test_pipeline_null(): - """Ensure we don't error on an empty pipeline.""" - sc = _client() - pipe = sc.pipeline() +def test_pipeline_udp(): + """StatsClient.pipeline works.""" + cl = _udp_client() + _test_pipeline(cl, 'udp') + + +def _test_pipeline_null(cl, proto): + pipe = cl.pipeline() pipe.send() - _sock_check(sc, 0) + _sock_check(cl._sock, 0, proto) + +def test_pipeline_null_udp(): + """Ensure we don't error on an empty pipeline (UDP).""" + cl = _udp_client() + _test_pipeline_null(cl, 'udp') -def test_pipeline_manager(): - sc = _client() - with sc.pipeline() as pipe: + +def _test_pipeline_manager(cl, proto): + with cl.pipeline() as pipe: pipe.incr('foo') pipe.decr('bar') pipe.gauge('baz', 15) - _sock_check(sc, 1, 'foo:1|c\nbar:-1|c\nbaz:15|g') + _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:15|g') + + +def test_pipeline_manager_udp(): + """StatsClient.pipeline can be used as manager.""" + cl = _udp_client() + _test_pipeline_manager(cl, 'udp') -def test_pipeline_timer_manager(): - sc = _client() - with sc.pipeline() as pipe: +def _test_pipeline_timer_manager(cl, proto): + with cl.pipeline() as pipe: with pipe.timer('foo'): pass - _timer_check(sc, 1, 'foo', 'ms') + _timer_check(cl._sock, 1, proto, 'foo', 'ms') -def test_pipeline_timer_decorator(): - sc = _client() - with sc.pipeline() as pipe: +def test_pipeline_timer_manager_udp(): + """Timer manager can be retrieve from UDP Pipeline manager.""" + cl = _udp_client() + _test_pipeline_timer_manager(cl, 'udp') + + +def _test_pipeline_timer_decorator(cl, proto): + with cl.pipeline() as pipe: @pipe.timer('foo') def foo(): pass foo() - _timer_check(sc, 1, 'foo', 'ms') + _timer_check(cl._sock, 1, proto, 'foo', 'ms') + +def test_pipeline_timer_decorator_udp(): + """UDP Pipeline manager can be used as decorator.""" + cl = _udp_client() + _test_pipeline_timer_decorator(cl, 'udp') -def test_pipeline_timer_object(): - sc = _client() - with sc.pipeline() as pipe: + +def _test_pipeline_timer_object(cl, proto): + with cl.pipeline() as pipe: t = pipe.timer('foo').start() t.stop() - _sock_check(sc, 0) - _timer_check(sc, 1, 'foo', 'ms') + _sock_check(cl._sock, 0, proto) + _timer_check(cl._sock, 1, proto, 'foo', 'ms') + + +def test_pipeline_timer_object_udp(): + """Timer from UDP Pipeline manager works.""" + cl = _udp_client() + _test_pipeline_timer_object(cl, 'udp') -def test_pipeline_empty(): - """Pipelines should be empty after a send() call.""" - sc = _client() - with sc.pipeline() as pipe: +def _test_pipeline_empty(cl): + with cl.pipeline() as pipe: pipe.incr('foo') eq_(1, len(pipe._stats)) eq_(0, len(pipe._stats)) -def test_pipeline_packet_size(): - """Pipelines shouldn't send packets larger than 512 bytes.""" - sc = _client() - pipe = sc.pipeline() - for x in range(32): - # 32 * 16 = 512, so this will need 2 packets. - pipe.incr('sixteen_char_str') - pipe.send() - eq_(2, sc._sock.sendto.call_count) - assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512 - assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512 +def test_pipeline_empty_udp(): + """Pipelines should be empty after a send() call (UDP).""" + cl = _udp_client() + _test_pipeline_empty(cl) -def test_pipeline_negative_absolute_gauge(): - """Negative absolute gauges use an internal pipeline.""" - sc = _client() - with sc.pipeline() as pipe: +def _test_pipeline_negative_absolute_gauge(cl, proto): + with cl.pipeline() as pipe: pipe.gauge('foo', -10, delta=False) pipe.incr('bar') - _sock_check(sc, 1, 'foo:0|g\nfoo:-10|g\nbar:1|c') + _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-10|g\nbar:1|c') -def test_big_numbers(): +def test_pipeline_negative_absolute_gauge_udp(): + """Negative absolute gauges use an internal pipeline (UDP).""" + cl = _udp_client() + _test_pipeline_negative_absolute_gauge(cl, 'udp') + + +def _test_big_numbers(cl, proto): num = 1234568901234 result = 'foo:1234568901234|%s' tests = ( @@ -477,27 +637,48 @@ def test_big_numbers(): ) def _check(method, suffix): - sc = _client() - getattr(sc, method)('foo', num) - _sock_check(sc, 1, result % suffix) + cl._sock.reset_mock() + getattr(cl, method)('foo', num) + _sock_check(cl._sock, 1, proto, result % suffix) for method, suffix in tests: - yield _check, method, suffix + _check(method, suffix) + + +def test_big_numbers_udp(): + """Test big numbers with UDP client.""" + cl = _udp_client() + _test_big_numbers(cl, 'udp') + + +def _test_rate_no_send(cl, proto): + cl.incr('foo', rate=0.5) + _sock_check(cl._sock, 0, proto) @mock.patch.object(random, 'random', lambda: 2) -def test_rate_no_send(): - sc = _client() - sc.incr('foo', rate=0.5) - _sock_check(sc, 0) +def test_rate_no_send_udp(): + """Rate below random value prevents sending with StatsClient.incr.""" + cl = _udp_client() + _test_rate_no_send(cl, 'udp') def test_socket_error(): - sc = _client() - sc._sock.sendto.side_effect = socket.timeout() - sc.incr('foo') - _sock_check(sc, 1, 'foo:1|c') + """Socket error on StatsClient should be ignored.""" + cl = _udp_client() + cl._sock.sendto.side_effect = socket.timeout() + cl.incr('foo') + _sock_check(cl._sock, 1, 'udp', 'foo:1|c') -def test_ipv6_host(): - StatsClient('::1') +def test_pipeline_packet_size(): + """Pipelines shouldn't send packets larger than 512 bytes (UDP only).""" + sc = _udp_client() + pipe = sc.pipeline() + for x in range(32): + # 32 * 16 = 512, so this will need 2 packets. + pipe.incr('sixteen_char_str') + pipe.send() + eq_(2, sc._sock.sendto.call_count) + assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512 + assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512 -- cgit v1.2.1 From 82a543aa0b5c609c456f94e86ff263cb30e90997 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Mon, 16 Mar 2015 23:37:12 +0900 Subject: Add TCPStatsClient and TCPPipeline --- statsd/__init__.py | 3 +- statsd/client.py | 49 +++++++++- statsd/tests.py | 258 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 306 insertions(+), 4 deletions(-) diff --git a/statsd/__init__.py b/statsd/__init__.py index a4f0372..79f54f5 100644 --- a/statsd/__init__.py +++ b/statsd/__init__.py @@ -1,8 +1,9 @@ from __future__ import absolute_import from .client import StatsClient +from .client import TCPStatsClient VERSION = (3, 0, 1) __version__ = '.'.join(map(str, VERSION)) -__all__ = ['StatsClient'] +__all__ = ['StatsClient', 'TCPStatsClient'] diff --git a/statsd/client.py b/statsd/client.py index 3f0da06..a423dae 100644 --- a/statsd/client.py +++ b/statsd/client.py @@ -7,7 +7,7 @@ import time import abc -__all__ = ['StatsClient'] +__all__ = ['StatsClient', 'TCPStatsClient'] class Timer(object): @@ -154,6 +154,46 @@ class StatsClient(StatsClientBase): return Pipeline(self) +class TCPStatsClient(StatsClientBase): + """TCP version of StatsClient.""" + + def __init__(self, host='localhost', port=8125, prefix=None, timeout=None): + """Create a new client.""" + self._host = host + self._port = port + self._timeout = timeout + self._prefix = prefix + self._sock = None + + def _send(self, data): + """Send data to statsd.""" + if not self._sock: + self.connect() + self._do_send(data) + + def _do_send(self, data): + self._sock.sendall(data.encode('ascii') + b'\n') + + def close(self): + if self._sock and hasattr(self._sock, 'close'): + self._sock.close() + self._sock = None + + def connect(self): + family, _, _, _, addr = socket.getaddrinfo( + self._host, self._port, 0, socket.SOCK_STREAM)[0] + self._sock = socket.socket(family, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(addr) + + def pipeline(self): + return TCPPipeline(self) + + def reconnect(self, data): + self.close() + self.connect() + + class PipelineBase(StatsClientBase): __metaclass__ = abc.ABCMeta @@ -203,3 +243,10 @@ class Pipeline(PipelineBase): else: data += '\n' + stat self._client._after(data) + + +class TCPPipeline(PipelineBase): + + def _send(self): + self._client._after('\n'.join(self._stats)) + self._stats.clear() diff --git a/statsd/tests.py b/statsd/tests.py index f3f0a54..a6a525c 100644 --- a/statsd/tests.py +++ b/statsd/tests.py @@ -7,6 +7,7 @@ import mock from nose.tools import eq_ from statsd import StatsClient +from statsd import TCPStatsClient ADDR = (socket.gethostbyname('localhost'), 8125) @@ -15,12 +16,14 @@ ADDR = (socket.gethostbyname('localhost'), 8125) # proto specific methods to get the socket method to send data send_method = { 'udp': lambda x: x.sendto, + 'tcp': lambda x: x.sendall, } # proto specific methods to create the expected value make_val = { 'udp': lambda x, addr: mock.call(str.encode(x), addr), + 'tcp': lambda x, addr: mock.call(str.encode(x + '\n')), } @@ -34,6 +37,16 @@ def _udp_client(prefix=None, addr=None, port=None): return sc +def _tcp_client(prefix=None, addr=None, port=None, timeout=None): + if not addr: + addr = ADDR[0] + if not port: + port = ADDR[1] + sc = TCPStatsClient(host=addr, port=port, prefix=prefix, timeout=timeout) + sc._sock = mock.Mock() + return sc + + def _timer_check(sock, count, proto, start, end): send = send_method[proto](sock) eq_(send.call_count, count) @@ -132,6 +145,13 @@ def test_incr_udp(): _test_incr(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_incr_tcp(): + """TCPStatsClient.incr works.""" + cl = _tcp_client() + _test_incr(cl, 'tcp') + + def _test_decr(cl, proto): cl.decr('foo') _sock_check(cl._sock, 1, proto, 'foo:-1|c') @@ -153,6 +173,13 @@ def test_decr_udp(): _test_decr(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_decr_tcp(): + """TCPStatsClient.decr works.""" + cl = _tcp_client() + _test_decr(cl, 'tcp') + + def _test_gauge(cl, proto): cl.gauge('foo', 30) _sock_check(cl._sock, 1, proto, 'foo:30|g') @@ -171,6 +198,13 @@ def test_gauge_udp(): _test_gauge(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_gauge_tcp(): + """TCPStatsClient.gauge works.""" + cl = _tcp_client() + _test_gauge(cl, 'tcp') + + def _test_ipv6(cl, proto, addr): cl.gauge('foo', 30) _sock_check(cl._sock, 1, proto, 'foo:30|g', addr=addr) @@ -183,6 +217,13 @@ def test_ipv6_udp(): cl = _udp_client(addr=addr[0]) _test_ipv6(cl, 'udp', addr) +@mock.patch.object(random, 'random', lambda: -1) +def test_ipv6_tcp(): + """TCPStatsClient can use to IPv6 address.""" + addr = ('::1', 8125, 0, 0) + cl = _tcp_client(addr=addr[0]) + _test_ipv6(cl, 'tcp', addr) + def _test_gauge_delta(cl, proto): tests = ( @@ -208,6 +249,13 @@ def test_gauge_delta_udp(): _test_gauge_delta(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_gauge_delta_tcp(): + """TCPStatsClient.gauge works with delta values.""" + cl = _tcp_client() + _test_gauge_delta(cl, 'tcp') + + def _test_gauge_absolute_negative(cl, proto): cl.gauge('foo', -5, delta=False) _sock_check(cl._sock, 1, 'foo:0|g\nfoo:-5|g') @@ -220,6 +268,13 @@ def test_gauge_absolute_negative_udp(): _test_gauge_delta(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_gauge_absolute_negative_tcp(): + """TCPStatsClient.gauge works with absolute negative value.""" + cl = _tcp_client() + _test_gauge_delta(cl, 'tcp') + + def _test_gauge_absolute_negative_rate(cl, proto, mock_random): mock_random.return_value = -1 cl.gauge('foo', -1, rate=0.5, delta=False) @@ -238,6 +293,13 @@ def test_gauge_absolute_negative_rate_udp(mock_random): _test_gauge_absolute_negative_rate(cl, 'udp', mock_random) +@mock.patch.object(random, 'random') +def test_gauge_absolute_negative_rate_tcp(mock_random): + """TCPStatsClient.gauge works with absolute negative value and rate.""" + cl = _tcp_client() + _test_gauge_absolute_negative_rate(cl, 'tcp', mock_random) + + def _test_set(cl, proto): cl.set('foo', 10) _sock_check(cl._sock, 1, proto, 'foo:10|s') @@ -259,6 +321,13 @@ def test_set_udp(): _test_set(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_set_tcp(): + """TCPStatsClient.set works.""" + cl = _tcp_client() + _test_set(cl, 'tcp') + + def _test_timing(cl, proto): cl.timing('foo', 100) _sock_check(cl._sock, 1, proto, 'foo:100|ms') @@ -277,6 +346,13 @@ def test_timing_udp(): _test_timing(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_timing_tcp(): + """TCPStatsClient.timing works.""" + cl = _tcp_client() + _test_timing(cl, 'tcp') + + def _test_prepare(cl, proto): tests = ( ('foo:1|c', ('foo', '1|c', 1)), @@ -299,6 +375,13 @@ def test_prepare_udp(): _test_prepare(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_prepare_tcp(): + """Test TCPStatsClient._prepare method.""" + cl = _tcp_client() + _test_prepare(cl, 'tcp') + + def _test_prefix(cl, proto): cl.incr('bar') _sock_check(cl._sock, 1, proto, 'foo.bar:1|c') @@ -311,6 +394,13 @@ def test_prefix_udp(): _test_prefix(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_prefix_tcp(): + """TCPStatsClient.incr works.""" + cl = _tcp_client(prefix='foo') + _test_prefix(cl, 'tcp') + + def _test_timer_manager(cl, proto): with cl.timer('foo'): pass @@ -324,6 +414,12 @@ def test_timer_manager_udp(): _test_timer_manager(cl, 'udp') +def test_timer_manager_tcp(): + """TCPStatsClient.timer can be used as manager.""" + cl = _tcp_client() + _test_timer_manager(cl, 'tcp') + + def _test_timer_decorator(cl, proto): @cl.timer('foo') def foo(a, b): @@ -351,6 +447,12 @@ def test_timer_decorator_udp(): _test_timer_decorator(cl, 'udp') +def test_timer_decorator_tcp(): + """StatsClient.timer is a thread-safe decorator (TCP).""" + cl = _tcp_client() + _test_timer_decorator(cl, 'tcp') + + def _test_timer_capture(cl, proto): with cl.timer('woo') as result: eq_(result.ms, None) @@ -363,6 +465,12 @@ def test_timer_capture_udp(): _test_timer_capture(cl, 'udp') +def test_timer_capture_tcp(): + """You can capture the output of StatsClient.timer (TCP).""" + cl = _tcp_client() + _test_timer_capture(cl, 'tcp') + + def _test_timer_context_rate(cl, proto): with cl.timer('foo', rate=0.5): pass @@ -377,6 +485,13 @@ def test_timer_context_rate_udp(): _test_timer_context_rate(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_timer_context_rate_tcp(): + """TCPStatsClient.timer can be used as manager with rate.""" + cl = _tcp_client() + _test_timer_context_rate(cl, 'tcp') + + def _test_timer_decorator_rate(cl, proto): @cl.timer('foo', rate=0.1) def foo(a, b): @@ -400,6 +515,13 @@ def test_timer_decorator_rate_udp(): _test_timer_decorator_rate(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_timer_decorator_rate_tcp(): + """TCPStatsClient.timer can be used as decorator with rate.""" + cl = _tcp_client() + _test_timer_decorator_rate(cl, 'tcp') + + def _test_timer_context_exceptions(cl, proto): with assert_raises(socket.timeout): with cl.timer('foo'): @@ -409,11 +531,17 @@ def _test_timer_context_exceptions(cl, proto): def test_timer_context_exceptions_udp(): - """Exceptions within a managed block should get logged and propagate.""" + """Exceptions within a managed block should get logged and propagate (UDP).""" cl = _udp_client() _test_timer_context_exceptions(cl, 'udp') +def test_timer_context_exceptions_tcp(): + """Exceptions within a managed block should get logged and propagate (TCP).""" + cl = _tcp_client() + _test_timer_context_exceptions(cl, 'tcp') + + def _test_timer_decorator_exceptions(cl, proto): @cl.timer('foo') def foo(): @@ -426,11 +554,17 @@ def _test_timer_decorator_exceptions(cl, proto): def test_timer_decorator_exceptions_udp(): - """Exceptions from wrapped methods should get logged and propagate.""" + """Exceptions from wrapped methods should get logged and propagate (UDP).""" cl = _udp_client() _test_timer_decorator_exceptions(cl, 'udp') +def test_timer_decorator_exceptions_tcp(): + """Exceptions from wrapped methods should get logged and propagate (TCP).""" + cl = _tcp_client() + _test_timer_decorator_exceptions(cl, 'tcp') + + def _test_timer_object(cl, proto): t = cl.timer('foo').start() t.stop() @@ -444,6 +578,12 @@ def test_timer_object_udp(): _test_timer_object(cl, 'udp') +def test_timer_object_tcp(): + """TCPStatsClient.timer works.""" + cl = _tcp_client() + _test_timer_object(cl, 'tcp') + + def _test_timer_object_no_send(cl, proto): t = cl.timer('foo').start() t.stop(send=False) @@ -459,6 +599,12 @@ def test_timer_object_no_send_udp(): _test_timer_object_no_send(cl, 'udp') +def test_timer_object_no_send_tcp(): + """Stop TCPStatsClient.timer without sending.""" + cl = _tcp_client() + _test_timer_object_no_send(cl, 'tcp') + + def _test_timer_object_rate(cl, proto): t = cl.timer('foo', rate=0.5) t.start() @@ -474,6 +620,13 @@ def test_timer_object_rate_udp(): _test_timer_object_rate(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_timer_object_rate_tcp(): + """TCPStatsClient.timer works with rate.""" + cl = _tcp_client() + _test_timer_object_rate(cl, 'tcp') + + def _test_timer_object_no_send_twice(cl): t = cl.timer('foo').start() t.stop() @@ -488,6 +641,12 @@ def test_timer_object_no_send_twice_udp(): _test_timer_object_no_send_twice(cl) +def test_timer_object_no_send_twice_tcp(): + """TCPStatsClient.timer raises RuntimeError if send is called twice.""" + cl = _tcp_client() + _test_timer_object_no_send_twice(cl) + + def _test_timer_send_without_stop(cl): with cl.timer('foo') as t: assert t.ms is None @@ -506,6 +665,12 @@ def test_timer_send_without_stop_udp(): _test_timer_send_without_stop(cl) +def test_timer_send_without_stop_tcp(): + """TCPStatsClient.timer raises error if send is called before stop.""" + cl = _tcp_client() + _test_timer_send_without_stop(cl) + + def _test_timer_object_stop_without_start(cl): with assert_raises(RuntimeError): cl.timer('foo').stop() @@ -517,6 +682,12 @@ def test_timer_object_stop_without_start_udp(): _test_timer_object_stop_without_start(cl) +def test_timer_object_stop_without_start_tcp(): + """TCPStatsClient.timer raises error if stop is called before start.""" + cl = _tcp_client() + _test_timer_object_stop_without_start(cl) + + def _test_pipeline(cl, proto): pipe = cl.pipeline() pipe.incr('foo') @@ -532,6 +703,12 @@ def test_pipeline_udp(): _test_pipeline(cl, 'udp') +def test_pipeline_tcp(): + """TCPStatsClient.pipeline works.""" + cl = _tcp_client() + _test_pipeline(cl, 'tcp') + + def _test_pipeline_null(cl, proto): pipe = cl.pipeline() pipe.send() @@ -544,6 +721,12 @@ def test_pipeline_null_udp(): _test_pipeline_null(cl, 'udp') +def test_pipeline_null_tcp(): + """Ensure we don't error on an empty pipeline (TCP).""" + cl = _tcp_client() + _test_pipeline_null(cl, 'tcp') + + def _test_pipeline_manager(cl, proto): with cl.pipeline() as pipe: pipe.incr('foo') @@ -558,6 +741,12 @@ def test_pipeline_manager_udp(): _test_pipeline_manager(cl, 'udp') +def test_pipeline_manager_tcp(): + """TCPStatsClient.pipeline can be used as manager.""" + cl = _tcp_client() + _test_pipeline_manager(cl, 'tcp') + + def _test_pipeline_timer_manager(cl, proto): with cl.pipeline() as pipe: with pipe.timer('foo'): @@ -571,6 +760,12 @@ def test_pipeline_timer_manager_udp(): _test_pipeline_timer_manager(cl, 'udp') +def test_pipeline_timer_manager_tcp(): + """Timer manager can be retrieve from TCP Pipeline manager.""" + cl = _tcp_client() + _test_pipeline_timer_manager(cl, 'tcp') + + def _test_pipeline_timer_decorator(cl, proto): with cl.pipeline() as pipe: @pipe.timer('foo') @@ -586,6 +781,12 @@ def test_pipeline_timer_decorator_udp(): _test_pipeline_timer_decorator(cl, 'udp') +def test_pipeline_timer_decorator_tcp(): + """TCP Pipeline manager can be used as decorator.""" + cl = _tcp_client() + _test_pipeline_timer_decorator(cl, 'tcp') + + def _test_pipeline_timer_object(cl, proto): with cl.pipeline() as pipe: t = pipe.timer('foo').start() @@ -600,6 +801,12 @@ def test_pipeline_timer_object_udp(): _test_pipeline_timer_object(cl, 'udp') +def test_pipeline_timer_object_tcp(): + """Timer from TCP Pipeline manager works.""" + cl = _tcp_client() + _test_pipeline_timer_object(cl, 'tcp') + + def _test_pipeline_empty(cl): with cl.pipeline() as pipe: pipe.incr('foo') @@ -613,6 +820,12 @@ def test_pipeline_empty_udp(): _test_pipeline_empty(cl) +def test_pipeline_empty_tcp(): + """Pipelines should be empty after a send() call (TCP).""" + cl = _tcp_client() + _test_pipeline_empty(cl) + + def _test_pipeline_negative_absolute_gauge(cl, proto): with cl.pipeline() as pipe: pipe.gauge('foo', -10, delta=False) @@ -626,6 +839,12 @@ def test_pipeline_negative_absolute_gauge_udp(): _test_pipeline_negative_absolute_gauge(cl, 'udp') +def test_pipeline_negative_absolute_gauge_tcp(): + """Negative absolute gauges use an internal pipeline (TCP).""" + cl = _tcp_client() + _test_pipeline_negative_absolute_gauge(cl, 'tcp') + + def _test_big_numbers(cl, proto): num = 1234568901234 result = 'foo:1234568901234|%s' @@ -651,6 +870,12 @@ def test_big_numbers_udp(): _test_big_numbers(cl, 'udp') +def test_big_numbers_tcp(): + """Test big numbers with TCP client.""" + cl = _tcp_client() + _test_big_numbers(cl, 'tcp') + + def _test_rate_no_send(cl, proto): cl.incr('foo', rate=0.5) _sock_check(cl._sock, 0, proto) @@ -663,6 +888,13 @@ def test_rate_no_send_udp(): _test_rate_no_send(cl, 'udp') +@mock.patch.object(random, 'random', lambda: 2) +def test_rate_no_send_tcp(): + """Rate below random value prevents sending with TCPStatsClient.incr.""" + cl = _tcp_client() + _test_rate_no_send(cl, 'tcp') + + def test_socket_error(): """Socket error on StatsClient should be ignored.""" cl = _udp_client() @@ -682,3 +914,25 @@ def test_pipeline_packet_size(): eq_(2, sc._sock.sendto.call_count) assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512 assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512 + + +@mock.patch.object(socket, 'socket') +def test_tcp_raises_exception_to_user(mock_socket): + """Socket errors in TCPStatsClient should be raised to user.""" + addr = ('127.0.0.1', 1234) + cl = _tcp_client(addr=addr[0], port=addr[1]) + cl.incr('foo') + cl._sock.sendall.assert_called_once() + cl._sock.sendall.side_effect = socket.error + with assert_raises(socket.error): + cl.incr('foo') + + +@mock.patch.object(socket, 'socket') +def test_tcp_timeout(mock_socket): + """Timeout on TCPStatsClient should be set on socket.""" + test_timeout = 321 + cl = TCPStatsClient(timeout=test_timeout) + cl.incr('foo') + cl._sock.settimeout.assert_called_once() + cl._sock.settimeout.assert_called_with(test_timeout) -- cgit v1.2.1 From 9b007bd8b089bb03faec5605e841bad781920e91 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Tue, 17 Mar 2015 20:28:39 +0900 Subject: update docs to document TCPStatsClient --- docs/index.rst | 1 + docs/reference.rst | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ docs/tcp.rst | 19 ++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 docs/tcp.rst diff --git a/docs/index.rst b/docs/index.rst index b56bb64..ed4cb6c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -70,6 +70,7 @@ Contents types.rst timing.rst pipeline.rst + tcp.rst reference.rst contributing.rst diff --git a/docs/reference.rst b/docs/reference.rst index 0c0202e..e4527a7 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -275,6 +275,93 @@ stats. This method is not implemented on the base StatsClient class. +.. _TCPStatsClient: + +``TCPStatsClient`` +================== + +:: + + TCPStatsClient(host='localhost', port=8125, prefix=None, timeout=None) + +Create a new ``TCPStatsClient`` instance with the appropriate connection +and prefix information. + +* ``host``: the hostname or IPv4 address of the statsd_ server. + +* ``port``: the port of the statsd server. + +* ``prefix``: a prefix to distinguish and group stats from an + application or environment. + +* ``timeout``: socket timeout for any actions on the connection socket. + + +``TCPStatsClient`` implements all methods of ``StatsClient``, with the +difference that it is not thread safe and it can raise exceptions on +connection errors. On the contrary to ``StatsClient`` it uses a ``TCP`` +connection to connect to Statsd. +Additionally to the methods of ``StatsClient`` it has a few which are +specific to ``TCP`` connections. + + +.. _tcp_close: + +``close`` +--------- + +:: + + from statsd import TCPStatsClient + + statsd = TCPStatsClient() + statsd.incr('some.event') + statsd.close() + +Closes a connection that's currently open and deletes it's socket. If this is +called on a ``TCPStatsClient`` which currently has no open connection it is a +non-action. + + +.. _tcp_connect: + +``connect`` +----------- + +:: + + from statsd import TCPStatsClient + + statsd = TCPStatsClient() + statsd.incr('some.event') # calls connect() internally + statsd.close() + statsd.connect() # creates new connection + +Creates a connection to Statsd. If there are errors like connection timed out +or connection refused, the according exceptions will be raised. It is usually +not necessary to call this method because sending data to Statsd will call +``connect`` implicitely if the current instance of ``TCPStatsClient`` does not +already hold an open connection. + + +.. _tcp_reconnect: + +``reconnect`` +------------- + +:: + + from statsd import TCPStatsClient + + statsd = TCPStatsClient() + statsd.incr('some.event') + statsd.reconnect() # closes open connection and creates new one + +Closes a currently existing connection and replaces it with a new one. If no +connection exists already it will simply create a new one. Internally this +does nothing else than calling ``close()`` and ``connect()``. + + .. _statsd: https://github.com/etsy/statsd .. _0ed78be: https://github.com/etsy/statsd/commit/0ed78be7 .. _1c10cfc0ac: https://github.com/etsy/statsd/commit/1c10cfc0ac diff --git a/docs/tcp.rst b/docs/tcp.rst new file mode 100644 index 0000000..3c81af6 --- /dev/null +++ b/docs/tcp.rst @@ -0,0 +1,19 @@ +.. _tcp-chapter: + +============== +TCPStatsClient +============== + +The ``TCPStatsClient`` class has a very similar interface to ``StatsClient``, +but internally it uses ``TCP`` connections instead of ``UDP``. These are the +main differencies when using ``TCPStatsClient`` compared to ``StatsClient``: + +* The constructor supports a ``timeout`` parameter to set a timeout on all + socket actions. + +* The methods ``connect`` and all methods that send data can potentially raise + socket exceptions. + +* It is not thread-safe, so it is recommended to not share it across threads + unless a lot of attention is paid to make sure that no two threads ever use + it at once. -- cgit v1.2.1