diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 22:32:14 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 22:32:14 -0700 |
commit | ad67d89d0fe34b59b798be68c787277f3ab781fc (patch) | |
tree | 2707d1a9e35cde820a93d5ef0a028a27d7170377 | |
parent | 5f7b956fb1d389e23cdfc4e1e1809fa6662ac368 (diff) | |
download | redis-py-ad67d89d0fe34b59b798be68c787277f3ab781fc.tar.gz |
refactor a bunch of the tests.
- split out tests for each client function
- alphabetize
- make sure response callbacks return system info dicts with
native string keys rather than byte strings.
- make sure empty versions of commands that typically return a list
return an empty list when streams or messages don't exist
-rwxr-xr-x | redis/client.py | 105 | ||||
-rw-r--r-- | tests/test_commands.py | 516 |
2 files changed, 427 insertions, 194 deletions
diff --git a/redis/client.py b/redis/client.py index 1e06f18..f8d13b7 100755 --- a/redis/client.py +++ b/redis/client.py @@ -182,10 +182,15 @@ def parse_sentinel_get_master(response): return response and (response[0], int(response[1])) or None -def pairs_to_dict(response): +def pairs_to_dict(response, decode_keys=False): "Create a dict given a list of key/value pairs" - it = iter(response) - return dict(izip(it, it)) + if decode_keys: + # the iter form is faster, but I don't know how to make that work + # with a nativestr() map + return dict(izip(imap(nativestr, response[::2]), response[1::2])) + else: + it = iter(response) + return dict(izip(it, it)) def pairs_to_dict_typed(response, type_info): @@ -238,26 +243,12 @@ def parse_stream_list(response): return [(r[0], pairs_to_dict(r[1])) for r in response] -def parse_recursive_dict(response): - if response is None: - return None - result = {} - while response: - k = response.pop(0) - v = response.pop(0) - if isinstance(v, list): - v = parse_recursive_dict(v) - result[k] = v - return result +def pairs_to_dict_with_nativestr_keys(response): + return pairs_to_dict(response, decode_keys=True) -def parse_list_of_recursive_dicts(response): - if response is None: - return None - result = [] - for group in response: - result.append(parse_recursive_dict(group)) - return result +def parse_list_of_dicts(response): + return list(imap(pairs_to_dict_with_nativestr_keys, response)) def parse_xclaim(response): @@ -266,6 +257,15 @@ def parse_xclaim(response): return parse_stream_list(response) +def parse_xinfo_stream(response): + data = pairs_to_dict(response, decode_keys=True) + first = data['first-entry'] + data['first-entry'] = (first[0], pairs_to_dict(first[1])) + last = data['last-entry'] + data['last-entry'] = (last[0], pairs_to_dict(last[1])) + return data + + def parse_xread(response): if response is None: return [] @@ -273,33 +273,20 @@ def parse_xread(response): def parse_xpending(response, **options): - if isinstance(response, list): - if options.get('parse_detail', False): - return parse_range_xpending(response) - consumers = [] - for consumer_name, consumer_pending in response[3]: - consumers.append({ - 'name': consumer_name, - 'pending': consumer_pending - }) - return { - 'pending': response[0], - 'lower': response[1], - 'upper': response[2], - 'consumers': consumers - } + if options.get('parse_detail', False): + return parse_xpending_range(response) + consumers = [{'name': n, 'pending': long(p)} for n, p in response[3] or []] + return { + 'pending': response[0], + 'min': response[1], + 'max': response[2], + 'consumers': consumers + } -def parse_range_xpending(response): - result = [] - for message in response: - result.append({ - 'message_id': message[0], - 'consumer': message[1], - 'time_since_delivered': message[2], - 'times_delivered': message[3] - }) - return result +def parse_xpending_range(response): + k = ('message_id', 'consumer', 'time_since_delivered', 'times_delivered') + return [dict(izip(k, r)) for r in response] def float_or_none(response): @@ -529,11 +516,11 @@ class StrictRedis(object): 'XCLAIM': parse_xclaim, 'XGROUP CREATE': bool_ok, 'XGROUP DELCONSUMER': int, - 'XGROUP DESTROY': int, + 'XGROUP DESTROY': bool, 'XGROUP SETID': bool_ok, - 'XINFO CONSUMERS': parse_list_of_recursive_dicts, - 'XINFO GROUPS': parse_list_of_recursive_dicts, - 'XINFO STREAM': parse_recursive_dict, + 'XINFO CONSUMERS': parse_list_of_dicts, + 'XINFO GROUPS': parse_list_of_dicts, + 'XINFO STREAM': parse_xinfo_stream, 'XPENDING': parse_xpending, 'ZSCAN': parse_zscan, } @@ -1757,6 +1744,15 @@ class StrictRedis(object): return self.execute_command('SUNIONSTORE', dest, *args) # STREAMS COMMANDS + def xack(self, name, groupname, *ids): + """ + Acknowledges the successful processing of one or more messages. + name: name of the stream. + groupname: name of the consumer group. + *ids: message ids to acknowlege. + """ + return self.execute_command('XACK', name, groupname, *ids) + def xadd(self, name, fields, id='*', maxlen=None, approximate=True): """ Add to a stream. @@ -1782,15 +1778,6 @@ class StrictRedis(object): pieces.extend(pair) return self.execute_command('XADD', name, *pieces) - def xack(self, name, groupname, *ids): - """ - Acknowledges the successful processing of one or more messages. - name: name of the stream. - groupname: name of the consumer group. - *ids: message ids to acknowlege. - """ - return self.execute_command('XACK', name, groupname, *ids) - def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, idle=None, time=None, retrycount=None, force=False, justid=False): diff --git a/tests/test_commands.py b/tests/test_commands.py index 824fc44..4e11b55 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2,6 +2,7 @@ from __future__ import with_statement import binascii import datetime import pytest +import re import redis import time @@ -34,6 +35,13 @@ def redis_server_time(client): return datetime.datetime.fromtimestamp(timestamp) +def get_stream_message(client, stream, message_id): + "Fetch a stream message and format it as a (message_id, fields) pair" + response = client.xrange(stream, start=message_id, finish=message_id) + assert len(response) == 1 + return response[0] + + # RESPONSE CALLBACKS class TestResponseCallbacks(object): "Tests for the response callback system" @@ -1601,183 +1609,421 @@ class TestRedisCommands(object): (2.1909382939338684, 41.433790281840835)]] @skip_if_server_version_lt('5.0.0') - def test_xack(self, sr): - stream_name = 'xack_test_stream' - sr.delete(stream_name) - group_name = 'xack_test_group' + def test_xack(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + # xack on a stream that doesn't exist + assert r.xack(stream, group, '0-0') == 0 + + m1 = r.xadd(stream, {'one': 'one'}) + m2 = r.xadd(stream, {'two': 'two'}) + m3 = r.xadd(stream, {'three': 'three'}) + + # xack on a group that doesn't exist + assert r.xack(stream, group, m1) == 0 + + r.xgroup_create(stream, group, 0) + r.xreadgroup(group, consumer, streams={stream: 0}) + # xack returns the number of ack'd elements + assert r.xack(stream, group, m1) == 1 + assert r.xack(stream, group, m2, m3) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xadd(self, r): + stream = 'stream' + message_id = r.xadd(stream, {'foo': 'bar'}) + assert re.match(rb'[0-9]+\-[0-9]+', message_id) - assert sr.xack(stream_name, group_name, 0) == 0 - assert sr.xack(stream_name, group_name, '1-1') == 0 - assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + # explicit message id + message_id = b('9999999999999999999-0') + assert message_id == r.xadd(stream, {'foo': 'bar'}, id=message_id) + + # with maxlen, the list evicts the first message + r.xadd(stream, {'foo': 'bar'}, maxlen=2, approximate=False) + assert r.xlen(stream) == 2 @skip_if_server_version_lt('5.0.0') - def test_xclaim(self, sr): - stream_name = 'xclaim_test_stream' - group_name = 'xclaim_test_consumer_group' - sr.delete(stream_name) - - stamp = sr.xadd(stream_name, {"john": "wick"}) - sr.xgroup_create(stream_name, group_name, id='0') - sr.xreadgroup(group_name, 'action_movie_consumer', - streams={stream_name: 0}) - assert sr.xinfo_consumers(stream_name, group_name)[0][ - b('name')] == b('action_movie_consumer') - assert sr.xclaim(stream_name, group_name, 'reeves_fan', - min_idle_time=0, message_ids=(stamp,))[0][0] == stamp - assert sr.xclaim(stream_name, group_name, 'action_movie_consumer', - min_idle_time=0, message_ids=(stamp,), - justid=True) == [b(stamp), ] + def test_xclaim(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + + message_id = r.xadd(stream, {'john': 'wick'}) + message = get_stream_message(r, stream, message_id) + r.xgroup_create(stream, group, 0) + + # trying to claim a message that isn't already pending doesn't + # do anything + response = r.xclaim(stream, group, consumer2, + min_idle_time=0, message_ids=(message_id,)) + assert response == [] + + # read the group as consumer1 to initially claim the messages + r.xreadgroup(group, consumer1, streams={stream: 0}) + + # claim the message as consumer2 + response = r.xclaim(stream, group, consumer2, + min_idle_time=0, message_ids=(message_id,)) + assert response[0] == message + + # reclaim the message as consumer1, but use the justid argument + # which only returns message ids + assert r.xclaim(stream, group, consumer1, + min_idle_time=0, message_ids=(message_id,), + justid=True) == [message_id] @skip_if_server_version_lt('5.0.0') - def test_xdel(self, sr): - stream_name = 'xdel_test_stream' - sr.delete(stream_name) + def test_xdel(self, r): + stream = 'stream' - assert sr.xdel(stream_name, 1) == 0 + # deleting from an empty stream doesn't do anything + assert r.xdel(stream, 1) == 0 - sr.xadd(stream_name, {"foo": "bar"}, id=1) - assert sr.xdel(stream_name, 1) == 1 + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) - stamp = sr.xadd(stream_name, {"baz": "qaz"}) - assert sr.xdel(stream_name, 1, stamp) == 1 - assert sr.xdel(stream_name, 1, stamp, 42) == 0 + # xdel returns the number of deleted elements + assert r.xdel(stream, m1) == 1 + assert r.xdel(stream, m2, m3) == 2 @skip_if_server_version_lt('5.0.0') - def test_xgroup(self, sr): - stream_name = 'xgroup_test_stream' - sr.delete(stream_name) - group_name = 'xgroup_test_group' - message = {'name': 'boaty', 'other': 'mcboatface'} - b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} + def test_xgroup_create(self, r): + # tests xgroup_create and xinfo_groups + stream = 'stream' + group = 'group' + r.xadd(stream, {'foo': 'bar'}) + + # no group is setup yet, no info to obtain + assert r.xinfo_groups(stream) == [] + + assert r.xgroup_create(stream, group, 0) + expected = [{ + 'name': b(group), + 'consumers': 0, + 'pending': 0, + 'last-delivered-id': b('0-0') + }] + assert r.xinfo_groups(stream) == expected - stamp1 = sr.xadd(stream_name, message) - assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] + @skip_if_server_version_lt('5.0.0') + def test_xgroup_delconsumer(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) - assert sr.xinfo_groups(name=stream_name) == [] - assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') - assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name) + # a consumer that hasn't yet read any messages doesn't do anything + assert r.xgroup_delconsumer(stream, group, consumer) == 0 - with pytest.raises(redis.ResponseError): - sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') - with pytest.raises(redis.ResponseError): - sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') - assert sr.xinfo_groups(name=stream_name)[0][ - b('last-delivered-id')] == b(stamp1) - assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') - assert sr.xinfo_groups(name=stream_name)[0][ - b('last-delivered-id')] == b('0-0') + # read all messages from the group + r.xreadgroup(group, consumer, streams={stream: 0}) - consumer_name = 'captain_jack_sparrow' + # deleting the consumer should return 2 pending messages + assert r.xgroup_delconsumer(stream, group, consumer) == 2 - expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] - assert sr.xreadgroup(groupname=group_name, - consumername=consumer_name, - streams={stream_name: '0'}) == expected_value + @skip_if_server_version_lt('5.0.0') + def test_xgroup_destroy(self, r): + stream = 'stream' + group = 'group' + r.xadd(stream, {'foo': 'bar'}) - assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 - sr.xgroup_delconsumer(stream_name, group_name, consumer_name) - assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0 + # destroying a nonexistent group returns False + assert not r.xgroup_destroy(stream, group) - assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 + r.xgroup_create(stream, group, 0) + assert r.xgroup_destroy(stream, group) @skip_if_server_version_lt('5.0.0') - def test_xpending(self, sr): - stream_name = 'xpending_test_stream' - group_name = 'xpending_test_consumer_group' - consumer_name = 'marie' - sr.delete(stream_name) - - sr.xadd(stream_name, {"foo": "bar"}) - sr.xgroup_create(stream_name, group_name, id='0') - sr.xreadgroup(group_name, consumer_name, - streams={stream_name: 0}) - response = sr.xpending(stream_name, group_name) - assert sorted(response.keys()) == ['consumers', 'lower', 'pending', - 'upper'] - - response = sr.xpending_range(stream_name, group_name, - consumername=consumer_name) - assert sorted(response[0].keys()) == ['consumer', 'message_id', - 'time_since_delivered', - 'times_delivered'] + def test_xgroup_setid(self, r): + stream = 'stream' + group = 'group' + message_id = r.xadd(stream, {'foo': 'bar'}) + + r.xgroup_create(stream, group, 0) + # advance the last_delivered_id to the message_id + r.xgroup_setid(stream, group, message_id) + expected = [{ + 'name': b(group), + 'consumers': 0, + 'pending': 0, + 'last-delivered-id': message_id + }] + assert r.xinfo_groups(stream) == expected @skip_if_server_version_lt('5.0.0') - def test_xrange(self, sr): - varname = 'xrange_test' - sr.delete(varname) - assert sr.xlen(varname) == 0 - stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) - assert sr.xlen(varname) == 1 - stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) - assert sr.xlen(varname) == 2 - assert stamp1 != stamp2 - - milli, offset = stamp2.decode('utf-8').split('-') - new_id = "{0}-0".format(int(milli) + 10000).encode('utf-8') - stamp3 = sr.xadd(varname, {"foo": "bar"}, id=new_id) - assert sr.xlen(varname) == 3 - assert stamp3 == new_id - stamp4 = sr.xadd(varname, {"foo": "baz"}) - assert sr.xlen(varname) == 4 + def test_xinfo_consumers(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + r.xadd(stream, {'foo': 'bar'}) + + r.xgroup_create(stream, group, 0) + r.xreadgroup(group, consumer1, streams={stream: 0}) + r.xreadgroup(group, consumer2, streams={stream: 0}) + info = r.xinfo_consumers(stream, group) + assert len(info) == 2 + expected = [ + {'name': b(consumer1), 'pending': 1}, + {'name': b(consumer2), 'pending': 0}, + ] + + # we can't determine the idle time, so just make sure it's an int + assert isinstance(info[0].pop('idle'), int) + assert isinstance(info[1].pop('idle'), int) + assert info == expected + + @skip_if_server_version_lt('5.0.0') + def test_xinfo_stream(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + info = r.xinfo_stream(stream) + + assert info['length'] == 2 + assert info['first-entry'] == get_stream_message(r, stream, m1) + assert info['last-entry'] == get_stream_message(r, stream, m2) + + @skip_if_server_version_lt('5.0.0') + def test_xlen(self, r): + stream = 'stream' + assert r.xlen(stream) == 0 + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + assert r.xlen(stream) == 2 + + @skip_if_server_version_lt('5.0.0') + def test_xpending(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + + # xpending on a group that has no consumers yet + expected = { + 'pending': 0, + 'min': None, + 'max': None, + 'consumers': [] + } + assert r.xpending(stream, group) == expected + + # read 1 message from the group with each consumer + r.xreadgroup(group, consumer1, streams={stream: 0}, count=1) + r.xreadgroup(group, consumer2, streams={stream: m1}, count=1) + + expected = { + 'pending': 2, + 'min': m1, + 'max': m2, + 'consumers': [ + {'name': b(consumer1), 'pending': 1}, + {'name': b(consumer2), 'pending': 1}, + ] + } + assert r.xpending(stream, group) == expected + + @skip_if_server_version_lt('5.0.0') + def test_xpending_range(self, r): + stream = 'stream' + group = 'group' + consumer1 = 'consumer1' + consumer2 = 'consumer2' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + + # xpending range on a group that has no consumers yet + assert r.xpending_range(stream, group) == [] + + # read 1 message from the group with each consumer + r.xreadgroup(group, consumer1, streams={stream: 0}, count=1) + r.xreadgroup(group, consumer2, streams={stream: m1}, count=1) + + response = r.xpending_range(stream, group) + assert len(response) == 2 + assert response[0]['message_id'] == m1 + assert response[0]['consumer'] == b(consumer1) + assert response[1]['message_id'] == m2 + assert response[1]['consumer'] == b(consumer2) + + @skip_if_server_version_lt('5.0.0') + def test_xrange(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) + m4 = r.xadd(stream, {'foo': 'bar'}) def get_ids(results): return [result[0] for result in results] - results = sr.xrange(varname, start=stamp1) - assert get_ids(results) == [stamp1, stamp2, stamp3, stamp4] + results = r.xrange(stream, start=m1) + assert get_ids(results) == [m1, m2, m3, m4] - results = sr.xrange(varname, start=stamp2, finish=stamp3) - assert get_ids(results) == [stamp2, stamp3] + results = r.xrange(stream, start=m2, finish=m3) + assert get_ids(results) == [m2, m3] - results = sr.xrange(varname, finish=stamp3) - assert get_ids(results) == [stamp1, stamp2, stamp3] + results = r.xrange(stream, finish=m3) + assert get_ids(results) == [m1, m2, m3] - results = sr.xrange(varname, finish=stamp2, count=1) - assert get_ids(results) == [stamp1] + results = r.xrange(stream, finish=m2, count=1) + assert get_ids(results) == [m1] - results = sr.xrevrange(varname, start=stamp4) - assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1] + @skip_if_server_version_lt('5.0.0') + def test_xread(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'bing': 'baz'}) - results = sr.xrevrange(varname, start=stamp3, finish=stamp2) - assert get_ids(results) == [stamp3, stamp2] + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at 0 returns both messages + assert r.xread(streams={stream: 0}) == expected - results = sr.xrevrange(varname, finish=stamp3) - assert get_ids(results) == [stamp4, stamp3] + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + ] + ] + ] + # xread starting at 0 and count=1 returns only the first message + assert r.xread(streams={stream: 0}, count=1) == expected + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at m1 returns only the second message + assert r.xread(streams={stream: m1}) == expected + + # xread starting at the last message returns an empty list + assert r.xread(streams={stream: m2}) == [] + + @skip_if_server_version_lt('5.0.0') + def test_xreadgroup(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'bing': 'baz'}) + r.xgroup_create(stream, group, 0) + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at 0 returns both messages + assert r.xreadgroup(group, consumer, streams={stream: 0}) == expected + + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) + + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m1), + ] + ] + ] + # xread starting at 0 and count=1 returns only the first message + assert r.xreadgroup(group, consumer, streams={stream: 0}, count=1) == \ + expected + + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) - results = sr.xrevrange(varname, finish=stamp2, count=1) - assert get_ids(results) == [stamp4] + expected = [ + [ + stream, + [ + get_stream_message(r, stream, m2), + ] + ] + ] + # xread starting at m1 returns only the second message + assert r.xreadgroup(group, consumer, streams={stream: m1}) == expected - assert sr.xlen(varname) == 4 + r.xgroup_destroy(stream, group) + r.xgroup_create(stream, group, 0) + + # xread starting at the last message returns an empty message list + expected = [ + [ + stream, + [] + ] + ] + assert r.xreadgroup(group, consumer, streams={stream: m2}) == expected @skip_if_server_version_lt('5.0.0') - def test_xread(self, sr): - varname = 'xread_test' - sr.delete(varname) - stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) - stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"}) - assert stamp1 != stamp2 + def test_xrevrange(self, r): + stream = 'stream' + m1 = r.xadd(stream, {'foo': 'bar'}) + m2 = r.xadd(stream, {'foo': 'bar'}) + m3 = r.xadd(stream, {'foo': 'bar'}) + m4 = r.xadd(stream, {'foo': 'bar'}) - results = sr.xread(streams={varname: '$'}, count=10, block=10) - assert results == [] + def get_ids(results): + return [result[0] for result in results] + + results = r.xrevrange(stream, start=m4) + assert get_ids(results) == [m4, m3, m2, m1] - results = sr.xread(count=3, block=0, streams={varname: stamp1}) - assert results[0][1][0][0] == stamp2 + results = r.xrevrange(stream, start=m3, finish=m2) + assert get_ids(results) == [m3, m2] + + results = r.xrevrange(stream, finish=m3) + assert get_ids(results) == [m4, m3] + + results = r.xrevrange(stream, finish=m2, count=1) + assert get_ids(results) == [m4] @skip_if_server_version_lt('5.0.0') - def test_xtrim(self, sr): - stream_name = 'xtrim_test_stream' - sr.delete(stream_name) + def test_xtrim(self, r): + stream = 'stream' + + # trimming an empty key doesn't do anything + assert r.xtrim(stream, 1000) == 0 - assert sr.xtrim(stream_name, 1000) == 0 + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) - for i in range(300): - sr.xadd(stream_name, {"index": i}) + # trimming an amount large than the number of messages + # doesn't do anything + assert r.xtrim(stream, 5, approximate=False) == 0 - assert sr.xtrim(stream_name, 1000, approximate=False) == 0 - assert sr.xtrim(stream_name, 300) == 0 - assert sr.xtrim(stream_name, 299) == 0 - assert sr.xtrim(stream_name, 234) == 0 - assert sr.xtrim(stream_name, 234, approximate=False) == 66 + # 1 message is trimmed + assert r.xtrim(stream, 3, approximate=False) == 1 class TestStrictCommands(object): |