summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorandy <andy@andymccurdy.com>2012-07-17 23:32:29 -0700
committerandy <andy@andymccurdy.com>2012-07-17 23:32:29 -0700
commit13996b75194b48021b1536fbd0a584cb8a550318 (patch)
tree69b9dbafee209b609a6e8ee4eece2c23ce679080
parent1f6c4a5ab1fefb125fb8af9b3974c3f25fd44f8f (diff)
downloadredis-py-13996b75194b48021b1536fbd0a584cb8a550318.tar.gz
Changed (p)subscribe and (p)unsubscribe to no longer return confirmation of subscirption. These messages should be consumed in listen(). Fixes #254, #255, #233, #232, and #176. whew.
-rw-r--r--CHANGES9
-rw-r--r--redis/client.py13
-rw-r--r--tests/pubsub.py64
3 files changed, 76 insertions, 10 deletions
diff --git a/CHANGES b/CHANGES
index 4f1a74e..37db5d4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,4 +1,13 @@
* 2.4.14 (in development)
+ * Changed (p)subscribe and (p)unsubscribe to no longer return messages
+ indicating the channel was subscribed/unsubscribed to. These messages
+ are available in the listen() loop instead. This is to prevent the
+ following scenario:
+ * Client A is subscribed to "foo"
+ * Client B publishes message to "foo"
+ * Client A subscribes to channel "bar" at the same time.
+ Prior to this change, the subscribe() call would return the published
+ messages on "foo" rather than the subscription confirmation to "bar".
* Added support for GETRANGE, thanks Jean-Philippe Caruana
* A new setting "decode_responses" specifies whether return values from
Redis commands get decoded automatically using the client's charset
diff --git a/redis/client.py b/redis/client.py
index d866928..a198c82 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -1239,11 +1239,20 @@ class PubSub(object):
def reset(self):
if self.connection:
+ self.connection.disconnect()
self.connection_pool.release(self.connection)
self.connection = None
+ def close(self):
+ self.reset()
+
def execute_command(self, *args, **kwargs):
"Execute a publish/subscribe command"
+
+ # NOTE: don't parse the response in this function. it could pull a
+ # legitmate message off the stack if the connection is already
+ # subscribed to one or more channels
+
if self.connection is None:
self.connection = self.connection_pool.get_connection(
'pubsub',
@@ -1252,7 +1261,6 @@ class PubSub(object):
connection = self.connection
try:
connection.send_command(*args)
- return self.parse_response()
except ConnectionError:
connection.disconnect()
# Connect manually here. If the Redis server is down, this will
@@ -1265,7 +1273,6 @@ class PubSub(object):
for pattern in self.patterns:
self.psubscribe(pattern)
connection.send_command(*args)
- return self.parse_response()
def parse_response(self):
"Parse the response from a publish/subscribe command"
@@ -1324,7 +1331,7 @@ class PubSub(object):
def listen(self):
"Listen for messages on channels this client has been subscribed to"
- while self.subscription_count:
+ while self.subscription_count or self.channels or self.patterns:
r = self.parse_response()
if r[0] == 'pmessage':
msg = {
diff --git a/tests/pubsub.py b/tests/pubsub.py
index 4ecf05c..a86ef5e 100644
--- a/tests/pubsub.py
+++ b/tests/pubsub.py
@@ -13,11 +13,24 @@ class PubSubTestCase(unittest.TestCase):
self.connection_pool.disconnect()
def test_channel_subscribe(self):
+ # subscribe doesn't return anything
self.assertEquals(
self.pubsub.subscribe('foo'),
- ['subscribe', 'foo', 1]
+ None
)
+ # send a message
self.assertEquals(self.client.publish('foo', 'hello foo'), 1)
+ # there should be now 2 messages in the buffer, a subscribe and the
+ # one we just published
+ self.assertEquals(
+ self.pubsub.listen().next(),
+ {
+ 'type': 'subscribe',
+ 'pattern': None,
+ 'channel': 'foo',
+ 'data': 1
+ }
+ )
self.assertEquals(
self.pubsub.listen().next(),
{
@@ -27,29 +40,66 @@ class PubSubTestCase(unittest.TestCase):
'data': 'hello foo'
}
)
+
+ # unsubscribe
self.assertEquals(
self.pubsub.unsubscribe('foo'),
- ['unsubscribe', 'foo', 0]
+ None
+ )
+ # unsubscribe message should be in the buffer
+ self.assertEquals(
+ self.pubsub.listen().next(),
+ {
+ 'type': 'unsubscribe',
+ 'pattern': None,
+ 'channel': 'foo',
+ 'data': 0
+ }
)
def test_pattern_subscribe(self):
+ # psubscribe doesn't return anything
self.assertEquals(
- self.pubsub.psubscribe('fo*'),
- ['psubscribe', 'fo*', 1]
+ self.pubsub.psubscribe('f*'),
+ None
)
+ # send a message
self.assertEquals(self.client.publish('foo', 'hello foo'), 1)
+ # there should be now 2 messages in the buffer, a subscribe and the
+ # one we just published
+ self.assertEquals(
+ self.pubsub.listen().next(),
+ {
+ 'type': 'psubscribe',
+ 'pattern': None,
+ 'channel': 'f*',
+ 'data': 1
+ }
+ )
self.assertEquals(
self.pubsub.listen().next(),
{
'type': 'pmessage',
- 'pattern': 'fo*',
+ 'pattern': 'f*',
'channel': 'foo',
'data': 'hello foo'
}
)
+
+ # unsubscribe
+ self.assertEquals(
+ self.pubsub.punsubscribe('f*'),
+ None
+ )
+ # unsubscribe message should be in the buffer
self.assertEquals(
- self.pubsub.punsubscribe('fo*'),
- ['punsubscribe', 'fo*', 0]
+ self.pubsub.listen().next(),
+ {
+ 'type': 'punsubscribe',
+ 'pattern': None,
+ 'channel': 'f*',
+ 'data': 0
+ }
)
class PubSubRedisDownTestCase(unittest.TestCase):